You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-3976:AWS S3 support for IAM roles. Initial commit.
This commit is contained in:
@ -41,7 +41,8 @@ inline bool retryable_error(uint8_t s3err)
|
||||
s3err == MS3_ERR_REQUEST_ERROR ||
|
||||
s3err == MS3_ERR_OOM ||
|
||||
s3err == MS3_ERR_IMPOSSIBLE ||
|
||||
s3err == MS3_ERR_SERVER
|
||||
s3err == MS3_ERR_SERVER ||
|
||||
s3err == MS3_ERR_AUTH_ROLE
|
||||
);
|
||||
}
|
||||
|
||||
@ -101,6 +102,8 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
" or setting aws_access_key_id and aws_secret_access_key in storagemanager.cnf";
|
||||
key = config->getValue("S3", "aws_access_key_id");
|
||||
secret = config->getValue("S3", "aws_secret_access_key");
|
||||
IAMrole = config->getValue("S3", "iam_role_name");
|
||||
STSendpoint = config->getValue("S3", "sts_endpoint");
|
||||
if (key.empty())
|
||||
{
|
||||
char *_key_id = getenv("AWS_ACCESS_KEY_ID");
|
||||
@ -121,7 +124,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
}
|
||||
secret = _secret_id;
|
||||
}
|
||||
|
||||
|
||||
region = config->getValue("S3", "region");
|
||||
bucket = config->getValue("S3", "bucket");
|
||||
prefix = config->getValue("S3", "prefix");
|
||||
@ -135,7 +138,7 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
endpoint = config->getValue("S3", "endpoint");
|
||||
|
||||
ms3_library_init();
|
||||
//ms3_debug(true);
|
||||
//ms3_debug();
|
||||
testConnectivityAndPerms();
|
||||
}
|
||||
|
||||
@ -239,6 +242,10 @@ int S3Storage::getObject(const string &_sourceKey, boost::shared_array<uint8_t>
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assumeRole(creds);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (err && (!skipRetryableErrors && retryable_error(err)));
|
||||
@ -329,6 +336,10 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assumeRole(creds);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
@ -352,7 +363,7 @@ int S3Storage::deleteObject(const string &_key)
|
||||
string key = prefix + _key;
|
||||
ms3_st *creds = getConnection();
|
||||
ScopedConnection sc(this, creds);
|
||||
|
||||
|
||||
do {
|
||||
s3err = ms3_delete(creds, bucket.c_str(), key.c_str());
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
@ -363,6 +374,10 @@ int S3Storage::deleteObject(const string &_key)
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assumeRole(creds);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
@ -398,6 +413,10 @@ int S3Storage::copyObject(const string &_sourceKey, const string &_destKey)
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||
" destkey = %s. Retrying...", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assumeRole(creds);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
@ -447,6 +466,10 @@ int S3Storage::exists(const string &_key, bool *out)
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assumeRole(creds);
|
||||
}
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
|
||||
@ -479,6 +502,7 @@ ms3_st * S3Storage::getConnection()
|
||||
Connection &back = freeConns.back();
|
||||
if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec)
|
||||
{
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_deinit");
|
||||
ms3_deinit(back.conn);
|
||||
//connMutexes.erase(back.conn);
|
||||
back.conn = NULL;
|
||||
@ -490,11 +514,26 @@ ms3_st * S3Storage::getConnection()
|
||||
|
||||
// get a connection
|
||||
ms3_st *ret = NULL;
|
||||
uint8_t res = 0;
|
||||
if (freeConns.empty())
|
||||
{
|
||||
ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
|
||||
if (ret == NULL)
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
|
||||
(STSendpoint.empty() ? NULL : STSendpoint.c_str()));
|
||||
if (res)
|
||||
{
|
||||
// Something is wrong with the assume role so abort as if the ms3_init failed
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init_assume_role returned error.");
|
||||
if (ms3_server_error(ret))
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_errror: server says '%s'. role name = %s", ms3_server_error(ret), IAMrole.c_str());
|
||||
ms3_deinit(ret);
|
||||
ret = NULL;
|
||||
}
|
||||
}
|
||||
//assert(connMutexes[ret].try_lock());
|
||||
s.unlock();
|
||||
return ret;
|
||||
|
@ -55,6 +55,8 @@ class S3Storage : public CloudStorage
|
||||
std::string key;
|
||||
std::string secret;
|
||||
std::string endpoint;
|
||||
std::string IAMrole;
|
||||
std::string STSendpoint;
|
||||
|
||||
struct Connection
|
||||
{
|
||||
|
Reference in New Issue
Block a user