From a1b4a49d5c83bd533ca8ee1c4952697b0c590dcb Mon Sep 17 00:00:00 2001 From: Alexey Antipovsky Date: Sat, 17 Aug 2024 21:59:29 +0200 Subject: [PATCH] feat(SM): MCOL-5785 S3Storage improvements [develop-23.02] (#3270) Update libmarias3 fix build with the recent libmarias3 feat(SM): MCOL-5785 Add timeout options for S3Storage In some unfortunate situations StorageManager may get stuck on network operations. This commit adds the ability to set network timeouts which will help to ensure that the system is more responsive. feat(SM): MCOL-5785 Add smps & smkill tools * `smps` shows all active S3 network operations * `smkill` terminates S3 network operations NB! At the moment smkill is able to terminate operations that are stuck on retries, but not hang inside the libcurl call. In other words if you want to terminate all operations you should configure `connect_timeout` & `timeout` Install smkill & smps Add install for new binaries --- debian/mariadb-plugin-columnstore.install | 2 + storage-manager/CMakeLists.txt | 16 +- storage-manager/include/messageFormat.h | 44 +++- storage-manager/src/CloudStorage.cpp | 10 + storage-manager/src/CloudStorage.h | 8 + storage-manager/src/ListIOTask.cpp | 80 +++++++ storage-manager/src/ListIOTask.h | 34 +++ storage-manager/src/ProcessTask.cpp | 4 + storage-manager/src/S3Storage.cpp | 241 ++++++++++++++-------- storage-manager/src/S3Storage.h | 46 +++-- storage-manager/src/TerminateIOTask.cpp | 72 +++++++ storage-manager/src/TerminateIOTask.h | 35 ++++ storage-manager/src/smkill.cpp | 107 ++++++++++ storage-manager/src/smps.cpp | 126 +++++++++++ storage-manager/storagemanager.cnf.in | 10 + utils/cloudio/SMComm.cpp | 42 ++++ utils/cloudio/SMComm.h | 8 + utils/libmarias3/libmarias3 | 2 +- 18 files changed, 776 insertions(+), 111 deletions(-) create mode 100644 storage-manager/src/ListIOTask.cpp create mode 100644 storage-manager/src/ListIOTask.h create mode 100644 storage-manager/src/TerminateIOTask.cpp create mode 100644 storage-manager/src/TerminateIOTask.h create mode 100644 storage-manager/src/smkill.cpp create mode 100644 storage-manager/src/smps.cpp diff --git a/debian/mariadb-plugin-columnstore.install b/debian/mariadb-plugin-columnstore.install index 398dccd1e..9350095be 100644 --- a/debian/mariadb-plugin-columnstore.install +++ b/debian/mariadb-plugin-columnstore.install @@ -44,7 +44,9 @@ usr/bin/reset_locks usr/bin/rollback usr/bin/save_brm usr/bin/smcat +usr/bin/smkill usr/bin/smls +usr/bin/smps usr/bin/smput usr/bin/smrm usr/bin/testS3Connection diff --git a/storage-manager/CMakeLists.txt b/storage-manager/CMakeLists.txt index a874a93d4..4297081c5 100755 --- a/storage-manager/CMakeLists.txt +++ b/storage-manager/CMakeLists.txt @@ -36,6 +36,8 @@ set(storagemanager_SRCS src/Ownership.cpp src/PrefixCache.cpp src/SyncTask.cpp + src/ListIOTask.cpp + src/TerminateIOTask.cpp ../utils/common/crashtrace.cpp ) @@ -125,12 +127,24 @@ target_link_libraries(smrm storagemanager cloudio ${ENGINE_EXEC_LIBS} ) +add_executable(smps src/smps.cpp) +target_link_libraries(smps storagemanager cloudio + ${ENGINE_LDFLAGS} + ${ENGINE_EXEC_LIBS} +) + + +add_executable(smkill src/smkill.cpp) +target_link_libraries(smkill storagemanager cloudio + ${ENGINE_LDFLAGS} + ${ENGINE_EXEC_LIBS} +) install(TARGETS storagemanager LIBRARY DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine ) -install(TARGETS StorageManager smcat smput smls smrm testS3Connection +install(TARGETS StorageManager smcat smkill smps smput smls smrm testS3Connection RUNTIME DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine ) diff --git a/storage-manager/include/messageFormat.h b/storage-manager/include/messageFormat.h index 719401427..a2760afa2 100644 --- a/storage-manager/include/messageFormat.h +++ b/storage-manager/include/messageFormat.h @@ -84,7 +84,9 @@ enum Opcodes LIST_DIRECTORY, PING, COPY, - SYNC + SYNC, + LIST_IOTASKS, + TERMINATE_IOTASK }; /* @@ -300,6 +302,46 @@ struct copy_cmd // use f_name as an overlay at the end of file1 to get file2. }; +/* + LIST_IOTASKS + ------------ + command format: + 1-byte opcode + + response format: + 4-byte num elements|(8-byte id|double runningTime) * num elements +*/ +struct list_iotask_cmd +{ + uint8_t opcode; +}; + +struct list_iotask_resp_entry +{ + uint64_t id; + double runningTime; +}; + +struct list_iotask_resp +{ + uint32_t elements; + list_iotask_resp_entry entries[]; +}; + +/* + TERMINATE_IOTASK + ---------------- + command format: + 1-byte opcode|8-byte id + + response format: +*/ +struct terminate_iotask_cmd +{ + uint8_t opcode; + uint64_t id; +}; + #pragma pack(pop) } // namespace storagemanager diff --git a/storage-manager/src/CloudStorage.cpp b/storage-manager/src/CloudStorage.cpp index 40eae93ae..c1419198d 100644 --- a/storage-manager/src/CloudStorage.cpp +++ b/storage-manager/src/CloudStorage.cpp @@ -99,4 +99,14 @@ void CloudStorage::printKPIs() const cout << "\texistenceChecks = " << existenceChecks << endl; } +vector CloudStorage::taskList() const +{ + return {}; +} + +bool CloudStorage::killTask(uint64_t) +{ + return false; +} + } // namespace storagemanager diff --git a/storage-manager/src/CloudStorage.h b/storage-manager/src/CloudStorage.h index a9ad0d687..c857f1b67 100644 --- a/storage-manager/src/CloudStorage.h +++ b/storage-manager/src/CloudStorage.h @@ -39,6 +39,14 @@ class CloudStorage virtual void printKPIs() const; + struct IOTaskData + { + uint64_t id; + double runningTime; + }; + virtual std::vector taskList() const; + virtual bool killTask(uint64_t task_id); + // this will return a CloudStorage instance of the type specified in StorageManager.cnf static CloudStorage* get(); diff --git a/storage-manager/src/ListIOTask.cpp b/storage-manager/src/ListIOTask.cpp new file mode 100644 index 000000000..acfadce18 --- /dev/null +++ b/storage-manager/src/ListIOTask.cpp @@ -0,0 +1,80 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "ListIOTask.h" +#include "SMLogging.h" +#include "messageFormat.h" +#include "src/CloudStorage.h" +#include +#include + +namespace storagemanager +{ + +ListIOTask::ListIOTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +bool ListIOTask::run() +{ + bool success; + uint8_t buf[1]; + int err; + + if (getLength() > 1) + { + handleError("ListIOTask read", E2BIG); + return true; + } + + // consume the msg + err = read(buf, getLength()); + if (err < 0) + { + handleError("ListIOTask read", errno); + return false; + } + + auto* cs = CloudStorage::get(); + auto taskList = cs->taskList(); + size_t payloadLen = sizeof(list_iotask_resp) + sizeof(list_iotask_resp_entry) * taskList.size(); + size_t headerLen = sizeof(sm_response); + std::vector payloadBuf(payloadLen + headerLen); + + auto* resp = reinterpret_cast(payloadBuf.data()); + resp->header.type = SM_MSG_START; + resp->header.payloadLen = payloadLen + headerLen - sizeof(sm_msg_header); + resp->header.flags = 0; + resp->returnCode = 0; + auto* r = reinterpret_cast(resp->payload); + r->elements = taskList.size(); + + for (size_t i = 0; i < taskList.size(); ++i) + { + r->entries[i].id = taskList[i].id; + r->entries[i].runningTime = taskList[i].runningTime; + } + success = write(payloadBuf.data(), payloadBuf.size()); + if (!success) + { + handleError("ListIOTask read", errno); + return false; + } + return true; +} + +} // namespace storagemanager diff --git a/storage-manager/src/ListIOTask.h b/storage-manager/src/ListIOTask.h new file mode 100644 index 000000000..ed41a08da --- /dev/null +++ b/storage-manager/src/ListIOTask.h @@ -0,0 +1,34 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "PosixTask.h" + +namespace storagemanager +{ +class ListIOTask : public PosixTask +{ + public: + ListIOTask() = delete; + ListIOTask(int sock, uint length); + ~ListIOTask() override = default; + + bool run() override; +}; + +} // namespace storagemanager diff --git a/storage-manager/src/ProcessTask.cpp b/storage-manager/src/ProcessTask.cpp index a849fd303..5da40a7bf 100644 --- a/storage-manager/src/ProcessTask.cpp +++ b/storage-manager/src/ProcessTask.cpp @@ -25,10 +25,12 @@ #include "AppendTask.h" #include "CopyTask.h" #include "ListDirectoryTask.h" +#include "ListIOTask.h" #include "OpenTask.h" #include "PingTask.h" #include "ReadTask.h" #include "StatTask.h" +#include "TerminateIOTask.h" #include "TruncateTask.h" #include "UnlinkTask.h" #include "WriteTask.h" @@ -93,6 +95,8 @@ void ProcessTask::operator()() case PING: task.reset(new PingTask(sock, length)); break; case SYNC: task.reset(new SyncTask(sock, length)); break; case COPY: task.reset(new CopyTask(sock, length)); break; + case LIST_IOTASKS: task.reset(new ListIOTask(sock, length)); break; + case TERMINATE_IOTASK: task.reset(new TerminateIOTask(sock, length)); break; default: throw runtime_error("ProcessTask: got an unknown opcode"); } task->primeBuffer(); diff --git a/storage-manager/src/S3Storage.cpp b/storage-manager/src/S3Storage.cpp index 9cbc73bb8..7e0b1957e 100644 --- a/storage-manager/src/S3Storage.cpp +++ b/storage-manager/src/S3Storage.cpp @@ -22,10 +22,13 @@ #include #include #include +#include +#include #include #include #include #include +#include #define BOOST_SPIRIT_THREADSAFE #ifndef __clang__ #pragma GCC diagnostic push @@ -99,7 +102,7 @@ const char* s3err_msgs[] = {"All is well", "Authentication failed, token has expired", "Configured bucket does not match endpoint location"}; -S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, ms3_st* m) : s3(s), conn(m) +S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, shared_ptr m) : s3(s), conn(m) { assert(conn); } @@ -131,6 +134,8 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) string ssl_verify = tolower(config->getValue("S3", "ssl_verify")); string port_number = config->getValue("S3", "port_number"); string libs3_debug = config->getValue("S3", "libs3_debug"); + string connect_timeout = config->getValue("S3", "connect_timeout"); + string timeout = config->getValue("S3", "timeout"); bool keyMissing = false; isEC2Instance = false; @@ -210,11 +215,19 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) } endpoint = config->getValue("S3", "endpoint"); + if (!connect_timeout.empty()) + { + connectTimeout = stof(connect_timeout); + } + if (!timeout.empty()) + { + operationTimeout = stof(timeout); + } ms3_library_init(); if (libs3_debug == "enabled") { - ms3_debug(); + ms3_debug(1); } testConnectivityAndPerms(); } @@ -222,7 +235,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry) S3Storage::~S3Storage() { for (auto& conn : freeConns) - ms3_deinit(conn.conn); + ms3_deinit(conn->conn); ms3_library_deinit(); } @@ -236,7 +249,7 @@ S3Storage::~S3Storage() bool S3Storage::getIAMRoleFromMetadataEC2() { - CURL* curl = NULL; + CURL* curl = nullptr; CURLcode curl_res; string readBuffer; string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"; @@ -258,7 +271,7 @@ bool S3Storage::getIAMRoleFromMetadataEC2() bool S3Storage::getCredentialsFromMetadataEC2() { - CURL* curl = NULL; + CURL* curl = nullptr; CURLcode curl_res; std::string readBuffer; string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole; @@ -361,29 +374,29 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr* d { uint8_t err; size_t len = 0; - uint8_t* _data = NULL; + uint8_t* _data = nullptr; string sourceKey = prefix + _sourceKey; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len); - if (err && (!skipRetryableErrors && retryable_error(err))) + err = ms3_get(conn->conn, bucket.c_str(), sourceKey.c_str(), &_data, &len); + if (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...", @@ -392,20 +405,20 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr* d { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (err && (!skipRetryableErrors && retryable_error(err))); + } while (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate); if (err) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str()); else logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.", s3err_msgs[err], bucket.c_str(), sourceKey.c_str()); @@ -474,26 +487,26 @@ int S3Storage::putObject(const std::shared_ptr data, size_t len, cons { string destKey = prefix + _destKey; uint8_t s3err; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len); - if (s3err && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_put(conn->conn, bucket.c_str(), destKey.c_str(), data.get(), len); + if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s." @@ -503,20 +516,20 @@ int S3Storage::putObject(const std::shared_ptr data, size_t len, cons { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str()); else logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), destKey.c_str()); @@ -535,8 +548,8 @@ int S3Storage::deleteObject(const string& _key) { uint8_t s3err; string deleteKey = prefix + _key; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log( LOG_ERR, @@ -544,18 +557,18 @@ int S3Storage::deleteObject(const string& _key) errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str()); - if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_delete(conn->conn, bucket.c_str(), deleteKey.c_str()); + if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), deleteKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str()); else logger->log( LOG_WARNING, @@ -565,22 +578,22 @@ int S3Storage::deleteObject(const string& _key) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), deleteKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str()); else logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str()); @@ -593,26 +606,26 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey) { string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey; uint8_t s3err; - ms3_st* creds = getConnection(); - if (!creds) + auto conn = getConnection(); + if (!conn) { logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init"); errno = EINVAL; return -1; } - ScopedConnection sc(this, creds); + ScopedConnection sc(this, conn); do { - s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str()); - if (s3err && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_copy(conn->conn, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str()); + if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(conn->conn)) logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, " "destkey = %s. Retrying...", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, " @@ -622,24 +635,24 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(conn->conn); } sleep(5); } - } while (s3err && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate); if (s3err) { // added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile() - if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND) + if (ms3_server_error(conn->conn) && s3err != MS3_ERR_NOT_FOUND) logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, " "destkey = %s.", - ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); + ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str()); else if (s3err != MS3_ERR_NOT_FOUND) logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, " @@ -668,7 +681,7 @@ int S3Storage::exists(const string& _key, bool* out) string existsKey = prefix + _key; uint8_t s3err; ms3_status_st status; - ms3_st* creds = getConnection(); + auto creds = getConnection(); if (!creds) { logger->log(LOG_ERR, @@ -680,14 +693,14 @@ int S3Storage::exists(const string& _key, bool* out) do { - s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status); - if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))) + s3err = ms3_status(creds->conn, bucket.c_str(), existsKey.c_str(), &status); + if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate) { - if (ms3_server_error(creds)) + if (ms3_server_error(creds->conn)) logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s." " Retrying...", - ms3_server_error(creds), bucket.c_str(), existsKey.c_str()); + ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str()); else logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...", @@ -696,21 +709,21 @@ int S3Storage::exists(const string& _key, bool* out) { getIAMRoleFromMetadataEC2(); getCredentialsFromMetadataEC2(); - ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + ms3_ec2_set_cred(creds->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else if (!IAMrole.empty()) { - ms3_assume_role(creds); + ms3_assume_role(creds->conn); } sleep(5); } - } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err))); + } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate); if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND) { - if (ms3_server_error(creds)) + if (ms3_server_error(creds->conn)) logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.", - ms3_server_error(creds), bucket.c_str(), existsKey.c_str()); + ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str()); else logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.", s3err_msgs[s3err], bucket.c_str(), existsKey.c_str()); @@ -722,7 +735,7 @@ int S3Storage::exists(const string& _key, bool* out) return 0; } -ms3_st* S3Storage::getConnection() +shared_ptr S3Storage::getConnection() { boost::unique_lock s(connMutex); @@ -731,12 +744,12 @@ ms3_st* S3Storage::getConnection() clock_gettime(CLOCK_MONOTONIC_COARSE, &now); while (!freeConns.empty()) { - Connection& back = freeConns.back(); - if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec) + auto& back = freeConns.back(); + if (back->touchedAt.tv_sec + maxIdleSecs <= now.tv_sec) { - ms3_deinit(back.conn); + ms3_deinit(back->conn); // connMutexes.erase(back.conn); - back.conn = NULL; + back->conn = nullptr; freeConns.pop_back(); } else @@ -744,43 +757,54 @@ ms3_st* S3Storage::getConnection() } // get a connection - ms3_st* ret = NULL; uint8_t res = 0; if (freeConns.empty()) { - ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str())); + auto ret = make_shared(nextConnId++); + ret->conn = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? nullptr : endpoint.c_str())); // Something went wrong with libmarias3 init - if (ret == NULL) + if (ret->conn == nullptr) { logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report"); + return nullptr; } // Set option for use http instead of https if (useHTTP) { - ms3_set_option(ret, MS3_OPT_USE_HTTP, NULL); + ms3_set_option(ret->conn, MS3_OPT_USE_HTTP, nullptr); } // Set option to disable SSL Verification if (!sslVerify) { - ms3_set_option(ret, MS3_OPT_DISABLE_SSL_VERIFY, NULL); + ms3_set_option(ret->conn, MS3_OPT_DISABLE_SSL_VERIFY, nullptr); } // Port number is not 0 so it was set by cnf file if (portNumber != 0) { - ms3_set_option(ret, MS3_OPT_PORT_NUMBER, &portNumber); + ms3_set_option(ret->conn, MS3_OPT_PORT_NUMBER, &portNumber); + } + + // timeouts + if (connectTimeout.has_value()) + { + ms3_set_option(ret->conn, MS3_OPT_CONNECT_TIMEOUT, &connectTimeout.value()); + } + if (operationTimeout.has_value()) + { + ms3_set_option(ret->conn, MS3_OPT_TIMEOUT, &operationTimeout.value()); } // IAM role setup for keys if (!IAMrole.empty()) { if (isEC2Instance) { - res = ms3_ec2_set_cred(ret, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); + res = ms3_ec2_set_cred(ret->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str()); } else { - res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()), - (STSendpoint.empty() ? NULL : STSendpoint.c_str()), - (STSregion.empty() ? NULL : STSregion.c_str())); + res = ms3_init_assume_role(ret->conn, (IAMrole.empty() ? nullptr : IAMrole.c_str()), + (STSendpoint.empty() ? nullptr : STSendpoint.c_str()), + (STSregion.empty() ? nullptr : STSregion.c_str())); } if (res) @@ -791,34 +815,69 @@ ms3_st* S3Storage::getConnection() "aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint " "if configured.", IAMrole.c_str()); - if (ms3_server_error(ret)) + if (ms3_server_error(ret->conn)) logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s", - ms3_server_error(ret), IAMrole.c_str()); - ms3_deinit(ret); - ret = NULL; + ms3_server_error(ret->conn), IAMrole.c_str()); + ms3_deinit(ret->conn); + ret->conn = nullptr; } } // assert(connMutexes[ret].try_lock()); - s.unlock(); + if (ret->conn == nullptr) + { + s.unlock(); + return {}; + } + clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt); + usedConns.try_emplace(ret->id, ret); return ret; } - assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec); - ret = freeConns.front().conn; + assert(freeConns.front()->touchedAt.tv_sec + maxIdleSecs > now.tv_sec); + auto ret = freeConns.front(); + clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt); freeConns.pop_front(); // assert(connMutexes[ret].try_lock()); return ret; } -void S3Storage::returnConnection(ms3_st* ms3) +void S3Storage::returnConnection(std::shared_ptr conn) { - assert(ms3); - Connection conn; - conn.conn = ms3; - clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince); + assert(conn); + assert(conn->conn); + clock_gettime(CLOCK_MONOTONIC_COARSE, &conn->touchedAt); boost::unique_lock s(connMutex); + usedConns.erase(conn->id); + conn->terminate = false; + conn->id = nextConnId++; freeConns.push_front(conn); // connMutexes[ms3].unlock(); } +vector S3Storage::taskList() const +{ + boost::unique_lock s(connMutex); + vector ret; + ret.reserve(usedConns.size()); + timespec now; + clock_gettime(CLOCK_MONOTONIC_COARSE, &now); + for (const auto& [id, conn]: usedConns) { + double elapsed = (double)(now.tv_sec - conn->touchedAt.tv_sec) + 1.e-9 * (now.tv_nsec - conn->touchedAt.tv_nsec);; + ret.emplace_back(id, elapsed); + } + s.unlock(); + return ret; +} + +bool S3Storage::killTask(uint64_t id) +{ + boost::unique_lock s(connMutex); + if (auto it = usedConns.find(id); it != usedConns.end()) + { + it->second->terminate = true; + return true; + } + return false; +} + } // namespace storagemanager diff --git a/storage-manager/src/S3Storage.h b/storage-manager/src/S3Storage.h index 98d4b43f8..96e412edf 100644 --- a/storage-manager/src/S3Storage.h +++ b/storage-manager/src/S3Storage.h @@ -21,6 +21,7 @@ #include #include #include +#include #include "CloudStorage.h" #include "libmarias3/marias3.h" #include "Config.h" @@ -31,24 +32,28 @@ namespace storagemanager class S3Storage : public CloudStorage { public: - S3Storage(bool skipRetry = false); + explicit S3Storage(bool skipRetry = false); - virtual ~S3Storage(); + ~S3Storage() override; - int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL); - int getObject(const std::string& sourceKey, std::shared_ptr* data, size_t* size = NULL); - int putObject(const std::string& sourceFile, const std::string& destKey); - int putObject(const std::shared_ptr data, size_t len, const std::string& destKey); - int deleteObject(const std::string& key); - int copyObject(const std::string& sourceKey, const std::string& destKey); - int exists(const std::string& key, bool* out); + int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL) override; + int getObject(const std::string& sourceKey, std::shared_ptr* data, size_t* size = NULL) override; + int putObject(const std::string& sourceFile, const std::string& destKey) override; + int putObject(const std::shared_ptr data, size_t len, const std::string& destKey) override; + int deleteObject(const std::string& key) override; + int copyObject(const std::string& sourceKey, const std::string& destKey) override; + int exists(const std::string& key, bool* out) override; + + std::vector taskList() const override; + bool killTask(uint64_t task_id) override; private: + struct Connection; bool getIAMRoleFromMetadataEC2(); bool getCredentialsFromMetadataEC2(); void testConnectivityAndPerms(); - ms3_st* getConnection(); - void returnConnection(ms3_st*); + std::shared_ptr getConnection(); + void returnConnection(std::shared_ptr conn); bool skipRetryableErrors; @@ -67,25 +72,32 @@ class S3Storage : public CloudStorage bool useHTTP; bool sslVerify; int portNumber; + std::optional connectTimeout; + std::optional operationTimeout; struct Connection { - ms3_st* conn; - timespec idleSince; + Connection(uint64_t id): id(id) {} + uint64_t id; + ms3_st* conn{nullptr}; + timespec touchedAt{}; + bool terminate{false}; }; struct ScopedConnection { - ScopedConnection(S3Storage*, ms3_st*); + ScopedConnection(S3Storage*, std::shared_ptr); ~ScopedConnection(); S3Storage* s3; - ms3_st* conn; + std::shared_ptr conn; }; // for sanity checking // std::map connMutexes; - boost::mutex connMutex; - std::deque freeConns; // using this as a stack to keep lru objects together + mutable boost::mutex connMutex; + std::deque> freeConns; // using this as a stack to keep lru objects together + std::unordered_map> usedConns; // using this for displaying and killing tasks + uint64_t nextConnId = 0; const time_t maxIdleSecs = 30; }; diff --git a/storage-manager/src/TerminateIOTask.cpp b/storage-manager/src/TerminateIOTask.cpp new file mode 100644 index 000000000..0fe6f7e42 --- /dev/null +++ b/storage-manager/src/TerminateIOTask.cpp @@ -0,0 +1,72 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "TerminateIOTask.h" +#include "messageFormat.h" +#include "src/CloudStorage.h" +#include + +namespace storagemanager +{ + +TerminateIOTask::TerminateIOTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +bool TerminateIOTask::run() +{ + bool success; + uint8_t buf[sizeof(terminate_iotask_cmd)]; + int err; + + if (getLength() != sizeof(terminate_iotask_cmd)) + { + handleError("TerminateIOTask read", EBADMSG); + return true; + } + + err = read(buf, getLength()); + if (err < 0) + { + handleError("TerminateIOTask read", errno); + return false; + } + + auto* cmd = reinterpret_cast(buf); + + auto* cs = CloudStorage::get(); + success = cs->killTask(cmd->id); + + if (success) + { + sm_response resp; + resp.returnCode = 0; + success = write(resp, 0); + if (!success) + { + handleError("TerminateIOTask write", errno); + return false; + } + } + else + { + handleError("TerminateIOTask", ENOENT); + } + return true; +} + +} // namespace storagemanager diff --git a/storage-manager/src/TerminateIOTask.h b/storage-manager/src/TerminateIOTask.h new file mode 100644 index 000000000..105e7adac --- /dev/null +++ b/storage-manager/src/TerminateIOTask.h @@ -0,0 +1,35 @@ +/* Copyright (C) 2024 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "PosixTask.h" + +namespace storagemanager +{ + +class TerminateIOTask : public PosixTask +{ + public: + TerminateIOTask() = delete; + TerminateIOTask(int sock, uint length); + ~TerminateIOTask() override = default; + + bool run() override; +}; + +} // namespace storagemanager diff --git a/storage-manager/src/smkill.cpp b/storage-manager/src/smkill.cpp new file mode 100644 index 000000000..ededf709b --- /dev/null +++ b/storage-manager/src/smkill.cpp @@ -0,0 +1,107 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "IOCoordinator.h" +#include "messageFormat.h" +#include "SMComm.h" + +using namespace std; +using namespace storagemanager; + +void usage(const char* progname) +{ + cerr << progname << " kills S3 network operations by StorageManager" << endl; + cerr << "Usage: " << progname << " |all" << endl; +} + +bool SMOnline() +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null... + int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr)); + if (err >= 0) + { + ::close(clientSocket); + return true; + } + return false; +} + +int main(int argc, char** argv) +{ + if (argc != 2) + { + usage(argv[0]); + return EXIT_FAILURE; + } + + string arg{argv[1]}; + uint64_t id = 0; + bool all = false; + if (arg == "all") + { + all = true; + } + else + { + id = stoll(arg); + } + + if (!SMOnline()) + { + cerr << "StorageManager is offline" << endl; + return EXIT_FAILURE; + } + + auto* sm = idbdatafile::SMComm::get(); + if (!all) + { + auto ret = sm->killIOTask(id); + if (ret < 0) + { + cerr << strerror(errno) << endl; + } + return ret < 0 ? EXIT_FAILURE : EXIT_SUCCESS; + } + + // iterate over ids + vector entries; + if (sm->listIOTasks(&entries) != 0) + { + cerr << "Couldn't get list of IO tasks" << endl; + return EXIT_FAILURE; + } + + for (const auto& entry: entries) + { + sm->killIOTask(entry.id); + } + + return EXIT_SUCCESS; +} diff --git a/storage-manager/src/smps.cpp b/storage-manager/src/smps.cpp new file mode 100644 index 000000000..8b2fec6c8 --- /dev/null +++ b/storage-manager/src/smps.cpp @@ -0,0 +1,126 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "IOCoordinator.h" +#include "messageFormat.h" +#include "SMComm.h" + +using namespace std; +using namespace storagemanager; + +void usage(const char* progname) +{ + cerr << progname << " shows S3 network operations by StorageManager" << endl; + cerr << "Usage: " << progname << " [options]" << endl; + cerr << "Options:" << endl; + cerr << " -n - sort by id" << endl; + cerr << " -t - sort by running time" << endl; + cerr << " -r - reverse order" << endl; +} + +bool SMOnline() +{ + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null... + int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0); + int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr)); + if (err >= 0) + { + ::close(clientSocket); + return true; + } + return false; +} + +bool byId(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs) +{ + return lhs.id < rhs.id; +} + +bool byTime(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs) +{ + return lhs.runningTime < rhs.runningTime; +} + +int main(int argc, char** argv) +{ + bool reverseSort = false; + function sortPred; + int c; + while ((c = getopt(argc, argv, "nrth")) != -1) + { + switch (c) + { + case 'n': + sortPred = byId; + break; + case 't': + sortPred = byTime; + break; + case 'r': + reverseSort = true; + break; + default: + usage(argv[0]); + return c == 'h' ? EXIT_SUCCESS : EXIT_FAILURE; + } + } + + if (!SMOnline()) + { + cerr << "StorageManager is offline" << endl; + return EXIT_FAILURE; + } + + auto* sm = idbdatafile::SMComm::get(); + vector entries; + if (sm->listIOTasks(&entries) != 0) + { + cerr << "Couldn't get list of IO tasks" << endl; + return EXIT_FAILURE; + } + + if (sortPred) + { + sort(entries.begin(), entries.end(), sortPred); + } + if (reverseSort) + { + reverse(entries.begin(), entries.end()); + } + cout << setw(24) << left << "Id" << "Time" << endl; + for (const auto& entry: entries) + { + cout << setw(24) << left << entry.id << setw(12) << right << fixed << entry.runningTime << endl; + } + + return EXIT_SUCCESS; +} diff --git a/storage-manager/storagemanager.cnf.in b/storage-manager/storagemanager.cnf.in index 532213022..04347aced 100644 --- a/storage-manager/storagemanager.cnf.in +++ b/storage-manager/storagemanager.cnf.in @@ -141,6 +141,16 @@ bucket = some_bucket # Default is libs3_debug = disabled # libs3_debug = disabled +# Sets the maximum time in seconds for the connection phase to take. This +# timeout only limits the connection phase, it has no impact once the connection +# is established. The default value indicating that the default libcurl +# timeout (300 seconds?) will be used. +# connect_timeout = 5.5 + +# Sets the maximum time in seconds for the entire transfer operation to take. +# Default (no value) - no timeout at all. +# timeout = 7.5 + # The LocalStorage section configures the 'local storage' module # if specified by ObjectStorage/service. [LocalStorage] diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index e2e9b3a93..a8d7d1557 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -16,6 +16,7 @@ MA 02110-1301, USA. */ #include "SMComm.h" +#include "bytestream.h" #include "messageFormat.h" using namespace std; @@ -277,4 +278,45 @@ int SMComm::copyFile(const string& file1, const string& file2) common_exit(command, response, err); } +int SMComm::listIOTasks(vector* entries) +{ + ByteStream* command = buffers.getByteStream(); + ByteStream* response = buffers.getByteStream(); + ssize_t err; + + *command << (uint8_t)storagemanager::LIST_IOTASKS; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + + uint32_t numElements; + entries->clear(); + *response >> numElements; + entries->reserve(numElements); + while (numElements > 0) + { + storagemanager::list_iotask_resp_entry entry; + *response >> entry.id >> entry.runningTime; + entries->push_back(std::move(entry)); + --numElements; + } + + common_exit(command, response, err); +} + +int SMComm::killIOTask(uint64_t id) +{ + ByteStream* command = buffers.getByteStream(); + ByteStream* response = buffers.getByteStream(); + ssize_t err; + + *command << (uint8_t)storagemanager::TERMINATE_IOTASK << id; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + } // namespace idbdatafile diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 71d984b24..0470d01d6 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -23,6 +23,11 @@ #include "bytestream.h" #include "bytestreampool.h" +namespace storagemanager +{ + struct list_iotask_resp_entry; +} + namespace idbdatafile { class SMComm : public boost::noncopyable @@ -61,6 +66,9 @@ class SMComm : public boost::noncopyable int copyFile(const std::string& file1, const std::string& file2); + int listIOTasks(std::vector* entries); + int killIOTask(uint64_t id); + virtual ~SMComm(); private: diff --git a/utils/libmarias3/libmarias3 b/utils/libmarias3/libmarias3 index bce1ac8da..f74150b05 160000 --- a/utils/libmarias3/libmarias3 +++ b/utils/libmarias3/libmarias3 @@ -1 +1 @@ -Subproject commit bce1ac8da0847420ff4cf1489c44c3911d5f1f59 +Subproject commit f74150b05693440d35f93c43e2d2411cc66fee19