You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-4386: Create new StorageManager setting for EC2 using assigned IAM credentials.
This commit is contained in:
@ -27,12 +27,28 @@
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/uuid/random_generator.hpp>
|
||||
#define BOOST_SPIRIT_THREADSAFE
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include "Utilities.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
string tolower(const string &s)
|
||||
{
|
||||
string ret(s);
|
||||
for (uint i = 0; i < ret.length(); i++)
|
||||
ret[i] = ::tolower(ret[i]);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp)
|
||||
{
|
||||
((std::string*)userp)->append((char*)contents, size * nmemb);
|
||||
return size * nmemb;
|
||||
}
|
||||
|
||||
inline bool retryable_error(uint8_t s3err)
|
||||
{
|
||||
@ -41,6 +57,7 @@ inline bool retryable_error(uint8_t s3err)
|
||||
s3err == MS3_ERR_REQUEST_ERROR ||
|
||||
s3err == MS3_ERR_OOM ||
|
||||
s3err == MS3_ERR_IMPOSSIBLE ||
|
||||
s3err == MS3_ERR_AUTH ||
|
||||
s3err == MS3_ERR_SERVER ||
|
||||
s3err == MS3_ERR_AUTH_ROLE
|
||||
);
|
||||
@ -61,7 +78,8 @@ const int s3err_to_errno[] = {
|
||||
EKEYREJECTED, // 8 MS3_ERR_AUTH
|
||||
ENOENT, // 9 MS3_ERR_NOT_FOUND
|
||||
EPROTO, // 10 MS3_ERR_SERVER
|
||||
EMSGSIZE // 11 MS3_ERR_TOO_BIG
|
||||
EMSGSIZE, // 11 MS3_ERR_TOO_BIG
|
||||
EKEYREJECTED // 12 MS3_ERR_AUTH_ROLE
|
||||
};
|
||||
|
||||
const char *s3err_msgs[] = {
|
||||
@ -76,7 +94,8 @@ const char *s3err_msgs[] = {
|
||||
"Authentication failed",
|
||||
"Object not found",
|
||||
"Unknown error code in response",
|
||||
"Data to PUT is too large; 4GB maximum length"
|
||||
"Data to PUT is too large; 4GB maximum length",
|
||||
"Authentication failed, token has expired"
|
||||
};
|
||||
|
||||
|
||||
@ -97,33 +116,62 @@ S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
|
||||
Init an ms3_st object
|
||||
*/
|
||||
Config *config = Config::get();
|
||||
|
||||
const char *keyerr = "S3 access requires setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars, "
|
||||
" or setting aws_access_key_id and aws_secret_access_key in storagemanager.cnf";
|
||||
" or setting aws_access_key_id and aws_secret_access_key, or configure an IAM role with ec2_iam_mode=enabled."
|
||||
" Check storagemanager.cnf file for more information.";
|
||||
key = config->getValue("S3", "aws_access_key_id");
|
||||
secret = config->getValue("S3", "aws_secret_access_key");
|
||||
IAMrole = config->getValue("S3", "iam_role_name");
|
||||
STSendpoint = config->getValue("S3", "sts_endpoint");
|
||||
STSregion = config->getValue("S3", "sts_region");
|
||||
string ec2_mode = tolower(config->getValue("S3", "ec2_iam_mode"));
|
||||
bool keyMissing = false;
|
||||
isEC2Instance = false;
|
||||
ec2iamEnabled = false;
|
||||
|
||||
if (ec2_mode == "enabled")
|
||||
{
|
||||
ec2iamEnabled = true;
|
||||
}
|
||||
|
||||
if (key.empty())
|
||||
{
|
||||
char *_key_id = getenv("AWS_ACCESS_KEY_ID");
|
||||
if (!_key_id)
|
||||
{
|
||||
logger->log(LOG_ERR, keyerr);
|
||||
throw runtime_error(keyerr);
|
||||
keyMissing = true;
|
||||
}
|
||||
key = _key_id;
|
||||
else
|
||||
key = _key_id;
|
||||
}
|
||||
if (secret.empty())
|
||||
{
|
||||
char *_secret_id = getenv("AWS_SECRET_ACCESS_KEY");
|
||||
if (!_secret_id)
|
||||
{
|
||||
keyMissing = true;
|
||||
}
|
||||
else
|
||||
secret = _secret_id;
|
||||
}
|
||||
|
||||
// Valid to not have keys configured if running on ec2 instance.
|
||||
// Attempt to get those credentials from instance.
|
||||
if (keyMissing)
|
||||
{
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
}
|
||||
if (!IAMrole.empty() && getCredentialsFromMetadataEC2())
|
||||
{
|
||||
isEC2Instance = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger->log(LOG_ERR, keyerr);
|
||||
throw runtime_error(keyerr);
|
||||
}
|
||||
secret = _secret_id;
|
||||
}
|
||||
|
||||
region = config->getValue("S3", "region");
|
||||
@ -157,6 +205,56 @@ S3Storage::~S3Storage()
|
||||
throw runtime_error(msg); \
|
||||
}
|
||||
|
||||
|
||||
bool S3Storage::getIAMRoleFromMetadataEC2()
|
||||
{
|
||||
CURL *curl = NULL;
|
||||
CURLcode curl_res;
|
||||
string readBuffer;
|
||||
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/";
|
||||
curl = curl_easy_init();
|
||||
curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
|
||||
curl_res = curl_easy_perform(curl);
|
||||
if (curl_res != CURLE_OK)
|
||||
{
|
||||
logger->log(LOG_ERR, "CURL fail %u",curl_res);
|
||||
return false;
|
||||
}
|
||||
IAMrole = readBuffer;
|
||||
logger->log(LOG_INFO, "S3Storage: IAM Role Name = %s",IAMrole.c_str());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool S3Storage::getCredentialsFromMetadataEC2()
|
||||
{
|
||||
CURL *curl = NULL;
|
||||
CURLcode curl_res;
|
||||
std::string readBuffer;
|
||||
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole;
|
||||
curl = curl_easy_init();
|
||||
curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
|
||||
curl_res = curl_easy_perform(curl);
|
||||
if (curl_res != CURLE_OK)
|
||||
{
|
||||
logger->log(LOG_ERR, "CURL fail %u",curl_res);
|
||||
return false;
|
||||
}
|
||||
stringstream credentials(readBuffer);
|
||||
boost::property_tree::ptree pt;
|
||||
boost::property_tree::read_json(credentials, pt);
|
||||
key = pt.get<string>("AccessKeyId");
|
||||
secret = pt.get<string>("SecretAccessKey");
|
||||
token = pt.get<string>("Token");
|
||||
logger->log(LOG_INFO, "S3Storage: key = %s secret = %s token = %s",key.c_str(),secret.c_str(),token.c_str());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void S3Storage::testConnectivityAndPerms()
|
||||
{
|
||||
boost::shared_array<uint8_t> testObj(new uint8_t[1]);
|
||||
@ -244,12 +342,18 @@ int S3Storage::getObject(const string &_sourceKey, boost::shared_array<uint8_t>
|
||||
if (err && (!skipRetryableErrors && retryable_error(err)))
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
}
|
||||
@ -344,12 +448,18 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
||||
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
}
|
||||
@ -373,7 +483,7 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
||||
int S3Storage::deleteObject(const string &_key)
|
||||
{
|
||||
uint8_t s3err;
|
||||
string key = prefix + _key;
|
||||
string deleteKey = prefix + _key;
|
||||
ms3_st *creds = getConnection();
|
||||
if (!creds)
|
||||
{
|
||||
@ -384,16 +494,22 @@ int S3Storage::deleteObject(const string &_key)
|
||||
ScopedConnection sc(this, creds);
|
||||
|
||||
do {
|
||||
s3err = ms3_delete(creds, bucket.c_str(), key.c_str());
|
||||
s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str());
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
if(!IAMrole.empty())
|
||||
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
}
|
||||
@ -405,10 +521,10 @@ int S3Storage::deleteObject(const string &_key)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||
ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
@ -433,12 +549,18 @@ int S3Storage::copyObject(const string &_sourceKey, const string &_destKey)
|
||||
if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||
"destkey = %s. Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||
" destkey = %s. Retrying...", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||
if(!IAMrole.empty())
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
}
|
||||
@ -475,7 +597,7 @@ int S3Storage::copyObject(const string &_sourceKey, const string &_destKey)
|
||||
|
||||
int S3Storage::exists(const string &_key, bool *out)
|
||||
{
|
||||
string key = prefix + _key;
|
||||
string existsKey = prefix + _key;
|
||||
uint8_t s3err;
|
||||
ms3_status_st status;
|
||||
ms3_st *creds = getConnection();
|
||||
@ -488,16 +610,22 @@ int S3Storage::exists(const string &_key, bool *out)
|
||||
ScopedConnection sc(this, creds);
|
||||
|
||||
do {
|
||||
s3err = ms3_status(creds, bucket.c_str(), key.c_str(), &status);
|
||||
s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status);
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
if(!IAMrole.empty())
|
||||
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
|
||||
if (ec2iamEnabled)
|
||||
{
|
||||
getIAMRoleFromMetadataEC2();
|
||||
getCredentialsFromMetadataEC2();
|
||||
ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else if(!IAMrole.empty())
|
||||
{
|
||||
ms3_assume_role(creds);
|
||||
}
|
||||
@ -509,10 +637,10 @@ int S3Storage::exists(const string &_key, bool *out)
|
||||
{
|
||||
if (ms3_server_error(creds))
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
|
||||
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||
ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
|
||||
errno = s3err_to_errno[s3err];
|
||||
return -1;
|
||||
}
|
||||
@ -552,9 +680,17 @@ ms3_st * S3Storage::getConnection()
|
||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
|
||||
if(!IAMrole.empty())
|
||||
{
|
||||
res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
|
||||
(STSendpoint.empty() ? NULL : STSendpoint.c_str()),
|
||||
(STSregion.empty() ? NULL : STSregion.c_str()));
|
||||
if (isEC2Instance)
|
||||
{
|
||||
res = ms3_ec2_set_cred(ret,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
|
||||
(STSendpoint.empty() ? NULL : STSendpoint.c_str()),
|
||||
(STSregion.empty() ? NULL : STSregion.c_str()));
|
||||
}
|
||||
|
||||
if (res)
|
||||
{
|
||||
// Something is wrong with the assume role so abort as if the ms3_init failed
|
||||
|
Reference in New Issue
Block a user