You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-17 01:02:23 +03:00
Fixed a couple bugs in S3Storage.
- there was no closing of any file descriptors (!!) - there was the typo in the connection pruning code s.t. nothing would get pruned. - added -lm to the libmarias3 stuff, was getting a linker error, not sure why.
This commit is contained in:
@@ -49,7 +49,7 @@ set(S3API_DIR ${CMAKE_CURRENT_SOURCE_DIR}/libmarias3)
|
|||||||
include(ExternalProject)
|
include(ExternalProject)
|
||||||
ExternalProject_Add(ms3
|
ExternalProject_Add(ms3
|
||||||
SOURCE_DIR ${S3API_DIR}
|
SOURCE_DIR ${S3API_DIR}
|
||||||
CONFIGURE_COMMAND autoreconf -fi ${S3API_DIR} && ${S3API_DIR}/configure --enable-shared=no --prefix=${CMAKE_CURRENT_BINARY_DIR} ${S3_CONFIGURE_OPT}
|
CONFIGURE_COMMAND autoreconf -fi ${S3API_DIR} && LIBS=-lm ${S3API_DIR}/configure --enable-shared=no --prefix=${CMAKE_CURRENT_BINARY_DIR} ${S3_CONFIGURE_OPT}
|
||||||
BUILD_COMMAND make
|
BUILD_COMMAND make
|
||||||
BUILD_IN_SOURCE 0
|
BUILD_IN_SOURCE 0
|
||||||
INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}
|
INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}
|
||||||
|
|||||||
Submodule libmarias3 updated: 6d8be7ec6d...d0bee4a257
@@ -364,7 +364,7 @@ void Cache::deletedObject(const string &key, size_t size)
|
|||||||
boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
|
boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
|
||||||
assert(currentCacheSize >= size);
|
assert(currentCacheSize >= size);
|
||||||
M_LRU_t::iterator mit = m_lru.find(key);
|
M_LRU_t::iterator mit = m_lru.find(key);
|
||||||
assert(mit != m_lru.end());
|
assert(mit != m_lru.end()); // TODO: 5/16/19 - got this assertion using S3 by running test000, then test000 again.
|
||||||
assert(doNotEvict.find(mit->lit) == doNotEvict.end());
|
assert(doNotEvict.find(mit->lit) == doNotEvict.end());
|
||||||
lru.erase(mit->lit);
|
lru.erase(mit->lit);
|
||||||
m_lru.erase(mit);
|
m_lru.erase(mit);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include "Utilities.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
@@ -105,6 +106,7 @@ S3Storage::~S3Storage()
|
|||||||
{
|
{
|
||||||
for (auto &conn : freeConns)
|
for (auto &conn : freeConns)
|
||||||
ms3_deinit(conn.conn);
|
ms3_deinit(conn.conn);
|
||||||
|
ms3_library_deinit();
|
||||||
}
|
}
|
||||||
|
|
||||||
int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t *size)
|
int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t *size)
|
||||||
@@ -120,6 +122,7 @@ int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t
|
|||||||
fd = ::open(destFile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
fd = ::open(destFile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||||
if (fd < 0)
|
if (fd < 0)
|
||||||
return err;
|
return err;
|
||||||
|
ScopedCloser s(fd);
|
||||||
while (count < len)
|
while (count < len)
|
||||||
{
|
{
|
||||||
err = ::write(fd, &data[count], len - count);
|
err = ::write(fd, &data[count], len - count);
|
||||||
@@ -150,7 +153,7 @@ int S3Storage::getObject(const string &sourceKey, boost::shared_array<uint8_t> *
|
|||||||
err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
||||||
if (err && retryable_error(err))
|
if (err && retryable_error(err))
|
||||||
{
|
{
|
||||||
if (err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
|
||||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
" Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
||||||
else
|
else
|
||||||
@@ -161,7 +164,7 @@ int S3Storage::getObject(const string &sourceKey, boost::shared_array<uint8_t> *
|
|||||||
} while (err && retryable_error(err));
|
} while (err && retryable_error(err));
|
||||||
if (err)
|
if (err)
|
||||||
{
|
{
|
||||||
if (err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
|
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
|
||||||
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
|
||||||
else
|
else
|
||||||
@@ -203,6 +206,7 @@ int S3Storage::putObject(const string &sourceFile, const string &destKey)
|
|||||||
errno = l_errno;
|
errno = l_errno;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
ScopedCloser s(fd);
|
||||||
while (count < len)
|
while (count < len)
|
||||||
{
|
{
|
||||||
err = ::read(fd, &data[count], len - count);
|
err = ::read(fd, &data[count], len - count);
|
||||||
@@ -238,7 +242,7 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
|||||||
s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
|
s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
|
||||||
if (s3err && retryable_error(s3err))
|
if (s3err && retryable_error(s3err))
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
|
||||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
" Retrying...", ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
||||||
else
|
else
|
||||||
@@ -249,7 +253,7 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
|||||||
} while (s3err && retryable_error(s3err));
|
} while (s3err && retryable_error(s3err));
|
||||||
if (s3err)
|
if (s3err)
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
|
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
|
||||||
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
ms3_server_error(creds), bucket.c_str(), destKey.c_str());
|
||||||
else
|
else
|
||||||
@@ -271,7 +275,7 @@ void S3Storage::deleteObject(const string &key)
|
|||||||
s3err = ms3_delete(creds, bucket.c_str(), key.c_str());
|
s3err = ms3_delete(creds, bucket.c_str(), key.c_str());
|
||||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
|
||||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||||
else
|
else
|
||||||
@@ -283,7 +287,7 @@ void S3Storage::deleteObject(const string &key)
|
|||||||
|
|
||||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
|
logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
|
||||||
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||||
else
|
else
|
||||||
@@ -303,7 +307,7 @@ int S3Storage::copyObject(const string &sourceKey, const string &destKey)
|
|||||||
s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
|
s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
|
||||||
if (s3err && retryable_error(s3err))
|
if (s3err && retryable_error(s3err))
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||||
"destkey = %s. Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
"destkey = %s. Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||||
else
|
else
|
||||||
@@ -315,10 +319,11 @@ int S3Storage::copyObject(const string &sourceKey, const string &destKey)
|
|||||||
|
|
||||||
if (s3err)
|
if (s3err)
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
// added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile()
|
||||||
|
if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND)
|
||||||
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
|
||||||
"destkey = %s.", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
"destkey = %s.", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||||
else
|
else if (s3err != MS3_ERR_NOT_FOUND)
|
||||||
logger->log(LOG_CRIT, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
logger->log(LOG_CRIT, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
|
||||||
"destkey = %s.", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
"destkey = %s.", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
|
||||||
errno = s3err_to_errno[s3err];
|
errno = s3err_to_errno[s3err];
|
||||||
@@ -350,7 +355,7 @@ int S3Storage::exists(const string &key, bool *out)
|
|||||||
s3err = ms3_status(creds, bucket.c_str(), key.c_str(), &status);
|
s3err = ms3_status(creds, bucket.c_str(), key.c_str(), &status);
|
||||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
|
||||||
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
" Retrying...", ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||||
else
|
else
|
||||||
@@ -362,7 +367,7 @@ int S3Storage::exists(const string &key, bool *out)
|
|||||||
|
|
||||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||||
{
|
{
|
||||||
if (s3err == MS3_ERR_SERVER)
|
if (ms3_server_error(creds))
|
||||||
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
|
logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
|
||||||
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
ms3_server_error(creds), bucket.c_str(), key.c_str());
|
||||||
else
|
else
|
||||||
@@ -380,17 +385,38 @@ ms3_st * S3Storage::getConnection()
|
|||||||
{
|
{
|
||||||
boost::unique_lock<boost::mutex> s(connMutex);
|
boost::unique_lock<boost::mutex> s(connMutex);
|
||||||
|
|
||||||
|
// prune the list. Most-idle connections are at the back.
|
||||||
|
timespec now;
|
||||||
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
|
||||||
|
while (!freeConns.empty())
|
||||||
|
{
|
||||||
|
Connection &back = freeConns.back();
|
||||||
|
if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec)
|
||||||
|
{
|
||||||
|
ms3_deinit(back.conn);
|
||||||
|
//connMutexes.erase(back.conn);
|
||||||
|
back.conn = NULL;
|
||||||
|
freeConns.pop_back();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
break; // everything in front of this entry will not have been idle long enough
|
||||||
|
}
|
||||||
|
|
||||||
|
// get a connection
|
||||||
ms3_st *ret = NULL;
|
ms3_st *ret = NULL;
|
||||||
if (freeConns.empty())
|
if (freeConns.empty())
|
||||||
{
|
{
|
||||||
s.unlock();
|
ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
|
||||||
ret = ms3_thread_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
|
|
||||||
if (ret == NULL)
|
if (ret == NULL)
|
||||||
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_thread_init returned NULL, no specific info to report");
|
logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
|
||||||
|
//assert(connMutexes[ret].try_lock());
|
||||||
|
s.unlock();
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec);
|
||||||
ret = freeConns.front().conn;
|
ret = freeConns.front().conn;
|
||||||
freeConns.pop_front();
|
freeConns.pop_front();
|
||||||
|
//assert(connMutexes[ret].try_lock());
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,19 +428,8 @@ void S3Storage::returnConnection(ms3_st *ms3)
|
|||||||
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince);
|
clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince);
|
||||||
|
|
||||||
boost::unique_lock<boost::mutex> s(connMutex);
|
boost::unique_lock<boost::mutex> s(connMutex);
|
||||||
// prune the list
|
|
||||||
while (!freeConns.empty())
|
|
||||||
{
|
|
||||||
Connection &back = freeConns.back();
|
|
||||||
if (back.idleSince.tv_sec + maxIdleSecs <= back.idleSince.tv_sec)
|
|
||||||
{
|
|
||||||
ms3_deinit(back.conn);
|
|
||||||
freeConns.pop_back();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
break; // everything in front of this entry will not have been idle enough
|
|
||||||
}
|
|
||||||
freeConns.push_front(conn);
|
freeConns.push_front(conn);
|
||||||
|
//connMutexes[ms3].unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
#define S3STORAGE_H_
|
#define S3STORAGE_H_
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <map>
|
||||||
#include "CloudStorage.h"
|
#include "CloudStorage.h"
|
||||||
extern "C"
|
extern "C"
|
||||||
{
|
{
|
||||||
@@ -49,9 +50,12 @@ class S3Storage : public CloudStorage
|
|||||||
ms3_st *conn;
|
ms3_st *conn;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// for sanity checking
|
||||||
|
//std::map<ms3_st *, boost::mutex> connMutexes;
|
||||||
|
|
||||||
boost::mutex connMutex;
|
boost::mutex connMutex;
|
||||||
std::deque<Connection> freeConns; // using this as a stack to keep lru objects together
|
std::deque<Connection> freeConns; // using this as a stack to keep lru objects together
|
||||||
const time_t maxIdleSecs = 120;
|
const time_t maxIdleSecs = 30;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user