1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-3577: Add functionality to sync S3 storage for suspendDatabaseWrites.

This commit is contained in:
benthompson15
2019-11-20 11:57:55 -06:00
parent dbb9b21d26
commit 5bbd21b8e0
13 changed files with 189 additions and 4 deletions

View File

@ -2712,6 +2712,8 @@ void processMSG(messageqcpp::IOSocket* cfIos)
ByteStream::byte ackResponse = API_FAILURE;
log.writeLog(__LINE__, "MSG RECEIVED: suspend database writes");
string storageType = Config::makeConfig()->getConfig("Installation", "DBRootStorageType");
// GRACEFUL_WAIT means that we are Suspending writes, but waiting for all
// transactions to finish or rollback as commanded. This is only set if there
// are, in fact, transactions active (or cpimport).
@ -2785,6 +2787,16 @@ void processMSG(messageqcpp::IOSocket* cfIos)
dbrm.setSystemSuspended(false);
}
if (storageType == "storagemanager")
{
string DBRMroot;
oam.getSystemConfig("DBRMRoot", DBRMroot);
string currentFileName = DBRMroot + "_current";
IDBFileSystem &fs = IDBPolicy::getFs(currentFileName.c_str());
fs.filesystemSync();
}
ackMsg.reset();
ackMsg << (ByteStream::byte) oam::ACK;
ackMsg << actionType;

View File

@ -34,6 +34,7 @@ set(storagemanager_SRCS
src/Utilities.cpp
src/Ownership.cpp
src/PrefixCache.cpp
src/SyncTask.cpp
../utils/common/crashtrace.cpp
)

View File

@ -82,7 +82,8 @@ enum Opcodes {
TRUNCATE,
LIST_DIRECTORY,
PING,
COPY
COPY,
SYNC
};
/*

View File

@ -33,6 +33,7 @@
#include "TruncateTask.h"
#include "UnlinkTask.h"
#include "WriteTask.h"
#include "SyncTask.h"
#include "SessionManager.h"
#include "SMLogging.h"
@ -109,6 +110,9 @@ void ProcessTask::operator()()
case PING:
task.reset(new PingTask(sock, length));
break;
case SYNC:
task.reset(new SyncTask(sock, length));
break;
case COPY:
task.reset(new CopyTask(sock, length));
break;

View File

@ -0,0 +1,62 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "SyncTask.h"
#include "Synchronizer.h"
#include "messageFormat.h"
#include <errno.h>
namespace storagemanager
{
SyncTask::SyncTask(int sock, uint len) : PosixTask(sock, len)
{
}
SyncTask::~SyncTask()
{
}
bool SyncTask::run()
{
// force flush synchronizer
uint8_t buf;
if (getLength() > 1)
{
handleError("SyncTask", E2BIG);
return true;
}
// consume the msg
bool success = read(&buf, getLength());
if (!success)
{
handleError("SyncTask", errno);
return false;
}
Synchronizer::get()->syncNow();
// send generic success response
sm_response ret;
ret.returnCode = 0;
success = write(ret, 0);
return success;
}
}

View File

@ -0,0 +1,41 @@
/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef SYNCTASK_H_
#define SYNCTASK_H_
#include "PosixTask.h"
namespace storagemanager
{
class SyncTask : public PosixTask
{
public:
SyncTask(int sock, uint length);
virtual ~SyncTask();
bool run();
private:
SyncTask();
};
}
#endif

View File

@ -185,6 +185,10 @@ void Synchronizer::deletedObjects(const bf::path &prefix, const vector<string> &
void Synchronizer::flushObject(const bf::path &prefix, const string &_key)
{
string key = (prefix/_key).string();
while (blockNewJobs)
boost::this_thread::sleep_for(boost::chrono::seconds(1));
boost::unique_lock<boost::mutex> s(mutex);
// if there is something to do on key, it should be either in pendingOps or opsInProgress
@ -307,13 +311,45 @@ void Synchronizer::syncNow(const bf::path &prefix)
if (job.first.find(prefix.string()) == 0)
makeJob(job.first);
uncommittedJournalSize[prefix] = 0;
lock.unlock();
threadPool.reset(new ThreadPool());
threadPool->setMaxThreads(maxUploads);
lock.lock();
blockNewJobs = false;
}
void Synchronizer::syncNow()
{
boost::unique_lock<boost::mutex> lock(mutex);
// this is pretty hacky. when time permits, implement something better.
//
// Issue all of the pendingOps for the given prefix
// recreate the threadpool (dtor returns once all jobs have finished)
// resume normal operation
blockNewJobs = true;
while (pendingOps.size() != 0)
{
for (auto &job : pendingOps)
makeJob(job.first);
for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
it->second = 0;
lock.unlock();
while (opsInProgress.size() > 0)
boost::this_thread::sleep_for(boost::chrono::seconds(1));
if (pendingOps.size() != 0)
logger->log(LOG_DEBUG,"Synchronizer syncNow pendingOps not empty.");
lock.lock();
}
blockNewJobs = false;
}
void Synchronizer::forceFlush()
{
boost::unique_lock<boost::mutex> lock(mutex);
syncThread.interrupt();
}

View File

@ -53,7 +53,7 @@ class Synchronizer : public boost::noncopyable , public ConfigListener
void deletedObjects(const boost::filesystem::path &firstDir, const std::vector<std::string> &keys);
void flushObject(const boost::filesystem::path &firstDir, const std::string &key);
void forceFlush(); // ideally, make a version of this that takes a firstDir parameter
void syncNow(); // synchronous version of force for SyncTask
void newPrefix(const boost::filesystem::path &p);
void dropPrefix(const boost::filesystem::path &p);

View File

@ -247,6 +247,20 @@ int SMComm::ping()
common_exit(command, response, err);
}
int SMComm::sync()
{
ByteStream *command = buffers.getByteStream();
ByteStream *response = buffers.getByteStream();
ssize_t err;
*command << (uint8_t) storagemanager::SYNC;
err = sockets.send_recv(*command, response);
if (err)
common_exit(command, response, err);
check_for_error(command, response, err);
common_exit(command, response, err);
}
int SMComm::copyFile(const string &file1, const string &file2)
{
ByteStream *command = buffers.getByteStream();

View File

@ -59,6 +59,8 @@ class SMComm : public boost::noncopyable
// the specified S3 bucket. Need to define specific error codes.
int ping();
int sync();
int copyFile(const std::string &file1, const std::string &file2);
virtual ~SMComm();

View File

@ -110,4 +110,9 @@ bool SMFileSystem::filesystemIsUp() const
return (comm->ping() == 0);
}
bool SMFileSystem::filesystemSync() const
{
SMComm *comm = SMComm::get();
return (comm->sync() == 0);
}
}

View File

@ -44,6 +44,8 @@ class SMFileSystem : public IDBFileSystem, boost::noncopyable
bool isDir(const char* pathname) const;
int copyFile(const char* srcPath, const char* destPath) const;
bool filesystemIsUp() const;
bool filesystemSync() const;
};
}

View File

@ -129,6 +129,11 @@ public:
return true;
}
virtual bool filesystemSync() const
{
return true;
}
protected:
IDBFileSystem( Types type );