You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
884 lines
28 KiB
C++
884 lines
28 KiB
C++
/* Copyright (C) 2019 MariaDB Corporation
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
#include "S3Storage.h"
|
|
#include "Config.h"
|
|
#include <stdlib.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <sys/types.h>
|
|
#include <boost/filesystem.hpp>
|
|
#include <boost/thread/lock_types.hpp>
|
|
#include <ctime>
|
|
#include <iostream>
|
|
#include <boost/uuid/uuid.hpp>
|
|
#include <boost/uuid/uuid_io.hpp>
|
|
#include <boost/uuid/random_generator.hpp>
|
|
#include <sstream>
|
|
#define BOOST_SPIRIT_THREADSAFE
|
|
#ifndef __clang__
|
|
#pragma GCC diagnostic push
|
|
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
|
|
#endif
|
|
|
|
#include <boost/property_tree/ptree.hpp>
|
|
|
|
#ifndef __clang__
|
|
#pragma GCC diagnostic pop
|
|
#endif
|
|
#include <boost/property_tree/json_parser.hpp>
|
|
#include "Utilities.h"
|
|
|
|
using namespace std;
|
|
|
|
namespace storagemanager
|
|
{
|
|
string tolower(const string& s)
|
|
{
|
|
string ret(s);
|
|
for (uint i = 0; i < ret.length(); i++)
|
|
ret[i] = ::tolower(ret[i]);
|
|
return ret;
|
|
}
|
|
|
|
static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
|
|
{
|
|
((std::string*)userp)->append((char*)contents, size * nmemb);
|
|
return size * nmemb;
|
|
}
|
|
|
|
inline bool retryable_error(uint8_t s3err)
|
|
{
|
|
return (s3err == MS3_ERR_RESPONSE_PARSE || s3err == MS3_ERR_REQUEST_ERROR || s3err == MS3_ERR_OOM ||
|
|
s3err == MS3_ERR_IMPOSSIBLE || s3err == MS3_ERR_SERVER || s3err == MS3_ERR_AUTH_ROLE);
|
|
}
|
|
|
|
// Best effort to map the errors returned by the ms3 API to linux errnos
|
|
// Can and should be changed if we find better mappings. Some of these are
|
|
// nonsensical or misleading, so we should allow non-errno values to be propagated upward.
|
|
const int s3err_to_errno[] = {
|
|
0,
|
|
EINVAL, // 1 MS3_ERR_PARAMETER. Best effort. D'oh.
|
|
ENODATA, // 2 MS3_ERR_NO_DATA
|
|
ENAMETOOLONG, // 3 MS3_ERR_URI_TOO_LONG
|
|
EBADMSG, // 4 MS3_ERR_RESPONSE_PARSE
|
|
ECOMM, // 5 MS3_ERR_REQUEST_ERROR
|
|
ENOMEM, // 6 MS3_ERR_OOM
|
|
EINVAL, // 7 MS3_ERR_IMPOSSIBLE. Will have to look through the code to find out what this is exactly.
|
|
EKEYREJECTED, // 8 MS3_ERR_AUTH
|
|
ENOENT, // 9 MS3_ERR_NOT_FOUND
|
|
EPROTO, // 10 MS3_ERR_SERVER
|
|
EMSGSIZE, // 11 MS3_ERR_TOO_BIG
|
|
EKEYREJECTED, // 12 MS3_ERR_AUTH_ROLE
|
|
EFAULT // 13 MS3_ERR_ENDPOINT
|
|
};
|
|
|
|
const char* s3err_msgs[] = {"All is well",
|
|
"A required function parameter is missing",
|
|
"No data is supplied to a function that requires data",
|
|
"The generated URI for the request is too long",
|
|
"The API could not parse the response",
|
|
"The API could not send the request",
|
|
"Could not allocate required memory",
|
|
"An impossible condition occurred, how unlucky are you?",
|
|
"Authentication failed",
|
|
"Object not found",
|
|
"Unknown error code in response",
|
|
"Data to PUT is too large; 4GB maximum length",
|
|
"Authentication failed, token has expired",
|
|
"Configured bucket does not match endpoint location"};
|
|
|
|
S3Storage::ScopedConnection::ScopedConnection(S3Storage* s, shared_ptr<S3Storage::Connection> m) : s3(s), conn(m)
|
|
{
|
|
assert(conn);
|
|
}
|
|
|
|
S3Storage::ScopedConnection::~ScopedConnection()
|
|
{
|
|
s3->returnConnection(conn);
|
|
}
|
|
|
|
S3Storage::S3Storage(bool skipRetry, bool verbose) : skipRetryableErrors(skipRetry), verbose_enabled(verbose)
|
|
{
|
|
/* Check creds from envvars
|
|
Get necessary vars from config
|
|
Init an ms3_st object
|
|
*/
|
|
Config* config = Config::get();
|
|
const char* keyerr =
|
|
"S3 access requires setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars, "
|
|
" or setting aws_access_key_id and aws_secret_access_key, or configure an IAM role with "
|
|
"ec2_iam_mode=enabled."
|
|
" Check storagemanager.cnf file for more information.";
|
|
key = config->getValue("S3", "aws_access_key_id");
|
|
secret = config->getValue("S3", "aws_secret_access_key");
|
|
IAMrole = config->getValue("S3", "iam_role_name");
|
|
STSendpoint = config->getValue("S3", "sts_endpoint");
|
|
STSregion = config->getValue("S3", "sts_region");
|
|
string ec2_mode = tolower(config->getValue("S3", "ec2_iam_mode"));
|
|
string use_http = tolower(config->getValue("S3", "use_http"));
|
|
string ssl_verify = tolower(config->getValue("S3", "ssl_verify"));
|
|
string port_number = config->getValue("S3", "port_number");
|
|
string libs3_debug = config->getValue("S3", "libs3_debug");
|
|
string connect_timeout = config->getValue("S3", "connect_timeout");
|
|
string timeout = config->getValue("S3", "timeout");
|
|
|
|
bool keyMissing = false;
|
|
isEC2Instance = false;
|
|
ec2iamEnabled = false;
|
|
useHTTP = false;
|
|
sslVerify = true;
|
|
portNumber = 0;
|
|
|
|
if (!port_number.empty())
|
|
{
|
|
portNumber = stoi(port_number);
|
|
}
|
|
|
|
if (ec2_mode == "enabled")
|
|
{
|
|
ec2iamEnabled = true;
|
|
}
|
|
|
|
if (use_http == "enabled")
|
|
{
|
|
useHTTP = true;
|
|
}
|
|
|
|
if (ssl_verify == "disabled")
|
|
{
|
|
sslVerify = false;
|
|
}
|
|
|
|
if (key.empty())
|
|
{
|
|
char* _key_id = getenv("AWS_ACCESS_KEY_ID");
|
|
if (!_key_id)
|
|
{
|
|
keyMissing = true;
|
|
}
|
|
else
|
|
key = _key_id;
|
|
}
|
|
if (secret.empty())
|
|
{
|
|
char* _secret_id = getenv("AWS_SECRET_ACCESS_KEY");
|
|
if (!_secret_id)
|
|
{
|
|
keyMissing = true;
|
|
}
|
|
else
|
|
secret = _secret_id;
|
|
}
|
|
|
|
// Valid to not have keys configured if running on ec2 instance.
|
|
// Attempt to get those credentials from instance.
|
|
if (keyMissing)
|
|
{
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
}
|
|
if (!IAMrole.empty() && getCredentialsFromMetadataEC2())
|
|
{
|
|
isEC2Instance = true;
|
|
}
|
|
else
|
|
{
|
|
logger->log(LOG_ERR, keyerr);
|
|
throw runtime_error(keyerr);
|
|
}
|
|
}
|
|
|
|
region = config->getValue("S3", "region");
|
|
bucket = config->getValue("S3", "bucket");
|
|
prefix = config->getValue("S3", "prefix");
|
|
if (bucket.empty())
|
|
{
|
|
const char* msg = "S3 access requires setting S3/bucket in storagemanager.cnf";
|
|
logger->log(LOG_ERR, msg);
|
|
throw runtime_error(msg);
|
|
}
|
|
|
|
endpoint = config->getValue("S3", "endpoint");
|
|
if (!connect_timeout.empty())
|
|
{
|
|
connectTimeout = stof(connect_timeout);
|
|
}
|
|
if (!timeout.empty())
|
|
{
|
|
operationTimeout = stof(timeout);
|
|
}
|
|
|
|
ms3_library_init();
|
|
if (verbose_enabled || libs3_debug == "enabled")
|
|
{
|
|
ms3_debug(1);
|
|
}
|
|
testConnectivityAndPerms();
|
|
}
|
|
|
|
S3Storage::~S3Storage()
|
|
{
|
|
for (auto& conn : freeConns)
|
|
ms3_deinit(conn->conn);
|
|
ms3_library_deinit();
|
|
}
|
|
|
|
// convenience macro for testConnectivityAndPerms()
|
|
#define FAIL(OP) \
|
|
{ \
|
|
const char* msg = "S3Storage: failed to " #OP ", check log files for specific error"; \
|
|
logger->log(LOG_ERR, msg); \
|
|
throw runtime_error(msg); \
|
|
}
|
|
|
|
bool S3Storage::getIAMRoleFromMetadataEC2()
|
|
{
|
|
CURL* curl = nullptr;
|
|
CURLcode curl_res;
|
|
string readBuffer;
|
|
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/";
|
|
curl = curl_easy_init();
|
|
curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
|
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
|
|
curl_res = curl_easy_perform(curl);
|
|
if (curl_res != CURLE_OK)
|
|
{
|
|
logger->log(LOG_ERR, "CURL fail %u", curl_res);
|
|
return false;
|
|
}
|
|
IAMrole = readBuffer;
|
|
// logger->log(LOG_INFO, "S3Storage: IAM Role Name = %s",IAMrole.c_str());
|
|
|
|
return true;
|
|
}
|
|
|
|
bool S3Storage::getCredentialsFromMetadataEC2()
|
|
{
|
|
CURL* curl = nullptr;
|
|
CURLcode curl_res;
|
|
std::string readBuffer;
|
|
string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole;
|
|
curl = curl_easy_init();
|
|
curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
|
|
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
|
|
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
|
|
curl_res = curl_easy_perform(curl);
|
|
if (curl_res != CURLE_OK)
|
|
{
|
|
logger->log(LOG_ERR, "CURL fail %u", curl_res);
|
|
return false;
|
|
}
|
|
stringstream credentials(readBuffer);
|
|
boost::property_tree::ptree pt;
|
|
boost::property_tree::read_json(credentials, pt);
|
|
key = pt.get<string>("AccessKeyId");
|
|
secret = pt.get<string>("SecretAccessKey");
|
|
token = pt.get<string>("Token");
|
|
// logger->log(LOG_INFO, "S3Storage: key = %s secret = %s token =
|
|
// %s",key.c_str(),secret.c_str(),token.c_str());
|
|
|
|
return true;
|
|
}
|
|
|
|
void S3Storage::testConnectivityAndPerms()
|
|
{
|
|
std::shared_ptr<uint8_t[]> testObj(new uint8_t[1]);
|
|
testObj[0] = 0;
|
|
boost::uuids::uuid u = boost::uuids::random_generator()();
|
|
ostringstream oss;
|
|
oss << u << "connectivity_test";
|
|
|
|
string testObjKey = oss.str();
|
|
|
|
int err = putObject(testObj, 1, testObjKey);
|
|
if (err)
|
|
FAIL(PUT)
|
|
bool _exists;
|
|
err = exists(testObjKey, &_exists);
|
|
if (err)
|
|
FAIL(HEAD)
|
|
size_t len;
|
|
err = getObject(testObjKey, &testObj, &len);
|
|
if (err)
|
|
FAIL(GET)
|
|
err = deleteObject(testObjKey);
|
|
if (err)
|
|
FAIL(DELETE)
|
|
err = exists(testObjKey, &_exists);
|
|
if (err)
|
|
{
|
|
logger->log(LOG_CRIT,
|
|
"S3Storage::exists() failed on nonexistent object. Check 'ListBucket' permissions.");
|
|
FAIL(HEAD)
|
|
}
|
|
logger->log(LOG_INFO, "S3Storage: S3 connectivity & permissions are OK");
|
|
}
|
|
|
|
int S3Storage::getObject(const string& sourceKey, const string& destFile, size_t* size)
|
|
{
|
|
int fd, err;
|
|
std::shared_ptr<uint8_t[]> data;
|
|
size_t len, count = 0;
|
|
char buf[80];
|
|
|
|
err = getObject(sourceKey, &data, &len);
|
|
if (err)
|
|
return err;
|
|
|
|
fd = ::open(destFile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
|
if (fd < 0)
|
|
{
|
|
int l_errno = errno;
|
|
logger->log(LOG_ERR, "S3Storage::getObject(): Failed to open %s, got %s", destFile.c_str(),
|
|
strerror_r(l_errno, buf, 80));
|
|
errno = l_errno;
|
|
return err;
|
|
}
|
|
ScopedCloser s(fd);
|
|
while (count < len)
|
|
{
|
|
err = ::write(fd, &data[count], len - count);
|
|
if (err < 0)
|
|
{
|
|
int l_errno = errno;
|
|
logger->log(LOG_ERR, "S3Storage::getObject(): Failed to write to %s, got %s", destFile.c_str(),
|
|
strerror_r(errno, buf, 80));
|
|
errno = l_errno;
|
|
return -1;
|
|
}
|
|
count += err;
|
|
}
|
|
if (size)
|
|
*size = len;
|
|
return 0;
|
|
}
|
|
|
|
int S3Storage::getObject(const string& _sourceKey, std::shared_ptr<uint8_t[]>* data, size_t* size)
|
|
{
|
|
uint8_t err;
|
|
size_t len = 0;
|
|
uint8_t* _data = nullptr;
|
|
string sourceKey = prefix + _sourceKey;
|
|
|
|
auto conn = getConnection();
|
|
if (!conn)
|
|
{
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init");
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
ScopedConnection sc(this, conn);
|
|
|
|
do
|
|
{
|
|
err = ms3_get(conn->conn, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
|
if (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
|
" Retrying...",
|
|
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
|
|
else
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
|
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
getCredentialsFromMetadataEC2();
|
|
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else if (!IAMrole.empty())
|
|
{
|
|
ms3_assume_role(conn->conn);
|
|
}
|
|
sleep(5);
|
|
}
|
|
} while (err && (!skipRetryableErrors && retryable_error(err)) && !conn->terminate);
|
|
if (err)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
|
|
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str());
|
|
else
|
|
logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
|
|
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
|
data->reset();
|
|
errno = s3err_to_errno[err];
|
|
return -1;
|
|
}
|
|
|
|
data->reset(_data, free);
|
|
if (size)
|
|
*size = len;
|
|
return 0;
|
|
}
|
|
|
|
int S3Storage::putObject(const string& sourceFile, const string& destKey)
|
|
{
|
|
std::shared_ptr<uint8_t[]> data;
|
|
int err, fd;
|
|
size_t len, count = 0;
|
|
char buf[80];
|
|
boost::system::error_code boost_err;
|
|
|
|
len = boost::filesystem::file_size(sourceFile, boost_err);
|
|
if (boost_err)
|
|
{
|
|
errno = boost_err.value();
|
|
return -1;
|
|
}
|
|
data.reset(new uint8_t[len]);
|
|
fd = ::open(sourceFile.c_str(), O_RDONLY);
|
|
if (fd < 0)
|
|
{
|
|
int l_errno = errno;
|
|
logger->log(LOG_ERR, "S3Storage::putObject(): Failed to open %s, got %s", sourceFile.c_str(),
|
|
strerror_r(l_errno, buf, 80));
|
|
errno = l_errno;
|
|
return -1;
|
|
}
|
|
ScopedCloser s(fd);
|
|
while (count < len)
|
|
{
|
|
err = ::read(fd, &data[count], len - count);
|
|
if (err < 0)
|
|
{
|
|
int l_errno = errno;
|
|
logger->log(LOG_ERR, "S3Storage::putObject(): Failed to read %s @ position %ld, got %s",
|
|
sourceFile.c_str(), count, strerror_r(l_errno, buf, 80));
|
|
errno = l_errno;
|
|
return -1;
|
|
}
|
|
else if (err == 0)
|
|
{
|
|
// this shouldn't happen, we just checked the size
|
|
logger->log(LOG_ERR, "S3Storage::putObject(): Got early EOF reading %s @ position %ld",
|
|
sourceFile.c_str(), count);
|
|
errno = ENODATA; // is there a better errno for early eof?
|
|
return -1;
|
|
}
|
|
count += err;
|
|
}
|
|
|
|
return putObject(data, len, destKey);
|
|
}
|
|
|
|
int S3Storage::putObject(const std::shared_ptr<uint8_t[]> data, size_t len, const string& _destKey)
|
|
{
|
|
string destKey = prefix + _destKey;
|
|
uint8_t s3err;
|
|
auto conn = getConnection();
|
|
if (!conn)
|
|
{
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init");
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
ScopedConnection sc(this, conn);
|
|
|
|
do
|
|
{
|
|
s3err = ms3_put(conn->conn, bucket.c_str(), destKey.c_str(), data.get(), len);
|
|
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
|
" Retrying...",
|
|
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
|
|
else
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
|
" Retrying...",
|
|
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
getCredentialsFromMetadataEC2();
|
|
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else if (!IAMrole.empty())
|
|
{
|
|
ms3_assume_role(conn->conn);
|
|
}
|
|
sleep(5);
|
|
}
|
|
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
|
if (s3err)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
|
|
ms3_server_error(conn->conn), bucket.c_str(), destKey.c_str());
|
|
else
|
|
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
|
|
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
|
errno = s3err_to_errno[s3err];
|
|
if (s3err == MS3_ERR_ENDPOINT)
|
|
logger->log(
|
|
LOG_ERR,
|
|
"S3Storage::putObject(): Bucket location not match provided endpoint:, bucket = %s, endpoint = %s.",
|
|
bucket.c_str(), endpoint.c_str());
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int S3Storage::deleteObject(const string& _key)
|
|
{
|
|
uint8_t s3err;
|
|
string deleteKey = prefix + _key;
|
|
auto conn = getConnection();
|
|
if (!conn)
|
|
{
|
|
logger->log(
|
|
LOG_ERR,
|
|
"S3Storage::deleteObject(): failed to DELETE, S3Storage::getConnection() returned NULL on init");
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
ScopedConnection sc(this, conn);
|
|
|
|
do
|
|
{
|
|
s3err = ms3_delete(conn->conn, bucket.c_str(), deleteKey.c_str());
|
|
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
|
" Retrying...",
|
|
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
|
|
else
|
|
logger->log(
|
|
LOG_WARNING,
|
|
"S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
|
|
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
getCredentialsFromMetadataEC2();
|
|
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else if (!IAMrole.empty())
|
|
{
|
|
ms3_assume_role(conn->conn);
|
|
}
|
|
sleep(5);
|
|
}
|
|
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
|
|
|
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
|
|
ms3_server_error(conn->conn), bucket.c_str(), deleteKey.c_str());
|
|
else
|
|
logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
|
|
s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int S3Storage::copyObject(const string& _sourceKey, const string& _destKey)
|
|
{
|
|
string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey;
|
|
uint8_t s3err;
|
|
auto conn = getConnection();
|
|
if (!conn)
|
|
{
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init");
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
ScopedConnection sc(this, conn);
|
|
|
|
do
|
|
{
|
|
s3err = ms3_copy(conn->conn, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
|
|
if (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate)
|
|
{
|
|
if (ms3_server_error(conn->conn))
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
|
"destkey = %s. Retrying...",
|
|
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
|
else
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
|
" destkey = %s. Retrying...",
|
|
s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
getCredentialsFromMetadataEC2();
|
|
ms3_ec2_set_cred(conn->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else if (!IAMrole.empty())
|
|
{
|
|
ms3_assume_role(conn->conn);
|
|
}
|
|
sleep(5);
|
|
}
|
|
} while (s3err && (!skipRetryableErrors && retryable_error(s3err)) && !conn->terminate);
|
|
|
|
if (s3err)
|
|
{
|
|
// added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile()
|
|
if (ms3_server_error(conn->conn) && s3err != MS3_ERR_NOT_FOUND)
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
|
"destkey = %s.",
|
|
ms3_server_error(conn->conn), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
|
else if (s3err != MS3_ERR_NOT_FOUND)
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
|
"destkey = %s.",
|
|
s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
|
errno = s3err_to_errno[s3err];
|
|
return -1;
|
|
}
|
|
return 0;
|
|
|
|
#if 0
|
|
// no s3-s3 copy yet. get & put for now.
|
|
|
|
int err;
|
|
std::shared_ptr<uint8_t[]> data;
|
|
size_t len;
|
|
err = getObject(sourceKey, &data, &len);
|
|
if (err)
|
|
return err;
|
|
return putObject(data, len, destKey);
|
|
#endif
|
|
}
|
|
|
|
int S3Storage::exists(const string& _key, bool* out)
|
|
{
|
|
string existsKey = prefix + _key;
|
|
uint8_t s3err;
|
|
ms3_status_st status;
|
|
auto creds = getConnection();
|
|
if (!creds)
|
|
{
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::exists(): failed to HEAD, S3Storage::getConnection() returned NULL on init");
|
|
errno = EINVAL;
|
|
return -1;
|
|
}
|
|
ScopedConnection sc(this, creds);
|
|
|
|
do
|
|
{
|
|
s3err = ms3_status(creds->conn, bucket.c_str(), existsKey.c_str(), &status);
|
|
if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate)
|
|
{
|
|
if (ms3_server_error(creds->conn))
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
|
" Retrying...",
|
|
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
|
|
else
|
|
logger->log(LOG_WARNING,
|
|
"S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
|
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
|
|
if (ec2iamEnabled)
|
|
{
|
|
getIAMRoleFromMetadataEC2();
|
|
getCredentialsFromMetadataEC2();
|
|
ms3_ec2_set_cred(creds->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else if (!IAMrole.empty())
|
|
{
|
|
ms3_assume_role(creds->conn);
|
|
}
|
|
sleep(5);
|
|
}
|
|
} while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)) && !creds->terminate);
|
|
|
|
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
|
{
|
|
if (ms3_server_error(creds->conn))
|
|
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
|
|
ms3_server_error(creds->conn), bucket.c_str(), existsKey.c_str());
|
|
else
|
|
logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
|
|
s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
|
|
errno = s3err_to_errno[s3err];
|
|
return -1;
|
|
}
|
|
|
|
*out = (s3err == 0);
|
|
return 0;
|
|
}
|
|
|
|
shared_ptr<S3Storage::Connection> S3Storage::getConnection()
|
|
{
|
|
boost::unique_lock<boost::mutex> s(connMutex);
|
|
|
|
// prune the list. Most-idle connections are at the back.
|
|
timespec now;
|
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
|
|
while (!freeConns.empty())
|
|
{
|
|
auto& back = freeConns.back();
|
|
if (back->touchedAt.tv_sec + maxIdleSecs <= now.tv_sec)
|
|
{
|
|
ms3_deinit(back->conn);
|
|
// connMutexes.erase(back.conn);
|
|
back->conn = nullptr;
|
|
freeConns.pop_back();
|
|
}
|
|
else
|
|
break; // everything in front of this entry will not have been idle long enough
|
|
}
|
|
|
|
// get a connection
|
|
uint8_t res = 0;
|
|
if (freeConns.empty())
|
|
{
|
|
auto ret = make_shared<S3Storage::Connection>(nextConnId++);
|
|
ret->conn = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? nullptr : endpoint.c_str()));
|
|
// Something went wrong with libmarias3 init
|
|
if (ret->conn == nullptr)
|
|
{
|
|
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
|
|
return nullptr;
|
|
}
|
|
// Set option for use http instead of https
|
|
if (useHTTP)
|
|
{
|
|
ms3_set_option(ret->conn, MS3_OPT_USE_HTTP, nullptr);
|
|
}
|
|
// Set option to disable SSL Verification
|
|
if (!sslVerify)
|
|
{
|
|
ms3_set_option(ret->conn, MS3_OPT_DISABLE_SSL_VERIFY, nullptr);
|
|
}
|
|
// Port number is not 0 so it was set by cnf file
|
|
if (portNumber != 0)
|
|
{
|
|
ms3_set_option(ret->conn, MS3_OPT_PORT_NUMBER, &portNumber);
|
|
}
|
|
|
|
// timeouts
|
|
if (connectTimeout.has_value())
|
|
{
|
|
ms3_set_option(ret->conn, MS3_OPT_CONNECT_TIMEOUT, &connectTimeout.value());
|
|
}
|
|
if (operationTimeout.has_value())
|
|
{
|
|
ms3_set_option(ret->conn, MS3_OPT_TIMEOUT, &operationTimeout.value());
|
|
}
|
|
// IAM role setup for keys
|
|
if (!IAMrole.empty())
|
|
{
|
|
if (isEC2Instance)
|
|
{
|
|
res = ms3_ec2_set_cred(ret->conn, IAMrole.c_str(), key.c_str(), secret.c_str(), token.c_str());
|
|
}
|
|
else
|
|
{
|
|
res = ms3_init_assume_role(ret->conn, (IAMrole.empty() ? nullptr : IAMrole.c_str()),
|
|
(STSendpoint.empty() ? nullptr : STSendpoint.c_str()),
|
|
(STSregion.empty() ? nullptr : STSregion.c_str()));
|
|
}
|
|
|
|
if (res)
|
|
{
|
|
// Something is wrong with the assume role so abort as if the ms3_init failed
|
|
logger->log(LOG_ERR,
|
|
"S3Storage::getConnection(): ERROR: ms3_init_assume_role. Verify iam_role_name = %s, "
|
|
"aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint "
|
|
"if configured.",
|
|
IAMrole.c_str());
|
|
if (ms3_server_error(ret->conn))
|
|
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s",
|
|
ms3_server_error(ret->conn), IAMrole.c_str());
|
|
ms3_deinit(ret->conn);
|
|
ret->conn = nullptr;
|
|
}
|
|
}
|
|
// assert(connMutexes[ret].try_lock());
|
|
if (ret->conn == nullptr)
|
|
{
|
|
s.unlock();
|
|
return {};
|
|
}
|
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
|
|
usedConns.try_emplace(ret->id, ret);
|
|
return ret;
|
|
}
|
|
assert(freeConns.front()->touchedAt.tv_sec + maxIdleSecs > now.tv_sec);
|
|
auto ret = freeConns.front();
|
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &ret->touchedAt);
|
|
freeConns.pop_front();
|
|
// assert(connMutexes[ret].try_lock());
|
|
return ret;
|
|
}
|
|
|
|
void S3Storage::returnConnection(std::shared_ptr<S3Storage::Connection> conn)
|
|
{
|
|
assert(conn);
|
|
assert(conn->conn);
|
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn->touchedAt);
|
|
|
|
boost::unique_lock<boost::mutex> s(connMutex);
|
|
usedConns.erase(conn->id);
|
|
conn->terminate = false;
|
|
conn->id = nextConnId++;
|
|
freeConns.push_front(conn);
|
|
// connMutexes[ms3].unlock();
|
|
}
|
|
|
|
vector<S3Storage::IOTaskData> S3Storage::taskList() const
|
|
{
|
|
boost::unique_lock<boost::mutex> s(connMutex);
|
|
vector<IOTaskData> ret;
|
|
ret.reserve(usedConns.size());
|
|
timespec now;
|
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
|
|
for (const auto& [id, conn]: usedConns) {
|
|
double elapsed = (double)(now.tv_sec - conn->touchedAt.tv_sec) + 1.e-9 * (now.tv_nsec - conn->touchedAt.tv_nsec);;
|
|
ret.emplace_back(id, elapsed);
|
|
}
|
|
s.unlock();
|
|
return ret;
|
|
}
|
|
|
|
bool S3Storage::killTask(uint64_t id)
|
|
{
|
|
boost::unique_lock<boost::mutex> s(connMutex);
|
|
if (auto it = usedConns.find(id); it != usedConns.end())
|
|
{
|
|
it->second->terminate = true;
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
} // namespace storagemanager
|