1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-20 09:07:44 +03:00
Alexey Antipovsky a1b4a49d5c
feat(SM): MCOL-5785 S3Storage improvements [develop-23.02] (#3270)
Update libmarias3
fix build with the recent libmarias3

feat(SM): MCOL-5785 Add timeout options for S3Storage

    In some unfortunate situations StorageManager may get stuck on
    network operations. This commit adds the ability to set network
    timeouts which will help to ensure that the system is more
    responsive.

feat(SM): MCOL-5785 Add smps & smkill tools

    * `smps` shows all active S3 network operations
    * `smkill` terminates S3 network operations

    NB! At the moment smkill is able to terminate operations
    that are stuck on retries, but not hang inside the libcurl
    call. In other words if you want to terminate all operations
    you should configure `connect_timeout` & `timeout`

Install smkill & smps

Add install for new binaries
2024-08-17 20:59:29 +01:00

323 lines
9.2 KiB
C++

/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "SMComm.h"
#include "bytestream.h"
#include "messageFormat.h"
using namespace std;
using namespace messageqcpp;
namespace
{
idbdatafile::SMComm* instance = NULL;
boost::mutex m;
}; // namespace
namespace idbdatafile
{
SMComm* SMComm::get()
{
if (instance)
return instance;
boost::mutex::scoped_lock sl(m);
if (instance)
return instance;
instance = new SMComm();
return instance;
}
// timesavers
#define common_exit(bs1, bs2, retCode) \
{ \
int l_errno = errno; \
buffers.returnByteStream(bs1); \
buffers.returnByteStream(bs2); \
errno = l_errno; \
return retCode; \
}
// bs1 is the bytestream ptr with the command to SMComm.
// bs2 is the bytestream pointer with the response from SMComm.
// retCode is the var to store the return code in from the msg.
// returns with the output pointer at the fcn-specific data
#define check_for_error(bs1, bs2, retCode) \
{ \
int l_errno; \
*bs2 >> retCode; \
if (retCode < 0) \
{ \
*bs2 >> l_errno; \
errno = l_errno; \
common_exit(bs1, bs2, retCode); \
} \
else \
errno = 0; \
}
SMComm::SMComm()
{
char buf[4096];
cwd = ::getcwd(buf, 4096);
}
SMComm::~SMComm()
{
}
string SMComm::getAbsFilename(const string& filename)
{
if (filename[0] == '/')
return filename;
else
return cwd + '/' + filename;
}
int SMComm::open(const string& filename, const int mode, struct stat* statbuf)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::OPEN << mode << absfilename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(statbuf, response->buf(), sizeof(*statbuf));
common_exit(command, response, err);
}
ssize_t SMComm::pread(const string& filename, void* buf, const size_t count, const off_t offset)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::READ << count << offset << absfilename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(buf, response->buf(), err);
common_exit(command, response, err);
}
ssize_t SMComm::pwrite(const string& filename, const void* buf, const size_t count, const off_t offset)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::WRITE << count << offset << absfilename;
command->needAtLeast(count);
uint8_t* cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count);
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
ssize_t SMComm::append(const string& filename, const void* buf, const size_t count)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::APPEND << count << absfilename;
command->needAtLeast(count);
uint8_t* cmdBuf = command->getInputPtr();
memcpy(cmdBuf, buf, count);
command->advanceInputPtr(count);
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::unlink(const string& filename)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::UNLINK << absfilename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::stat(const string& filename, struct stat* statbuf)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::STAT << absfilename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
memcpy(statbuf, response->buf(), sizeof(*statbuf));
common_exit(command, response, err);
}
int SMComm::truncate(const string& filename, const off64_t length)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename(getAbsFilename(filename));
*command << (uint8_t)storagemanager::TRUNCATE << length << absfilename;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::listDirectory(const string& path, list<string>* entries)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string abspath(getAbsFilename(path));
*command << (uint8_t)storagemanager::LIST_DIRECTORY << abspath;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
uint32_t numElements;
string stmp;
entries->clear();
*response >> numElements;
while (numElements > 0)
{
*response >> stmp;
entries->push_back(stmp);
numElements--;
}
common_exit(command, response, err);
}
int SMComm::ping()
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::PING;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::sync()
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::SYNC;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::copyFile(const string& file1, const string& file2)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
string absfilename1(getAbsFilename(file1));
string absfilename2(getAbsFilename(file2));
*command << (uint8_t)storagemanager::COPY << absfilename1 << absfilename2;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::listIOTasks(vector<storagemanager::list_iotask_resp_entry>* entries)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::LIST_IOTASKS;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
uint32_t numElements;
entries->clear();
*response >> numElements;
entries->reserve(numElements);
while (numElements > 0)
{
storagemanager::list_iotask_resp_entry entry;
*response >> entry.id >> entry.runningTime;
entries->push_back(std::move(entry));
--numElements;
}
common_exit(command, response, err);
}
int SMComm::killIOTask(uint64_t id)
{
ByteStream* command = buffers.getByteStream();
ByteStream* response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t)storagemanager::TERMINATE_IOTASK << id;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
} // namespace idbdatafile