diff --git a/debian/mariadb-plugin-columnstore.install b/debian/mariadb-plugin-columnstore.install index 398dccd1e..9350095be 100644 --- a/debian/mariadb-plugin-columnstore.install +++ b/debian/mariadb-plugin-columnstore.install @@ -44,7 +44,9 @@ usr/bin/reset_locks usr/bin/rollback usr/bin/save_brm usr/bin/smcat +usr/bin/smkill usr/bin/smls +usr/bin/smps usr/bin/smput usr/bin/smrm usr/bin/testS3Connection diff --git a/storage-manager/CMakeLists.txt b/storage-manager/CMakeLists.txt index a874a93d4..4297081c5 100755 --- a/storage-manager/CMakeLists.txt +++ b/storage-manager/CMakeLists.txt @@ -36,6 +36,8 @@ set(storagemanager_SRCS src/Ownership.cpp src/PrefixCache.cpp src/SyncTask.cpp + src/ListIOTask.cpp + src/TerminateIOTask.cpp ../utils/common/crashtrace.cpp ) @@ -125,12 +127,24 @@ target_link_libraries(smrm storagemanager cloudio ${ENGINE_EXEC_LIBS} ) +add_executable(smps src/smps.cpp) +target_link_libraries(smps storagemanager cloudio + ${ENGINE_LDFLAGS} + ${ENGINE_EXEC_LIBS} +) + + +add_executable(smkill src/smkill.cpp) +target_link_libraries(smkill storagemanager cloudio + ${ENGINE_LDFLAGS} + ${ENGINE_EXEC_LIBS} +) install(TARGETS storagemanager LIBRARY DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine ) -install(TARGETS StorageManager smcat smput smls smrm testS3Connection +install(TARGETS StorageManager smcat smkill smps smput smls smrm testS3Connection RUNTIME DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine ) diff --git a/storage-manager/include/messageFormat.h b/storage-manager/include/messageFormat.h index 719401427..a2760afa2 100644 --- a/storage-manager/include/messageFormat.h +++ b/storage-manager/include/messageFormat.h @@ -84,7 +84,9 @@ enum Opcodes LIST_DIRECTORY, PING, COPY, - SYNC + SYNC, + LIST_IOTASKS, + TERMINATE_IOTASK }; /* @@ -300,6 +302,46 @@ struct copy_cmd // use f_name as an overlay at the end of file1 to get file2. }; +/* + LIST_IOTASKS + ------------ + command format: + 1-byte opcode + + response format: + 4-byte num elements|(8-byte id|double runningTime) * num elements +*/ +struct list_iotask_cmd +{ + uint8_t opcode; +}; + +struct list_iotask_resp_entry +{ + uint64_t id; + double runningTime; +}; + +struct list_iotask_resp +{ + uint32_t elements; + list_iotask_resp_entry entries[]; +}; + +/* + TERMINATE_IOTASK + ---------------- + command format: + 1-byte opcode|8-byte id + + response format: +*/ +struct terminate_iotask_cmd +{ + uint8_t opcode; + uint64_t id; +}; + #pragma pack(pop) } // namespace storagemanager diff --git a/storage-manager/src/CloudStorage.cpp b/storage-manager/src/CloudStorage.cpp index 40eae93ae..c1419198d 100644 --- a/storage-manager/src/CloudStorage.cpp +++ b/storage-manager/src/CloudStorage.cpp @@ -99,4 +99,14 @@ void CloudStorage::printKPIs() const cout << "\texistenceChecks = " << existenceChecks << endl; } +vector CloudStorage::taskList() const +{ + return {}; +} + +bool CloudStorage::killTask(uint64_t) +{ + return false; +} + } // namespace storagemanager diff --git a/storage-manager/src/CloudStorage.h b/storage-manager/src/CloudStorage.h index a9ad0d687..c857f1b67 100644 --- a/storage-manager/src/CloudStorage.h +++ b/storage-manager/src/CloudStorage.h @@ -39,6 +39,14 @@ class CloudStorage virtual void printKPIs() const; + struct IOTaskData + { + uint64_t id; + double runningTime; + }; + virtual std::vector taskList() const; + virtual bool killTask(uint64_t task_id); + // this will return a CloudStorage instance of the type specified in StorageManager.cnf static CloudStorage* get(); diff --git a/storage-manager/src/ListIOTask.cpp b/storage-manager/src/ListIOTask.cpp new file mode 100644 index 000000000..acfadce18 --- /dev/null +++ b/storage-manager/src/ListIOTask.cpp @@ -0,0 +1,80 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "ListIOTask.h" +#include "SMLogging.h" +#include "messageFormat.h" +#include "src/CloudStorage.h" +#include +#include + +namespace storagemanager +{ + +ListIOTask::ListIOTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +bool ListIOTask::run() +{ + bool success; + uint8_t buf[1]; + int err; + + if (getLength() > 1) + { + handleError("ListIOTask read", E2BIG); + return true; + } + + // consume the msg + err = read(buf, getLength()); + if (err < 0) + { + handleError("ListIOTask read", errno); + return false; + } + + auto* cs = CloudStorage::get(); + auto taskList = cs->taskList(); + size_t payloadLen = sizeof(list_iotask_resp) + sizeof(list_iotask_resp_entry) * taskList.size(); + size_t headerLen = sizeof(sm_response); + std::vector payloadBuf(payloadLen + headerLen); + + auto* resp = reinterpret_cast(payloadBuf.data()); + resp->header.type = SM_MSG_START; + resp->header.payloadLen = payloadLen + headerLen - sizeof(sm_msg_header); + resp->header.flags = 0; + resp->returnCode = 0; + auto* r = reinterpret_cast(resp->payload); + r->elements = taskList.size(); + + for (size_t i = 0; i < taskList.size(); ++i) + { + r->entries[i].id = taskList[i].id; + r->entries[i].runningTime = taskList[i].runningTime; + } + success = write(payloadBuf.data(), payloadBuf.size()); + if (!success) + { + handleError("ListIOTask read", errno); + return false; + } + return true; +} + +} // namespace storagemanager diff --git a/storage-manager/src/ListIOTask.h b/storage-manager/src/ListIOTask.h new file mode 100644 index 000000000..ed41a08da --- /dev/null +++ b/storage-manager/src/ListIOTask.h @@ -0,0 +1,34 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "PosixTask.h" + +namespace storagemanager +{ +class ListIOTask : public PosixTask +{ + public: + ListIOTask() = delete; + ListIOTask(int sock, uint length); + ~ListIOTask() override = default; + + bool run() override; +}; + +} // namespace storagemanager diff --git a/storage-manager/src/ProcessTask.cpp b/storage-manager/src/ProcessTask.cpp index a849fd303..5da40a7bf 100644 --- a/storage-manager/src/ProcessTask.cpp +++ b/storage-manager/src/ProcessTask.cpp @@ -25,10 +25,12 @@ #include "AppendTask.h" #include "CopyTask.h" #include "ListDirectoryTask.h" +#include "ListIOTask.h" #include "OpenTask.h" #include "PingTask.h" #include "ReadTask.h" #include "StatTask.h" +#include "TerminateIOTask.h" #include "TruncateTask.h" #include "UnlinkTask.h" #include "WriteTask.h" @@ -93,6 +95,8 @@ void ProcessTask::operator()() case PING: task.reset(new PingTask(sock, length)); break; case SYNC: task.reset(new SyncTask(sock, length)); break; case COPY: task.reset(new CopyTask(sock, length)); break; + case LIST_IOTASKS: task.reset(new ListIOTask(sock, length)); break; + case TERMINATE_IOTASK: task.reset(new TerminateIOTask(sock, length)); break; default: throw runtime_error("ProcessTask: got an unknown opcode"); } task->primeBuffer(); diff --git a/storage-manager/src/S3Storage.cpp b/storage-manager/src/S3Storage.cpp index 9cbc73bb8..7e0b1957e 100644 --- a/storage-manager/src/S3Storage.cpp +++ b/storage-manager/src/S3Storage.cpp @@ -22,10 +22,13 @@ #include #include #include +#include +#include #include #include #include #include +#include #define BOOST_SPIRIT_THREADSAFE #ifndef __clang__ #pragma GCC diagnostic push @@ -99,7 +102,7 @@ const char* s3err_msgs[] = {"All is well", "Authentication failed, token has expired", "Configured bucket does not match endpoint location"}; -S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, ms3_st* m) : s3(s), conn(m) +S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, shared_ptr m) : s3(s), conn(m) { assert(conn); } @@ -131,6 +134,8 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) string ssl_verify = tolower(config->getValue("S3", "ssl_verify")); string port_number = config->getValue("S3", "port_number"); string libs3_debug = config->getValue("S3", "libs3_debug"); + string connect_timeout = config->getValue("S3", "connect_timeout"); + string timeout = config->getValue("S3", "timeout"); bool keyMissing = false; isEC2Instance = false; @@ -210,11 +215,19 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) } endpoint = config->getValue("S3", "endpoint"); + if (!connect_timeout.empty()) + { + connectTimeout = stof(connect_timeout); + } + if (!timeout.empty()) + { + operationTimeout = stof(timeout); + } ms3_library_init(); if (libs3_debug == "enabled") { - ms3_debug(); + ms3_debug(1); } testConnectivityAndPerms(); } @@ -222,7 +235,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) S3Storage::~S3Storage() { for (auto& conn : freeConns) - ms3_deinit(conn.conn); + ms3_deinit(conn->conn); ms3_library_deinit(); } @@ -236,7 +249,7 @@ S3Storage::~S3Storage() bool S3Storage::getIAMRoleFromMetadataEC2() { - CURL* curl = NULL; + CURL* curl = nullptr; CURLcode curl_res; string readBuffer; string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"; @@ -258,7 +271,7 @@ bool S3Storage::getIAMRoleFromMetadataEC2() bool S3Storage::getCredentialsFromMetadataEC2() { - CURL* curl = NULL; + CURL* curl = nullptr; CURLcode curl_res; std::string readBuffer; string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole; @@ -361,29 +374,29 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr* d { uint8_t err; size_t len = 0; - uint8_t* _data = NULL; + uint8_t* _data = nullptr; string sourceKey = prefix + _sourceKey; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len); - if (err && (!skipRetryableErrors && retryable_error(err))) + err = ms3_get(conn->conn, bucket.c_str(), sourceKey.c_str(), &_data, &len); + if (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...", @@ -392,20 +405,20 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr* d { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (err && (!skipRetryableErrors && retryable_error(err))); + } while (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate); if (err) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str()); else logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.", s3err_msgs[err], bucket.c_str(), sourceKey.c_str()); @@ -474,26 +487,26 @@ int S3Storage::putObject(const std::shared_ptr data, size_t len, cons { string destKey = prefix + _destKey; uint8_t s3err; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len); - if (s3err && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_put(conn->conn, bucket.c_str(), destKey.c_str(), data.get(), len); + if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s." @@ -503,20 +516,20 @@ int S3Storage::putObject(const std::shared_ptr data, size_t len, cons { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str()); else logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), destKey.c_str()); @@ -535,8 +548,8 @@ int S3Storage::deleteObject(const string& _key) { uint8_t s3err; string deleteKey = prefix + _key; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log( LOG_ERR, @@ -544,18 +557,18 @@ int S3Storage::deleteObject(const string& _key) errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str()); - if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_delete(conn->conn, bucket.c_str(), deleteKey.c_str()); + if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), deleteKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str()); else logger->log( LOG_WARNING, @@ -565,22 +578,22 @@ int S3Storage::deleteObject(const string& _key) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), deleteKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str()); else logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str()); @@ -593,26 +606,26 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey) { string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey; uint8_t s3err; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str()); - if (s3err && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_copy(conn->conn, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str()); + if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, " "destkey = %s. Retrying...", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, " @@ -622,24 +635,24 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err) { // added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile() - if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND) + if (ms3_server_error(conn->conn) && s3err != MS3_ERR_NOT_FOUND) logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, " "destkey = %s.", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); else if (s3err != MS3_ERR_NOT_FOUND) logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, " @@ -668,7 +681,7 @@ int S3Storage::exists(const string& _key, bool* out) string existsKey = prefix + _key; uint8_t s3err; ms3_status_st status; - ms3_st* creds = getConnection(); + auto creds = getConnection(); if (!creds) { logger->log(LOG_ERR, @@ -680,14 +693,14 @@ int S3Storage::exists(const string& _key, bool* out) do { - s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status); - if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_status(creds->conn, bucket.c_str(), existsKey.c_str(), &status); + if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(creds->conn)) logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), existsKey.c_str()); + ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...", @@ -696,21 +709,21 @@ int S3Storage::exists(const string& _key, bool* out) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(creds->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(creds->conn); } sleep(5); } - } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate); if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND) { - if (ms3_server_error(creds)) + if (ms3_server_error(creds->conn)) logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), existsKey.c_str()); + ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str()); else logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), existsKey.c_str()); @@ -722,7 +735,7 @@ int S3Storage::exists(const string& _key, bool* out) return 0; } -ms3_st* S3Storage::getConnection() +shared_ptr S3Storage::getConnection() { boost::unique_lock s(connMutex); @@ -731,12 +744,12 @@ ms3_st* S3Storage::getConnection() clock_gettime(CLOCK_MONOTONIC_COARSE, &now); while (!freeConns.empty()) { - Connection& back = freeConns.back(); - if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec) + auto& back = freeConns.back(); + if (back->touchedAt.tv_sec + maxIdleSecs <= now.tv_sec) { - ms3_deinit(back.conn); + ms3_deinit(back->conn); // connMutexes.erase(back.conn); - back.conn = NULL; + back->conn = nullptr; freeConns.pop_back(); } else @@ -744,43 +757,54 @@ ms3_st* S3Storage::getConnection() } // get a connection - ms3_st* ret = NULL; uint8_t res = 0; if (freeConns.empty()) { - ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str())); + auto ret = make_shared(nextConnId++); + ret->conn = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? nullptr : endpoint.c_str())); // Something went wrong with libmarias3 init - if (ret == NULL) + if (ret->conn == nullptr) { logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report"); + return nullptr; } // Set option for use http instead of https if (useHTTP) { - ms3_set_option(ret, MS3_OPT_USE_HTTP, NULL); + ms3_set_option(ret->conn, MS3_OPT_USE_HTTP, nullptr); } // Set option to disable SSL Verification if (!sslVerify) { - ms3_set_option(ret, MS3_OPT_DISABLE_SSL_VERIFY, NULL); + ms3_set_option(ret->conn, MS3_OPT_DISABLE_SSL_VERIFY, nullptr); } // Port number is not 0 so it was set by cnf file if (portNumber != 0) { - ms3_set_option(ret, MS3_OPT_PORT_NUMBER, &portNumber); + ms3_set_option(ret->conn, MS3_OPT_PORT_NUMBER, &portNumber); + } + + // timeouts + if (connectTimeout.has_value()) + { + ms3_set_option(ret->conn, MS3_OPT_CONNECT_TIMEOUT, &connectTimeout.value()); + } + if (operationTimeout.has_value()) + { + ms3_set_option(ret->conn, MS3_OPT_TIMEOUT, &operationTimeout.value()); } // IAM role setup for keys if (!IAMrole.empty()) { if (isEC2Instance) { - res = ms3_ec2_set_cred(ret, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + res = ms3_ec2_set_cred(ret->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else { - res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()), - (STSendpoint.empty() ? NULL : STSendpoint.c_str()), - (STSregion.empty() ? NULL : STSregion.c_str())); + res = ms3_init_assume_role(ret->conn, (IAMrole.empty() ? nullptr : IAMrole.c_str()), + (STSendpoint.empty() ? nullptr : STSendpoint.c_str()), + (STSregion.empty() ? nullptr : STSregion.c_str())); } if (res) @@ -791,34 +815,69 @@ ms3_st* S3Storage::getConnection() "aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint " "if configured.", IAMrole.c_str()); - if (ms3_server_error(ret)) + if (ms3_server_error(ret->conn)) logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s", - ms3_server_error(ret), IAMrole.c_str()); - ms3_deinit(ret); - ret = NULL; + ms3_server_error(ret->conn), IAMrole.c_str()); + ms3_deinit(ret->conn); + ret->conn = nullptr; } } // assert(connMutexes[ret].try_lock()); - s.unlock(); + if (ret->conn == nullptr) + { + s.unlock(); + return {}; + } + clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt); + usedConns.try_emplace(ret->id, ret); return ret; } - assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec); - ret = freeConns.front().conn; + assert(freeConns.front()->touchedAt.tv_sec + maxIdleSecs > now.tv_sec); + auto ret = freeConns.front(); + clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt); freeConns.pop_front(); // assert(connMutexes[ret].try_lock()); return ret; } -void S3Storage::returnConnection(ms3_st* ms3) +void S3Storage::returnConnection(std::shared_ptr conn) { - assert(ms3); - Connection conn; - conn.conn = ms3; - clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince); + assert(conn); + assert(conn->conn); + clock_gettime(CLOCK_MONOTONIC_COARSE, &conn->touchedAt); boost::unique_lock s(connMutex); + usedConns.erase(conn->id); + conn->terminate = false; + conn->id = nextConnId++; freeConns.push_front(conn); // connMutexes[ms3].unlock(); } +vector S3Storage::taskList() const +{ + boost::unique_lock s(connMutex); + vector ret; + ret.reserve(usedConns.size()); + timespec now; + clock_gettime(CLOCK_MONOTONIC_COARSE, &now); + for (const auto& [id, conn]: usedConns) { + double elapsed = (double)(now.tv_sec - conn->touchedAt.tv_sec) + 1.e-9 * (now.tv_nsec - conn->touchedAt.tv_nsec);; + ret.emplace_back(id, elapsed); + } + s.unlock(); + return ret; +} + +bool S3Storage::killTask(uint64_t id) +{ + boost::unique_lock s(connMutex); + if (auto it = usedConns.find(id); it != usedConns.end()) + { + it->second->terminate = true; + return true; + } + return false; +} + } // namespace storagemanager diff --git a/storage-manager/src/S3Storage.h b/storage-manager/src/S3Storage.h index 98d4b43f8..96e412edf 100644 --- a/storage-manager/src/S3Storage.h +++ b/storage-manager/src/S3Storage.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "CloudStorage.h" #include "libmarias3/marias3.h" #include "Config.h" @@ -31,24 +32,28 @@ namespace storagemanager class S3Storage : public CloudStorage { public: - S3Storage(bool skipRetry = false); + explicit S3Storage(bool skipRetry = false); - virtual ~S3Storage(); + ~S3Storage() override; - int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL); - int getObject(const std::string& sourceKey, std::shared_ptr* data, size_t* size = NULL); - int putObject(const std::string& sourceFile, const std::string& destKey); - int putObject(const std::shared_ptr data, size_t len, const std::string& destKey); - int deleteObject(const std::string& key); - int copyObject(const std::string& sourceKey, const std::string& destKey); - int exists(const std::string& key, bool* out); + int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL) override; + int getObject(const std::string& sourceKey, std::shared_ptr* data, size_t* size = NULL) override; + int putObject(const std::string& sourceFile, const std::string& destKey) override; + int putObject(const std::shared_ptr data, size_t len, const std::string& destKey) override; + int deleteObject(const std::string& key) override; + int copyObject(const std::string& sourceKey, const std::string& destKey) override; + int exists(const std::string& key, bool* out) override; + + std::vector taskList() const override; + bool killTask(uint64_t task_id) override; private: + struct Connection; bool getIAMRoleFromMetadataEC2(); bool getCredentialsFromMetadataEC2(); void testConnectivityAndPerms(); - ms3_st* getConnection(); - void returnConnection(ms3_st*); + std::shared_ptr getConnection(); + void returnConnection(std::shared_ptr conn); bool skipRetryableErrors; @@ -67,25 +72,32 @@ class S3Storage : public CloudStorage bool useHTTP; bool sslVerify; int portNumber; + std::optional connectTimeout; + std::optional operationTimeout; struct Connection { - ms3_st* conn; - timespec idleSince; + Connection(uint64_t id): id(id) {} + uint64_t id; + ms3_st* conn{nullptr}; + timespec touchedAt{}; + bool terminate{false}; }; struct ScopedConnection { - ScopedConnection(S3Storage*, ms3_st*); + ScopedConnection(S3Storage*, std::shared_ptr); ~ScopedConnection(); S3Storage* s3; - ms3_st* conn; + std::shared_ptr conn; }; // for sanity checking // std::map connMutexes; - boost::mutex connMutex; - std::deque freeConns; // using this as a stack to keep lru objects together + mutable boost::mutex connMutex; + std::deque> freeConns; // using this as a stack to keep lru objects together + std::unordered_map> usedConns; // using this for displaying and killing tasks + uint64_t nextConnId = 0; const time_t maxIdleSecs = 30; }; diff --git a/storage-manager/src/TerminateIOTask.cpp b/storage-manager/src/TerminateIOTask.cpp new file mode 100644 index 000000000..0fe6f7e42 --- /dev/null +++ b/storage-manager/src/TerminateIOTask.cpp @@ -0,0 +1,72 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "TerminateIOTask.h" +#include "messageFormat.h" +#include "src/CloudStorage.h" +#include + +namespace storagemanager +{ + +TerminateIOTask::TerminateIOTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +bool TerminateIOTask::run() +{ + bool success; + uint8_t buf[sizeof(terminate_iotask_cmd)]; + int err; + + if (getLength() != sizeof(terminate_iotask_cmd)) + { + handleError("TerminateIOTask read", EBADMSG); + return true; + } + + err = read(buf, getLength()); + if (err < 0) + { + handleError("TerminateIOTask read", errno); + return false; + } + + auto* cmd = reinterpret_cast(buf); + + auto* cs = CloudStorage::get(); + success = cs->killTask(cmd->id); + + if (success) + { + sm_response resp; + resp.returnCode = 0; + success = write(resp, 0); + if (!success) + { + handleError("TerminateIOTask write", errno); + return false; + } + } + else + { + handleError("TerminateIOTask", ENOENT); + } + return true; +} + +} // namespace storagemanager diff --git a/storage-manager/src/TerminateIOTask.h b/storage-manager/src/TerminateIOTask.h new file mode 100644 index 000000000..105e7adac --- /dev/null +++ b/storage-manager/src/TerminateIOTask.h @@ -0,0 +1,35 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "PosixTask.h" + +namespace storagemanager +{ + +class TerminateIOTask : public PosixTask +{ + public: + TerminateIOTask() = delete; + TerminateIOTask(int sock, uint length); + ~TerminateIOTask() override = default; + + bool run() override; +}; + +} // namespace storagemanager diff --git a/storage-manager/src/smkill.cpp b/storage-manager/src/smkill.cpp new file mode 100644 index 000000000..ededf709b --- /dev/null +++ b/storage-manager/src/smkill.cpp @@ -0,0 +1,107 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "IOCoordinator.h" +#include "messageFormat.h" +#include "SMComm.h" + +using namespace std; +using namespace storagemanager; + +void usage(const char* progname) +{ + cerr << progname << " kills S3 network operations by StorageManager" << endl; + cerr << "Usage: " << progname << " |all" << endl; +} + +bool SMOnline() +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null... + int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr)); + if (err >= 0) + { + ::close(clientSocket); + return true; + } + return false; +} + +int main(int argc, char** argv) +{ + if (argc != 2) + { + usage(argv[0]); + return EXIT_FAILURE; + } + + string arg{argv[1]}; + uint64_t id = 0; + bool all = false; + if (arg == "all") + { + all = true; + } + else + { + id = stoll(arg); + } + + if (!SMOnline()) + { + cerr << "StorageManager is offline" << endl; + return EXIT_FAILURE; + } + + auto* sm = idbdatafile::SMComm::get(); + if (!all) + { + auto ret = sm->killIOTask(id); + if (ret < 0) + { + cerr << strerror(errno) << endl; + } + return ret < 0 ? EXIT_FAILURE : EXIT_SUCCESS; + } + + // iterate over ids + vector entries; + if (sm->listIOTasks(&entries) != 0) + { + cerr << "Couldn't get list of IO tasks" << endl; + return EXIT_FAILURE; + } + + for (const auto& entry: entries) + { + sm->killIOTask(entry.id); + } + + return EXIT_SUCCESS; +} diff --git a/storage-manager/src/smps.cpp b/storage-manager/src/smps.cpp new file mode 100644 index 000000000..8b2fec6c8 --- /dev/null +++ b/storage-manager/src/smps.cpp @@ -0,0 +1,126 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "IOCoordinator.h" +#include "messageFormat.h" +#include "SMComm.h" + +using namespace std; +using namespace storagemanager; + +void usage(const char* progname) +{ + cerr << progname << " shows S3 network operations by StorageManager" << endl; + cerr << "Usage: " << progname << " [options]" << endl; + cerr << "Options:" << endl; + cerr << " -n - sort by id" << endl; + cerr << " -t - sort by running time" << endl; + cerr << " -r - reverse order" << endl; +} + +bool SMOnline() +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null... + int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr)); + if (err >= 0) + { + ::close(clientSocket); + return true; + } + return false; +} + +bool byId(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs) +{ + return lhs.id < rhs.id; +} + +bool byTime(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs) +{ + return lhs.runningTime < rhs.runningTime; +} + +int main(int argc, char** argv) +{ + bool reverseSort = false; + function sortPred; + int c; + while ((c = getopt(argc, argv, "nrth")) != -1) + { + switch (c) + { + case 'n': + sortPred = byId; + break; + case 't': + sortPred = byTime; + break; + case 'r': + reverseSort = true; + break; + default: + usage(argv[0]); + return c == 'h' ? EXIT_SUCCESS : EXIT_FAILURE; + } + } + + if (!SMOnline()) + { + cerr << "StorageManager is offline" << endl; + return EXIT_FAILURE; + } + + auto* sm = idbdatafile::SMComm::get(); + vector entries; + if (sm->listIOTasks(&entries) != 0) + { + cerr << "Couldn't get list of IO tasks" << endl; + return EXIT_FAILURE; + } + + if (sortPred) + { + sort(entries.begin(), entries.end(), sortPred); + } + if (reverseSort) + { + reverse(entries.begin(), entries.end()); + } + cout << setw(24) << left << "Id" << "Time" << endl; + for (const auto& entry: entries) + { + cout << setw(24) << left << entry.id << setw(12) << right << fixed << entry.runningTime << endl; + } + + return EXIT_SUCCESS; +} diff --git a/storage-manager/storagemanager.cnf.in b/storage-manager/storagemanager.cnf.in index 532213022..04347aced 100644 --- a/storage-manager/storagemanager.cnf.in +++ b/storage-manager/storagemanager.cnf.in @@ -141,6 +141,16 @@ bucket = some_bucket # Default is libs3_debug = disabled # libs3_debug = disabled +# Sets the maximum time in seconds for the connection phase to take. This +# timeout only limits the connection phase, it has no impact once the connection +# is established. The default value indicating that the default libcurl +# timeout (300 seconds?) will be used. +# connect_timeout = 5.5 + +# Sets the maximum time in seconds for the entire transfer operation to take. +# Default (no value) - no timeout at all. +# timeout = 7.5 + # The LocalStorage section configures the 'local storage' module # if specified by ObjectStorage/service. [LocalStorage] diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index e2e9b3a93..a8d7d1557 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -16,6 +16,7 @@ MA 02110-1301, USA. */ #include "SMComm.h" +#include "bytestream.h" #include "messageFormat.h" using namespace std; @@ -277,4 +278,45 @@ int SMComm::copyFile(const string& file1, const string& file2) common_exit(command, response, err); } +int SMComm::listIOTasks(vector* entries) +{ + ByteStream* command = buffers.getByteStream(); + ByteStream* response = buffers.getByteStream(); + ssize_t err; + + *command << (uint8_t)storagemanager::LIST_IOTASKS; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + + uint32_t numElements; + entries->clear(); + *response >> numElements; + entries->reserve(numElements); + while (numElements > 0) + { + storagemanager::list_iotask_resp_entry entry; + *response >> entry.id >> entry.runningTime; + entries->push_back(std::move(entry)); + --numElements; + } + + common_exit(command, response, err); +} + +int SMComm::killIOTask(uint64_t id) +{ + ByteStream* command = buffers.getByteStream(); + ByteStream* response = buffers.getByteStream(); + ssize_t err; + + *command << (uint8_t)storagemanager::TERMINATE_IOTASK << id; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + } // namespace idbdatafile diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 71d984b24..0470d01d6 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -23,6 +23,11 @@ #include "bytestream.h" #include "bytestreampool.h" +namespace storagemanager +{ + struct list_iotask_resp_entry; +} + namespace idbdatafile { class SMComm : public boost::noncopyable @@ -61,6 +66,9 @@ class SMComm : public boost::noncopyable int copyFile(const std::string& file1, const std::string& file2); + int listIOTasks(std::vector* entries); + int killIOTask(uint64_t id); + virtual ~SMComm(); private: diff --git a/utils/libmarias3/libmarias3 b/utils/libmarias3/libmarias3 index bce1ac8da..f74150b05 160000 --- a/utils/libmarias3/libmarias3 +++ b/utils/libmarias3/libmarias3 @@ -1 +1 @@ -Subproject commit bce1ac8da0847420ff4cf1489c44c3911d5f1f59 +Subproject commit f74150b05693440d35f93c43e2d2411cc66fee19