You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
feat(SM): MCOL-5785 S3Storage improvements
Update libmarias3 fix build with the recent libmarias3 feat(SM): MCOL-5785 Add timeout options for S3Storage In some unfortunate situations StorageManager may get stuck on network operations. This commit adds the ability to set network timeouts which will help to ensure that the system is more responsive. feat(SM): MCOL-5785 Add smps & smkill tools * `smps` shows all active S3 network operations * `smkill` terminates S3 network operations NB! At the moment smkill is able to terminate operations that are stuck on retries, but not hang inside the libcurl call. In other words if you want to terminate all operations you should configure `connect_timeout` & `timeout` Install smkill & smps Add install for new binaries
This commit is contained in:
committed by
Leonid Fedorov
parent
91dd3abb0a
commit
4aa281645e
@ -22,10 +22,13 @@
|
||||
#include <fcntl.h>
|
||||
#include <sys/types.h>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/thread/lock_types.hpp>
|
||||
#include <ctime>
|
||||
#include <iostream>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/uuid/random_generator.hpp>
|
||||
#include <sstream>
|
||||
#define BOOST_SPIRIT_THREADSAFE
|
||||
#ifndef __clang__
|
||||
#pragma GCC diagnostic push
|
||||
@ -99,7 +102,7 @@ const char* s3err_msgs[] = {"All is well",
|
||||
"Authentication failed, token has expired",
|
||||
"Configured bucket does not match endpoint location"};
|
||||
|
||||
S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, ms3_st* m) : s3(s), conn(m)
|
||||
S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, shared_ptr<S3Storage::Connection> m) : s3(s), conn(m)
|
||||
{
|
||||
assert(conn);
|
||||
}
|
||||
@ -131,6 +134,8 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
string ssl_verify = tolower(config->getValue("S3", "ssl_verify"));
|
||||
string port_number = config->getValue("S3", "port_number");
|
||||
string libs3_debug = config->getValue("S3", "libs3_debug");
|
||||
string connect_timeout = config->getValue("S3", "connect_timeout");
|
||||
string timeout = config->getValue("S3", "timeout");
|
||||
|
||||
bool keyMissing = false;
|
||||
isEC2Instance = false;
|
||||
@ -210,11 +215,19 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
}
|
||||
|
||||
endpoint = config->getValue("S3", "endpoint");
|
||||
if (!connect_timeout.empty())
|
||||
{
|
||||
connectTimeout = stof(connect_timeout);
|
||||
}
|
||||
if (!timeout.empty())
|
||||
{
|
||||
operationTimeout = stof(timeout);
|
||||
}
|
||||
|
||||
ms3_library_init();
|
||||
if (libs3_debug == "enabled")
|
||||
{
|
||||
ms3_debug();
|
||||
ms3_debug(1);
|
||||
}
|
||||
testConnectivityAndPerms();
|
||||
}
|
||||
@ -222,7 +235,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
S3Storage::~S3Storage()
|
||||
{
|
||||
for (auto& conn : freeConns)
|
||||
ms3_deinit(conn.conn);
|
||||
ms3_deinit(conn->conn);
|
||||
ms3_library_deinit();
|
||||
}
|
||||
|
||||
@ -236,7 +249,7 @@ S3Storage::~S3Storage()
|
||||
|
||||
bool S3Storage::getIAMRoleFromMetadataEC2()
|
||||
{
|
||||
CURL* curl = NULL;
|
||||
CURL* curl = nullptr;
|
||||
CURLcode curl_res;
|
||||
string readBuffer;
|
||||
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/";
|
||||
@ -258,7 +271,7 @@ bool S3Storage::getIAMRoleFromMetadataEC2()
|
||||
|
||||
bool S3Storage::getCredentialsFromMetadataEC2()
|
||||
{
|
||||
CURL* curl = NULL;
|
||||
CURL* curl = nullptr;
|
||||
CURLcode curl_res;
|
||||
std::string readBuffer;
|
||||
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole;
|
||||
@ -361,29 +374,29 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr<uint8_t[]>* d
|
||||
{
|
||||
uint8_t err;
|
||||
size_t len = 0;
|
||||
uint8_t* _data = NULL;
|
||||
uint8_t* _data = nullptr;
|
||||
string sourceKey = prefix + _sourceKey;
|
||||
|
||||
ms3_st* creds = getConnection();
|
||||
if (!creds)
|
||||
auto conn = getConnection();
|
||||
if (!conn)
|
||||
{
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init");
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
ScopedConnection sc(this, creds);
|
||||
ScopedConnection sc(this, conn);
|
||||
|
||||
do
|
||||
{
|
||||
err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
||||
if (err && (!skipRetryableErrors && retryable_error(err)))
|
||||
err = ms3_get(conn->conn, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
||||
if (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...",
|
||||
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
|
||||
else
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
@ -392,20 +405,20 @@ int S3Storage::getObject(const string& _sourceKey, std::shared_ptr<uint8_t[]>* d
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else if (!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
ms3_assume_role(conn->conn);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (err && (!skipRetryableErrors && retryable_error(err)));
|
||||
} while (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate);
|
||||
if (err)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
@ -474,26 +487,26 @@ int S3Storage::putObject(const std::shared_ptr<uint8_t[]> data, size_t len, cons
|
||||
{
|
||||
string destKey = prefix + _destKey;
|
||||
uint8_t s3err;
|
||||
ms3_st* creds = getConnection();
|
||||
if (!creds)
|
||||
auto conn = getConnection();
|
||||
if (!conn)
|
||||
{
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init");
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
ScopedConnection sc(this, creds);
|
||||
ScopedConnection sc(this, conn);
|
||||
|
||||
do
|
||||
{
|
||||
s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
s3err = ms3_put(conn->conn, bucket.c_str(), destKey.c_str(), data.get(), len);
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...",
|
||||
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
|
||||
else
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
@ -503,20 +516,20 @@ int S3Storage::putObject(const std::shared_ptr<uint8_t[]> data, size_t len, cons
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else if (!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
ms3_assume_role(conn->conn);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
||||
if (s3err)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
@ -535,8 +548,8 @@ int S3Storage::deleteObject(const string& _key)
|
||||
{
|
||||
uint8_t s3err;
|
||||
string deleteKey = prefix + _key;
|
||||
ms3_st* creds = getConnection();
|
||||
if (!creds)
|
||||
auto conn = getConnection();
|
||||
if (!conn)
|
||||
{
|
||||
logger->log(
|
||||
LOG_ERR,
|
||||
@ -544,18 +557,18 @@ int S3Storage::deleteObject(const string& _key)
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
ScopedConnection sc(this, creds);
|
||||
ScopedConnection sc(this, conn);
|
||||
|
||||
do
|
||||
{
|
||||
s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str());
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
s3err = ms3_delete(conn->conn, bucket.c_str(), deleteKey.c_str());
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...",
|
||||
ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
|
||||
else
|
||||
logger->log(
|
||||
LOG_WARNING,
|
||||
@ -565,22 +578,22 @@ int S3Storage::deleteObject(const string& _key)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else if (!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
ms3_assume_role(conn->conn);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
||||
|
||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
|
||||
@ -593,26 +606,26 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey)
|
||||
{
|
||||
string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey;
|
||||
uint8_t s3err;
|
||||
ms3_st* creds = getConnection();
|
||||
if (!creds)
|
||||
auto conn = getConnection();
|
||||
if (!conn)
|
||||
{
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init");
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
ScopedConnection sc(this, creds);
|
||||
ScopedConnection sc(this, conn);
|
||||
|
||||
do
|
||||
{
|
||||
s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
s3err = ms3_copy(conn->conn, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(conn->conn))
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||
"destkey = %s. Retrying...",
|
||||
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
else
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||
@ -622,24 +635,24 @@ int S3Storage::copyObject(const string& _sourceKey, const string& _destKey)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else if (!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
ms3_assume_role(conn->conn);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
||||
|
||||
if (s3err)
|
||||
{
|
||||
// added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile()
|
||||
if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND)
|
||||
if (ms3_server_error(conn->conn) && s3err != MS3_ERR_NOT_FOUND)
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||
"destkey = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
else if (s3err != MS3_ERR_NOT_FOUND)
|
||||
logger->log(LOG_ERR,
|
||||
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||
@ -668,7 +681,7 @@ int S3Storage::exists(const string& _key, bool* out)
|
||||
string existsKey = prefix + _key;
|
||||
uint8_t s3err;
|
||||
ms3_status_st status;
|
||||
ms3_st* creds = getConnection();
|
||||
auto creds = getConnection();
|
||||
if (!creds)
|
||||
{
|
||||
logger->log(LOG_ERR,
|
||||
@ -680,14 +693,14 @@ int S3Storage::exists(const string& _key, bool* out)
|
||||
|
||||
do
|
||||
{
|
||||
s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status);
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
s3err = ms3_status(creds->conn, bucket.c_str(), existsKey.c_str(), &status);
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(creds->conn))
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...",
|
||||
ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
|
||||
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
|
||||
else
|
||||
logger->log(LOG_WARNING,
|
||||
"S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
@ -696,21 +709,21 @@ int S3Storage::exists(const string& _key, bool* out)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
ms3_ec2_set_cred(creds->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else if (!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
ms3_assume_role(creds->conn);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate);
|
||||
|
||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
if (ms3_server_error(creds->conn))
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
|
||||
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
|
||||
@ -722,7 +735,7 @@ int S3Storage::exists(const string& _key, bool* out)
|
||||
return 0;
|
||||
}
|
||||
|
||||
ms3_st* S3Storage::getConnection()
|
||||
shared_ptr<S3Storage::Connection> S3Storage::getConnection()
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(connMutex);
|
||||
|
||||
@ -731,12 +744,12 @@ ms3_st* S3Storage::getConnection()
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
|
||||
while (!freeConns.empty())
|
||||
{
|
||||
Connection& back = freeConns.back();
|
||||
if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec)
|
||||
auto& back = freeConns.back();
|
||||
if (back->touchedAt.tv_sec + maxIdleSecs <= now.tv_sec)
|
||||
{
|
||||
ms3_deinit(back.conn);
|
||||
ms3_deinit(back->conn);
|
||||
// connMutexes.erase(back.conn);
|
||||
back.conn = NULL;
|
||||
back->conn = nullptr;
|
||||
freeConns.pop_back();
|
||||
}
|
||||
else
|
||||
@ -744,43 +757,54 @@ ms3_st* S3Storage::getConnection()
|
||||
}
|
||||
|
||||
// get a connection
|
||||
ms3_st* ret = NULL;
|
||||
uint8_t res = 0;
|
||||
if (freeConns.empty())
|
||||
{
|
||||
ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
|
||||
auto ret = make_shared<S3Storage::Connection>(nextConnId++);
|
||||
ret->conn = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? nullptr : endpoint.c_str()));
|
||||
// Something went wrong with libmarias3 init
|
||||
if (ret == NULL)
|
||||
if (ret->conn == nullptr)
|
||||
{
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
|
||||
return nullptr;
|
||||
}
|
||||
// Set option for use http instead of https
|
||||
if (useHTTP)
|
||||
{
|
||||
ms3_set_option(ret, MS3_OPT_USE_HTTP, NULL);
|
||||
ms3_set_option(ret->conn, MS3_OPT_USE_HTTP, nullptr);
|
||||
}
|
||||
// Set option to disable SSL Verification
|
||||
if (!sslVerify)
|
||||
{
|
||||
ms3_set_option(ret, MS3_OPT_DISABLE_SSL_VERIFY, NULL);
|
||||
ms3_set_option(ret->conn, MS3_OPT_DISABLE_SSL_VERIFY, nullptr);
|
||||
}
|
||||
// Port number is not 0 so it was set by cnf file
|
||||
if (portNumber != 0)
|
||||
{
|
||||
ms3_set_option(ret, MS3_OPT_PORT_NUMBER, &portNumber);
|
||||
ms3_set_option(ret->conn, MS3_OPT_PORT_NUMBER, &portNumber);
|
||||
}
|
||||
|
||||
// timeouts
|
||||
if (connectTimeout.has_value())
|
||||
{
|
||||
ms3_set_option(ret->conn, MS3_OPT_CONNECT_TIMEOUT, &connectTimeout.value());
|
||||
}
|
||||
if (operationTimeout.has_value())
|
||||
{
|
||||
ms3_set_option(ret->conn, MS3_OPT_TIMEOUT, &operationTimeout.value());
|
||||
}
|
||||
// IAM role setup for keys
|
||||
if (!IAMrole.empty())
|
||||
{
|
||||
if (isEC2Instance)
|
||||
{
|
||||
res = ms3_ec2_set_cred(ret, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
res = ms3_ec2_set_cred(ret->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
|
||||
(STSendpoint.empty() ? NULL : STSendpoint.c_str()),
|
||||
(STSregion.empty() ? NULL : STSregion.c_str()));
|
||||
res = ms3_init_assume_role(ret->conn, (IAMrole.empty() ? nullptr : IAMrole.c_str()),
|
||||
(STSendpoint.empty() ? nullptr : STSendpoint.c_str()),
|
||||
(STSregion.empty() ? nullptr : STSregion.c_str()));
|
||||
}
|
||||
|
||||
if (res)
|
||||
@ -791,34 +815,69 @@ ms3_st* S3Storage::getConnection()
|
||||
"aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint "
|
||||
"if configured.",
|
||||
IAMrole.c_str());
|
||||
if (ms3_server_error(ret))
|
||||
if (ms3_server_error(ret->conn))
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s",
|
||||
ms3_server_error(ret), IAMrole.c_str());
|
||||
ms3_deinit(ret);
|
||||
ret = NULL;
|
||||
ms3_server_error(ret->conn), IAMrole.c_str());
|
||||
ms3_deinit(ret->conn);
|
||||
ret->conn = nullptr;
|
||||
}
|
||||
}
|
||||
// assert(connMutexes[ret].try_lock());
|
||||
s.unlock();
|
||||
if (ret->conn == nullptr)
|
||||
{
|
||||
s.unlock();
|
||||
return {};
|
||||
}
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
|
||||
usedConns.try_emplace(ret->id, ret);
|
||||
return ret;
|
||||
}
|
||||
assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec);
|
||||
ret = freeConns.front().conn;
|
||||
assert(freeConns.front()->touchedAt.tv_sec + maxIdleSecs > now.tv_sec);
|
||||
auto ret = freeConns.front();
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
|
||||
freeConns.pop_front();
|
||||
// assert(connMutexes[ret].try_lock());
|
||||
return ret;
|
||||
}
|
||||
|
||||
void S3Storage::returnConnection(ms3_st* ms3)
|
||||
void S3Storage::returnConnection(std::shared_ptr<S3Storage::Connection> conn)
|
||||
{
|
||||
assert(ms3);
|
||||
Connection conn;
|
||||
conn.conn = ms3;
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince);
|
||||
assert(conn);
|
||||
assert(conn->conn);
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn->touchedAt);
|
||||
|
||||
boost::unique_lock<boost::mutex> s(connMutex);
|
||||
usedConns.erase(conn->id);
|
||||
conn->terminate = false;
|
||||
conn->id = nextConnId++;
|
||||
freeConns.push_front(conn);
|
||||
// connMutexes[ms3].unlock();
|
||||
}
|
||||
|
||||
vector<S3Storage::IOTaskData> S3Storage::taskList() const
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(connMutex);
|
||||
vector<IOTaskData> ret;
|
||||
ret.reserve(usedConns.size());
|
||||
timespec now;
|
||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
|
||||
for (const auto& [id, conn]: usedConns) {
|
||||
double elapsed = (double)(now.tv_sec - conn->touchedAt.tv_sec) + 1.e-9 * (now.tv_nsec - conn->touchedAt.tv_nsec);;
|
||||
ret.emplace_back(id, elapsed);
|
||||
}
|
||||
s.unlock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool S3Storage::killTask(uint64_t id)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(connMutex);
|
||||
if (auto it = usedConns.find(id); it != usedConns.end())
|
||||
{
|
||||
it->second->terminate = true;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace storagemanager
|
||||
|
Reference in New Issue
Block a user