1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

feat(SM): MCOL-5785 Add timeout options for S3Storage (#3265)

* Update libmarias3

fix build with the recent libmarias3

* feat(SM): MCOL-5785 Add timeout options for S3Storage

    In some unfortunate situations StorageManager may get stuck on
    network operations. This commit adds the ability to set network
    timeouts which will help to ensure that the system is more
    responsive.

* feat(SM): MCOL-5785 Add smps & smkill tools

    * `smps` shows all active S3 network operations
    * `smkill` terminates S3 network operations

    NB! At the moment smkill is able to terminate operations
    that are stuck on retries, but not hang inside the libcurl
    call. In other words if you want to terminate all operations
    you should configure `connect_timeout` & `timeout`
---------

Co-authored-by: Leonid Fedorov <leonid.fedorov@mariadb.com>
This commit is contained in:
Alexey Antipovsky 2024-08-21 17:38:49 +02:00 committed by GitHub
parent f36ca611eb
commit c22409760f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 776 additions and 111 deletions

View File

@ -46,7 +46,9 @@ usr/bin/reset_locks
usr/bin/rollback
usr/bin/save_brm
usr/bin/smcat
usr/bin/smkill
usr/bin/smls
usr/bin/smps
usr/bin/smput
usr/bin/smrm
usr/bin/testS3Connection

View File

@ -36,6 +36,8 @@ set(storagemanager_SRCS
src/Ownership.cpp
src/PrefixCache.cpp
src/SyncTask.cpp
src/ListIOTask.cpp
src/TerminateIOTask.cpp
../utils/common/crashtrace.cpp
)
@ -141,12 +143,24 @@ target_link_libraries(smrm storagemanager cloudio
${ENGINE_EXEC_LIBS}
)
add_executable(smps src/smps.cpp)
target_link_libraries(smps storagemanager cloudio
${ENGINE_LDFLAGS}
${ENGINE_EXEC_LIBS}
)
add_executable(smkill src/smkill.cpp)
target_link_libraries(smkill storagemanager cloudio
${ENGINE_LDFLAGS}
${ENGINE_EXEC_LIBS}
)
install(TARGETS storagemanager
LIBRARY DESTINATION ${ENGINE_LIBDIR}
COMPONENT columnstore-engine
)
install(TARGETS StorageManager smcat smput smls smrm testS3Connection
install(TARGETS StorageManager smcat smkill smps smput smls smrm testS3Connection
RUNTIME DESTINATION ${ENGINE_BINDIR}
COMPONENT columnstore-engine
)

View File

@ -84,7 +84,9 @@ enum Opcodes
LIST_DIRECTORY,
PING,
COPY,
SYNC
SYNC,
LIST_IOTASKS,
TERMINATE_IOTASK
};
/*
@ -300,6 +302,46 @@ struct copy_cmd
// use f_name as an overlay at the end of file1 to get file2.
};
/*
LIST_IOTASKS
------------
command format:
1-byte opcode
response format:
4-byte num elements|(8-byte id|double runningTime) * num elements
*/
struct list_iotask_cmd
{
uint8_t opcode;
};
struct list_iotask_resp_entry
{
uint64_t id;
double runningTime;
};
struct list_iotask_resp
{
uint32_t elements;
list_iotask_resp_entry entries[];
};
/*
TERMINATE_IOTASK
----------------
command format:
1-byte opcode|8-byte id
response format:
*/
struct terminate_iotask_cmd
{
uint8_t opcode;
uint64_t id;
};
#pragma pack(pop)
} // namespace storagemanager

View File

@ -99,4 +99,14 @@ void CloudStorage::printKPIs() const
cout << "\texistenceChecks = " << existenceChecks << endl;
}
vector<CloudStorage::IOTaskData> CloudStorage::taskList() const
{
return {};
}
bool CloudStorage::killTask(uint64_t)
{
return false;
}
} // namespace storagemanager

View File

@ -39,6 +39,14 @@ class CloudStorage
virtual void printKPIs() const;
struct IOTaskData
{
uint64_t id;
double runningTime;
};
virtual std::vector<IOTaskData> taskList() const;
virtual bool killTask(uint64_t task_id);
// this will return a CloudStorage instance of the type specified in StorageManager.cnf
static CloudStorage* get();

View File

@ -0,0 +1,80 @@
/* Copyright (C) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "ListIOTask.h"
#include "SMLogging.h"
#include "messageFormat.h"
#include "src/CloudStorage.h"
#include <errno.h>
#include <string.h>
namespace storagemanager
{
ListIOTask::ListIOTask(int sock, uint len) : PosixTask(sock, len)
{
}
bool ListIOTask::run()
{
bool success;
uint8_t buf[1];
int err;
if (getLength() > 1)
{
handleError("ListIOTask read", E2BIG);
return true;
}
// consume the msg
err = read(buf, getLength());
if (err < 0)
{
handleError("ListIOTask read", errno);
return false;
}
auto* cs = CloudStorage::get();
auto taskList = cs->taskList();
size_t payloadLen = sizeof(list_iotask_resp) + sizeof(list_iotask_resp_entry) * taskList.size();
size_t headerLen = sizeof(sm_response);
std::vector<uint8_t> payloadBuf(payloadLen + headerLen);
auto* resp = reinterpret_cast<sm_response*>(payloadBuf.data());
resp->header.type = SM_MSG_START;
resp->header.payloadLen = payloadLen + headerLen - sizeof(sm_msg_header);
resp->header.flags = 0;
resp->returnCode = 0;
auto* r = reinterpret_cast<list_iotask_resp*>(resp->payload);
r->elements = taskList.size();
for (size_t i = 0; i < taskList.size(); ++i)
{
r->entries[i].id = taskList[i].id;
r->entries[i].runningTime = taskList[i].runningTime;
}
success = write(payloadBuf.data(), payloadBuf.size());
if (!success)
{
handleError("ListIOTask read", errno);
return false;
}
return true;
}
} // namespace storagemanager

View File

@ -0,0 +1,34 @@
/* Copyright (C) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "PosixTask.h"
namespace storagemanager
{
class ListIOTask : public PosixTask
{
public:
ListIOTask() = delete;
ListIOTask(int sock, uint length);
~ListIOTask() override = default;
bool run() override;
};
} // namespace storagemanager

View File

@ -25,10 +25,12 @@
#include "AppendTask.h"
#include "CopyTask.h"
#include "ListDirectoryTask.h"
#include "ListIOTask.h"
#include "OpenTask.h"
#include "PingTask.h"
#include "ReadTask.h"
#include "StatTask.h"
#include "TerminateIOTask.h"
#include "TruncateTask.h"
#include "UnlinkTask.h"
#include "WriteTask.h"
@ -93,6 +95,8 @@ void ProcessTask::operator()()
case PING: task.reset(new PingTask(sock, length)); break;
case SYNC: task.reset(new SyncTask(sock, length)); break;
case COPY: task.reset(new CopyTask(sock, length)); break;
case LIST_IOTASKS: task.reset(new ListIOTask(sock, length)); break;
case TERMINATE_IOTASK: task.reset(new TerminateIOTask(sock, length)); break;
default: throw runtime_error("ProcessTask: got an unknown opcode");
}
task->primeBuffer();

View File

@ -22,10 +22,13 @@
#include <fcntl.h>
#include <sys/types.h>
#include <boost/filesystem.hpp>
#include <boost/thread/lock_types.hpp>
#include <ctime>
#include <iostream>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/random_generator.hpp>
#include <sstream>
#define BOOST_SPIRIT_THREADSAFE
#ifndef __clang__
#pragma GCC diagnostic push
@ -99,7 +102,7 @@ const char* s3err_msgs[] = {"All is well",
"Authentication failed, token has expired",
"Configured bucket does not match endpoint location"};
S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, ms3_st* m) : s3(s), conn(m)
S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, shared_ptr<S3Storage::Connection> m) : s3(s), conn(m)
{
assert(conn);
}
@ -131,6 +134,8 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
string ssl_verify = tolower(config->getValue("S3", "ssl_verify"));
string port_number = config->getValue("S3", "port_number");
string libs3_debug = config->getValue("S3", "libs3_debug");
string connect_timeout = config->getValue("S3", "connect_timeout");
string timeout = config->getValue("S3", "timeout");
bool keyMissing = false;
isEC2Instance = false;
@ -210,11 +215,19 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
}
endpoint = config->getValue("S3", "endpoint");
if (!connect_timeout.empty())
{
connectTimeout = stof(connect_timeout);
}
if (!timeout.empty())
{
operationTimeout = stof(timeout);
}
ms3_library_init();
if (libs3_debug == "enabled")
{
ms3_debug();
ms3_debug(1);
}
testConnectivityAndPerms();
}
@ -222,7 +235,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
S3Storage::~S3Storage()
{
for (auto& conn : freeConns)
ms3_deinit(conn.conn);
ms3_deinit(conn->conn);
ms3_library_deinit();
}
@ -236,7 +249,7 @@ S3Storage::~S3Storage()
bool S3Storage::getIAMRoleFromMetadataEC2()
{
CURL* curl = NULL;
CURL* curl = nullptr;
CURLcode curl_res;
string readBuffer;
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/";
@ -258,7 +271,7 @@ bool S3Storage::getIAMRoleFromMetadataEC2()
bool S3Storage::getCredentialsFromMetadataEC2()
{
CURL* curl = NULL;
CURL* curl = nullptr;
CURLcode curl_res;
std::string readBuffer;
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole;
@ -361,29 +374,29 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr<uint8_t[]>* d
{
uint8_t err;
size_t len = 0;
uint8_t* _data = NULL;
uint8_t* _data = nullptr;
string sourceKey = prefix + _sourceKey;
ms3_st* creds = getConnection();
if (!creds)
auto conn = getConnection();
if (!conn)
{
logger->log(LOG_ERR,
"S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init");
errno = EINVAL;
return -1;
}
ScopedConnection sc(this, creds);
ScopedConnection sc(this, conn);
do
{
err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
if (err && (!skipRetryableErrors && retryable_error(err)))
err = ms3_get(conn->conn, bucket.c_str(), sourceKey.c_str(), &_data, &len);
if (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_WARNING,
"S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
" Retrying...",
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
else
logger->log(LOG_WARNING,
"S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
@ -392,20 +405,20 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr<uint8_t[]>* d
{
getIAMRoleFromMetadataEC2();
getCredentialsFromMetadataEC2();
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else if (!IAMrole.empty())
{
ms3_assume_role(creds);
ms3_assume_role(conn->conn);
}
sleep(5);
}
} while (err && (!skipRetryableErrors && retryable_error(err)));
} while (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate);
if (err)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
else
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
@ -474,26 +487,26 @@ int S3Storage::putObject(const std::shared_ptr<uint8_t[]> data, size_t len, cons
{
string destKey = prefix + _destKey;
uint8_t s3err;
ms3_st* creds = getConnection();
if (!creds)
auto conn = getConnection();
if (!conn)
{
logger->log(LOG_ERR,
"S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init");
errno = EINVAL;
return -1;
}
ScopedConnection sc(this, creds);
ScopedConnection sc(this, conn);
do
{
s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
s3err = ms3_put(conn->conn, bucket.c_str(), destKey.c_str(), data.get(), len);
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_WARNING,
"S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
" Retrying...",
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
else
logger->log(LOG_WARNING,
"S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
@ -503,20 +516,20 @@ int S3Storage::putObject(const std::shared_ptr<uint8_t[]> data, size_t len, cons
{
getIAMRoleFromMetadataEC2();
getCredentialsFromMetadataEC2();
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else if (!IAMrole.empty())
{
ms3_assume_role(creds);
ms3_assume_role(conn->conn);
}
sleep(5);
}
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
if (s3err)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
else
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
@ -535,8 +548,8 @@ int S3Storage::deleteObject(const string& _key)
{
uint8_t s3err;
string deleteKey = prefix + _key;
ms3_st* creds = getConnection();
if (!creds)
auto conn = getConnection();
if (!conn)
{
logger->log(
LOG_ERR,
@ -544,18 +557,18 @@ int S3Storage::deleteObject(const string& _key)
errno = EINVAL;
return -1;
}
ScopedConnection sc(this, creds);
ScopedConnection sc(this, conn);
do
{
s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str());
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
s3err = ms3_delete(conn->conn, bucket.c_str(), deleteKey.c_str());
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_WARNING,
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
" Retrying...",
ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
else
logger->log(
LOG_WARNING,
@ -565,22 +578,22 @@ int S3Storage::deleteObject(const string& _key)
{
getIAMRoleFromMetadataEC2();
getCredentialsFromMetadataEC2();
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else if (!IAMrole.empty())
{
ms3_assume_role(creds);
ms3_assume_role(conn->conn);
}
sleep(5);
}
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_ERR,
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
else
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
@ -593,26 +606,26 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey)
{
string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey;
uint8_t s3err;
ms3_st* creds = getConnection();
if (!creds)
auto conn = getConnection();
if (!conn)
{
logger->log(LOG_ERR,
"S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init");
errno = EINVAL;
return -1;
}
ScopedConnection sc(this, creds);
ScopedConnection sc(this, conn);
do
{
s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
s3err = ms3_copy(conn->conn, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
{
if (ms3_server_error(creds))
if (ms3_server_error(conn->conn))
logger->log(LOG_WARNING,
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
"destkey = %s. Retrying...",
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
else
logger->log(LOG_WARNING,
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
@ -622,24 +635,24 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey)
{
getIAMRoleFromMetadataEC2();
getCredentialsFromMetadataEC2();
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else if (!IAMrole.empty())
{
ms3_assume_role(creds);
ms3_assume_role(conn->conn);
}
sleep(5);
}
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
if (s3err)
{
// added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile()
if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND)
if (ms3_server_error(conn->conn) && s3err != MS3_ERR_NOT_FOUND)
logger->log(LOG_ERR,
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
"destkey = %s.",
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
else if (s3err != MS3_ERR_NOT_FOUND)
logger->log(LOG_ERR,
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
@ -668,7 +681,7 @@ int S3Storage::exists(const string& _key, bool* out)
string existsKey = prefix + _key;
uint8_t s3err;
ms3_status_st status;
ms3_st* creds = getConnection();
auto creds = getConnection();
if (!creds)
{
logger->log(LOG_ERR,
@ -680,14 +693,14 @@ int S3Storage::exists(const string& _key, bool* out)
do
{
s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status);
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
s3err = ms3_status(creds->conn, bucket.c_str(), existsKey.c_str(), &status);
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate)
{
if (ms3_server_error(creds))
if (ms3_server_error(creds->conn))
logger->log(LOG_WARNING,
"S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
" Retrying...",
ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
else
logger->log(LOG_WARNING,
"S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
@ -696,21 +709,21 @@ int S3Storage::exists(const string& _key, bool* out)
{
getIAMRoleFromMetadataEC2();
getCredentialsFromMetadataEC2();
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
ms3_ec2_set_cred(creds->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else if (!IAMrole.empty())
{
ms3_assume_role(creds);
ms3_assume_role(creds->conn);
}
sleep(5);
}
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate);
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
{
if (ms3_server_error(creds))
if (ms3_server_error(creds->conn))
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
else
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
@ -722,7 +735,7 @@ int S3Storage::exists(const string& _key, bool* out)
return 0;
}
ms3_st* S3Storage::getConnection()
shared_ptr<S3Storage::Connection> S3Storage::getConnection()
{
boost::unique_lock<boost::mutex> s(connMutex);
@ -731,12 +744,12 @@ ms3_st* S3Storage::getConnection()
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
while (!freeConns.empty())
{
Connection& back = freeConns.back();
if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec)
auto& back = freeConns.back();
if (back->touchedAt.tv_sec + maxIdleSecs <= now.tv_sec)
{
ms3_deinit(back.conn);
ms3_deinit(back->conn);
// connMutexes.erase(back.conn);
back.conn = NULL;
back->conn = nullptr;
freeConns.pop_back();
}
else
@ -744,43 +757,54 @@ ms3_st* S3Storage::getConnection()
}
// get a connection
ms3_st* ret = NULL;
uint8_t res = 0;
if (freeConns.empty())
{
ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
auto ret = make_shared<S3Storage::Connection>(nextConnId++);
ret->conn = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? nullptr : endpoint.c_str()));
// Something went wrong with libmarias3 init
if (ret == NULL)
if (ret->conn == nullptr)
{
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
return nullptr;
}
// Set option for use http instead of https
if (useHTTP)
{
ms3_set_option(ret, MS3_OPT_USE_HTTP, NULL);
ms3_set_option(ret->conn, MS3_OPT_USE_HTTP, nullptr);
}
// Set option to disable SSL Verification
if (!sslVerify)
{
ms3_set_option(ret, MS3_OPT_DISABLE_SSL_VERIFY, NULL);
ms3_set_option(ret->conn, MS3_OPT_DISABLE_SSL_VERIFY, nullptr);
}
// Port number is not 0 so it was set by cnf file
if (portNumber != 0)
{
ms3_set_option(ret, MS3_OPT_PORT_NUMBER, &portNumber);
ms3_set_option(ret->conn, MS3_OPT_PORT_NUMBER, &portNumber);
}
// timeouts
if (connectTimeout.has_value())
{
ms3_set_option(ret->conn, MS3_OPT_CONNECT_TIMEOUT, &connectTimeout.value());
}
if (operationTimeout.has_value())
{
ms3_set_option(ret->conn, MS3_OPT_TIMEOUT, &operationTimeout.value());
}
// IAM role setup for keys
if (!IAMrole.empty())
{
if (isEC2Instance)
{
res = ms3_ec2_set_cred(ret, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
res = ms3_ec2_set_cred(ret->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
}
else
{
res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
(STSendpoint.empty() ? NULL : STSendpoint.c_str()),
(STSregion.empty() ? NULL : STSregion.c_str()));
res = ms3_init_assume_role(ret->conn, (IAMrole.empty() ? nullptr : IAMrole.c_str()),
(STSendpoint.empty() ? nullptr : STSendpoint.c_str()),
(STSregion.empty() ? nullptr : STSregion.c_str()));
}
if (res)
@ -791,34 +815,69 @@ ms3_st* S3Storage::getConnection()
"aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint "
"if configured.",
IAMrole.c_str());
if (ms3_server_error(ret))
if (ms3_server_error(ret->conn))
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s",
ms3_server_error(ret), IAMrole.c_str());
ms3_deinit(ret);
ret = NULL;
ms3_server_error(ret->conn), IAMrole.c_str());
ms3_deinit(ret->conn);
ret->conn = nullptr;
}
}
// assert(connMutexes[ret].try_lock());
s.unlock();
if (ret->conn == nullptr)
{
s.unlock();
return {};
}
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
usedConns.try_emplace(ret->id, ret);
return ret;
}
assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec);
ret = freeConns.front().conn;
assert(freeConns.front()->touchedAt.tv_sec + maxIdleSecs > now.tv_sec);
auto ret = freeConns.front();
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
freeConns.pop_front();
// assert(connMutexes[ret].try_lock());
return ret;
}
void S3Storage::returnConnection(ms3_st* ms3)
void S3Storage::returnConnection(std::shared_ptr<S3Storage::Connection> conn)
{
assert(ms3);
Connection conn;
conn.conn = ms3;
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince);
assert(conn);
assert(conn->conn);
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn->touchedAt);
boost::unique_lock<boost::mutex> s(connMutex);
usedConns.erase(conn->id);
conn->terminate = false;
conn->id = nextConnId++;
freeConns.push_front(conn);
// connMutexes[ms3].unlock();
}
vector<S3Storage::IOTaskData> S3Storage::taskList() const
{
boost::unique_lock<boost::mutex> s(connMutex);
vector<IOTaskData> ret;
ret.reserve(usedConns.size());
timespec now;
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
for (const auto& [id, conn]: usedConns) {
double elapsed = (double)(now.tv_sec - conn->touchedAt.tv_sec) + 1.e-9 * (now.tv_nsec - conn->touchedAt.tv_nsec);;
ret.emplace_back(id, elapsed);
}
s.unlock();
return ret;
}
bool S3Storage::killTask(uint64_t id)
{
boost::unique_lock<boost::mutex> s(connMutex);
if (auto it = usedConns.find(id); it != usedConns.end())
{
it->second->terminate = true;
return true;
}
return false;
}
} // namespace storagemanager

View File

@ -21,6 +21,7 @@
#include <string>
#include <map>
#include <memory>
#include <optional>
#include "CloudStorage.h"
#include "libmarias3/marias3.h"
#include "Config.h"
@ -31,24 +32,28 @@ namespace storagemanager
class S3Storage : public CloudStorage
{
public:
S3Storage(bool skipRetry = false);
explicit S3Storage(bool skipRetry = false);
virtual ~S3Storage();
~S3Storage() override;
int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL);
int getObject(const std::string& sourceKey, std::shared_ptr<uint8_t[]>* data, size_t* size = NULL);
int putObject(const std::string& sourceFile, const std::string& destKey);
int putObject(const std::shared_ptr<uint8_t[]> data, size_t len, const std::string& destKey);
int deleteObject(const std::string& key);
int copyObject(const std::string& sourceKey, const std::string& destKey);
int exists(const std::string& key, bool* out);
int getObject(const std::string& sourceKey, const std::string& destFile, size_t* size = NULL) override;
int getObject(const std::string& sourceKey, std::shared_ptr<uint8_t[]>* data, size_t* size = NULL) override;
int putObject(const std::string& sourceFile, const std::string& destKey) override;
int putObject(const std::shared_ptr<uint8_t[]> data, size_t len, const std::string& destKey) override;
int deleteObject(const std::string& key) override;
int copyObject(const std::string& sourceKey, const std::string& destKey) override;
int exists(const std::string& key, bool* out) override;
std::vector<IOTaskData> taskList() const override;
bool killTask(uint64_t task_id) override;
private:
struct Connection;
bool getIAMRoleFromMetadataEC2();
bool getCredentialsFromMetadataEC2();
void testConnectivityAndPerms();
ms3_st* getConnection();
void returnConnection(ms3_st*);
std::shared_ptr<Connection> getConnection();
void returnConnection(std::shared_ptr<Connection> conn);
bool skipRetryableErrors;
@ -67,25 +72,32 @@ class S3Storage : public CloudStorage
bool useHTTP;
bool sslVerify;
int portNumber;
std::optional<float> connectTimeout;
std::optional<float> operationTimeout;
struct Connection
{
ms3_st* conn;
timespec idleSince;
Connection(uint64_t id): id(id) {}
uint64_t id;
ms3_st* conn{nullptr};
timespec touchedAt{};
bool terminate{false};
};
struct ScopedConnection
{
ScopedConnection(S3Storage*, ms3_st*);
ScopedConnection(S3Storage*, std::shared_ptr<Connection>);
~ScopedConnection();
S3Storage* s3;
ms3_st* conn;
std::shared_ptr<Connection> conn;
};
// for sanity checking
// std::map<ms3_st *, boost::mutex> connMutexes;
boost::mutex connMutex;
std::deque<Connection> freeConns; // using this as a stack to keep lru objects together
mutable boost::mutex connMutex;
std::deque<std::shared_ptr<Connection>> freeConns; // using this as a stack to keep lru objects together
std::unordered_map<uint64_t, std::shared_ptr<Connection>> usedConns; // using this for displaying and killing tasks
uint64_t nextConnId = 0;
const time_t maxIdleSecs = 30;
};

View File

@ -0,0 +1,72 @@
/* Copyright (C) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "TerminateIOTask.h"
#include "messageFormat.h"
#include "src/CloudStorage.h"
#include <errno.h>
namespace storagemanager
{
TerminateIOTask::TerminateIOTask(int sock, uint len) : PosixTask(sock, len)
{
}
bool TerminateIOTask::run()
{
bool success;
uint8_t buf[sizeof(terminate_iotask_cmd)];
int err;
if (getLength() != sizeof(terminate_iotask_cmd))
{
handleError("TerminateIOTask read", EBADMSG);
return true;
}
err = read(buf, getLength());
if (err < 0)
{
handleError("TerminateIOTask read", errno);
return false;
}
auto* cmd = reinterpret_cast<terminate_iotask_cmd*>(buf);
auto* cs = CloudStorage::get();
success = cs->killTask(cmd->id);
if (success)
{
sm_response resp;
resp.returnCode = 0;
success = write(resp, 0);
if (!success)
{
handleError("TerminateIOTask write", errno);
return false;
}
}
else
{
handleError("TerminateIOTask", ENOENT);
}
return true;
}
} // namespace storagemanager

View File

@ -0,0 +1,35 @@
/* Copyright (C) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "PosixTask.h"
namespace storagemanager
{
class TerminateIOTask : public PosixTask
{
public:
TerminateIOTask() = delete;
TerminateIOTask(int sock, uint length);
~TerminateIOTask() override = default;
bool run() override;
};
} // namespace storagemanager

View File

@ -0,0 +1,107 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <cstdlib>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <fcntl.h>
#include <time.h>
#include "IOCoordinator.h"
#include "messageFormat.h"
#include "SMComm.h"
using namespace std;
using namespace storagemanager;
void usage(const char* progname)
{
cerr << progname << " kills S3 network operations by StorageManager" << endl;
cerr << "Usage: " << progname << " <Id>|all" << endl;
}
bool SMOnline()
{
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null...
int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0);
int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr));
if (err >= 0)
{
::close(clientSocket);
return true;
}
return false;
}
int main(int argc, char** argv)
{
if (argc != 2)
{
usage(argv[0]);
return EXIT_FAILURE;
}
string arg{argv[1]};
uint64_t id = 0;
bool all = false;
if (arg == "all")
{
all = true;
}
else
{
id = stoll(arg);
}
if (!SMOnline())
{
cerr << "StorageManager is offline" << endl;
return EXIT_FAILURE;
}
auto* sm = idbdatafile::SMComm::get();
if (!all)
{
auto ret = sm->killIOTask(id);
if (ret < 0)
{
cerr << strerror(errno) << endl;
}
return ret < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
}
// iterate over ids
vector<list_iotask_resp_entry> entries;
if (sm->listIOTasks(&entries) != 0)
{
cerr << "Couldn't get list of IO tasks" << endl;
return EXIT_FAILURE;
}
for (const auto& entry: entries)
{
sm->killIOTask(entry.id);
}
return EXIT_SUCCESS;
}

View File

@ -0,0 +1,126 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <fcntl.h>
#include <time.h>
#include <getopt.h>
#include "IOCoordinator.h"
#include "messageFormat.h"
#include "SMComm.h"
using namespace std;
using namespace storagemanager;
void usage(const char* progname)
{
cerr << progname << " shows S3 network operations by StorageManager" << endl;
cerr << "Usage: " << progname << " [options]" << endl;
cerr << "Options:" << endl;
cerr << " -n - sort by id" << endl;
cerr << " -t - sort by running time" << endl;
cerr << " -r - reverse order" << endl;
}
bool SMOnline()
{
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(&addr.sun_path[1], &socket_name[1]); // first char is null...
int clientSocket = ::socket(AF_UNIX, SOCK_STREAM, 0);
int err = ::connect(clientSocket, (const struct sockaddr*)&addr, sizeof(addr));
if (err >= 0)
{
::close(clientSocket);
return true;
}
return false;
}
bool byId(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs)
{
return lhs.id < rhs.id;
}
bool byTime(const list_iotask_resp_entry& lhs, const list_iotask_resp_entry& rhs)
{
return lhs.runningTime < rhs.runningTime;
}
int main(int argc, char** argv)
{
bool reverseSort = false;
function<bool(const list_iotask_resp_entry&, const list_iotask_resp_entry&)> sortPred;
int c;
while ((c = getopt(argc, argv, "nrth")) != -1)
{
switch (c)
{
case 'n':
sortPred = byId;
break;
case 't':
sortPred = byTime;
break;
case 'r':
reverseSort = true;
break;
default:
usage(argv[0]);
return c == 'h' ? EXIT_SUCCESS : EXIT_FAILURE;
}
}
if (!SMOnline())
{
cerr << "StorageManager is offline" << endl;
return EXIT_FAILURE;
}
auto* sm = idbdatafile::SMComm::get();
vector<list_iotask_resp_entry> entries;
if (sm->listIOTasks(&entries) != 0)
{
cerr << "Couldn't get list of IO tasks" << endl;
return EXIT_FAILURE;
}
if (sortPred)
{
sort(entries.begin(), entries.end(), sortPred);
}
if (reverseSort)
{
reverse(entries.begin(), entries.end());
}
cout << setw(24) << left << "Id" << "Time" << endl;
for (const auto& entry: entries)
{
cout << setw(24) << left << entry.id << setw(12) << right << fixed << entry.runningTime << endl;
}
return EXIT_SUCCESS;
}

View File

@ -141,6 +141,16 @@ bucket = some_bucket
# Default is libs3_debug = disabled
# libs3_debug = disabled
# Sets the maximum time in seconds for the connection phase to take. This
# timeout only limits the connection phase, it has no impact once the connection
# is established. The default value indicating that the default libcurl
# timeout (300 seconds?) will be used.
# connect_timeout = 5.5
# Sets the maximum time in seconds for the entire transfer operation to take.
# Default (no value) - no timeout at all.
# timeout = 7.5
# The LocalStorage section configures the 'local storage' module
# if specified by ObjectStorage/service.
[LocalStorage]

View File

@ -16,6 +16,7 @@
MA 02110-1301, USA. */
#include "SMComm.h"
#include "bytestream.h"
#include "messageFormat.h"
using namespace std;
@ -277,4 +278,45 @@ int SMComm::copyFile(const string& file1, const string& file2)
common_exit(command, response, err);
}
int SMComm::listIOTasks(vector<storagemanager::list_iotask_resp_entry>* entries)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::LIST_IOTASKS;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
uint32_t numElements;
entries->clear();
*response >> numElements;
entries->reserve(numElements);
while (numElements > 0)
{
storagemanager::list_iotask_resp_entry entry;
*response >> entry.id >> entry.runningTime;
entries->push_back(std::move(entry));
--numElements;
}
common_exit(command, response, err);
}
int SMComm::killIOTask(uint64_t id)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::TERMINATE_IOTASK << id;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
} // namespace idbdatafile

View File

@ -23,6 +23,11 @@
#include "bytestream.h"
#include "bytestreampool.h"
namespace storagemanager
{
struct list_iotask_resp_entry;
}
namespace idbdatafile
{
class SMComm : public boost::noncopyable
@ -61,6 +66,9 @@ class SMComm : public boost::noncopyable
int copyFile(const std::string& file1, const std::string& file2);
int listIOTasks(std::vector<storagemanager::list_iotask_resp_entry>* entries);
int killIOTask(uint64_t id);
virtual ~SMComm();
private:

@ -1 +1 @@
Subproject commit bce1ac8da0847420ff4cf1489c44c3911d5f1f59
Subproject commit f74150b05693440d35f93c43e2d2411cc66fee19