From 5bbd21b8e0d03bf28264324a7a50f1fdd92dea0d Mon Sep 17 00:00:00 2001 From: benthompson15 Date: Wed, 20 Nov 2019 11:57:55 -0600 Subject: [PATCH] MCOL-3577: Add functionality to sync S3 storage for suspendDatabaseWrites. --- procmgr/processmanager.cpp | 12 +++++ storage-manager/CMakeLists.txt | 1 + storage-manager/include/messageFormat.h | 3 +- storage-manager/src/ProcessTask.cpp | 4 ++ storage-manager/src/SyncTask.cpp | 62 +++++++++++++++++++++++++ storage-manager/src/SyncTask.h | 41 ++++++++++++++++ storage-manager/src/Synchronizer.cpp | 36 ++++++++++++++ storage-manager/src/Synchronizer.h | 6 +-- utils/cloudio/SMComm.cpp | 14 ++++++ utils/cloudio/SMComm.h | 2 + utils/cloudio/SMFileSystem.cpp | 5 ++ utils/cloudio/SMFileSystem.h | 2 + utils/idbdatafile/IDBFileSystem.h | 5 ++ 13 files changed, 189 insertions(+), 4 deletions(-) create mode 100644 storage-manager/src/SyncTask.cpp create mode 100644 storage-manager/src/SyncTask.h diff --git a/procmgr/processmanager.cpp b/procmgr/processmanager.cpp index b2315f5f4..be3ae085e 100644 --- a/procmgr/processmanager.cpp +++ b/procmgr/processmanager.cpp @@ -2712,6 +2712,8 @@ void processMSG(messageqcpp::IOSocket* cfIos) ByteStream::byte ackResponse = API_FAILURE; log.writeLog(__LINE__, "MSG RECEIVED: suspend database writes"); + string storageType = Config::makeConfig()->getConfig("Installation", "DBRootStorageType"); + // GRACEFUL_WAIT means that we are Suspending writes, but waiting for all // transactions to finish or rollback as commanded. This is only set if there // are, in fact, transactions active (or cpimport). @@ -2785,6 +2787,16 @@ void processMSG(messageqcpp::IOSocket* cfIos) dbrm.setSystemSuspended(false); } + if (storageType == "storagemanager") + { + string DBRMroot; + oam.getSystemConfig("DBRMRoot", DBRMroot); + + string currentFileName = DBRMroot + "_current"; + IDBFileSystem &fs = IDBPolicy::getFs(currentFileName.c_str()); + fs.filesystemSync(); + } + ackMsg.reset(); ackMsg << (ByteStream::byte) oam::ACK; ackMsg << actionType; diff --git a/storage-manager/CMakeLists.txt b/storage-manager/CMakeLists.txt index ff1138aa5..36fcfc796 100755 --- a/storage-manager/CMakeLists.txt +++ b/storage-manager/CMakeLists.txt @@ -34,6 +34,7 @@ set(storagemanager_SRCS src/Utilities.cpp src/Ownership.cpp src/PrefixCache.cpp + src/SyncTask.cpp ../utils/common/crashtrace.cpp ) diff --git a/storage-manager/include/messageFormat.h b/storage-manager/include/messageFormat.h index d0a20ca96..aefcea4da 100755 --- a/storage-manager/include/messageFormat.h +++ b/storage-manager/include/messageFormat.h @@ -82,7 +82,8 @@ enum Opcodes { TRUNCATE, LIST_DIRECTORY, PING, - COPY + COPY, + SYNC }; /* diff --git a/storage-manager/src/ProcessTask.cpp b/storage-manager/src/ProcessTask.cpp index feb79ca33..c6cdc337f 100644 --- a/storage-manager/src/ProcessTask.cpp +++ b/storage-manager/src/ProcessTask.cpp @@ -33,6 +33,7 @@ #include "TruncateTask.h" #include "UnlinkTask.h" #include "WriteTask.h" +#include "SyncTask.h" #include "SessionManager.h" #include "SMLogging.h" @@ -109,6 +110,9 @@ void ProcessTask::operator()() case PING: task.reset(new PingTask(sock, length)); break; + case SYNC: + task.reset(new SyncTask(sock, length)); + break; case COPY: task.reset(new CopyTask(sock, length)); break; diff --git a/storage-manager/src/SyncTask.cpp b/storage-manager/src/SyncTask.cpp new file mode 100644 index 000000000..a962f9151 --- /dev/null +++ b/storage-manager/src/SyncTask.cpp @@ -0,0 +1,62 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + + +#include "SyncTask.h" +#include "Synchronizer.h" +#include "messageFormat.h" +#include + +namespace storagemanager +{ + +SyncTask::SyncTask(int sock, uint len) : PosixTask(sock, len) +{ +} + +SyncTask::~SyncTask() +{ +} + +bool SyncTask::run() +{ + // force flush synchronizer + uint8_t buf; + + if (getLength() > 1) + { + handleError("SyncTask", E2BIG); + return true; + } + // consume the msg + bool success = read(&buf, getLength()); + if (!success) + { + handleError("SyncTask", errno); + return false; + } + + Synchronizer::get()->syncNow(); + + // send generic success response + sm_response ret; + ret.returnCode = 0; + success = write(ret, 0); + return success; +} + +} diff --git a/storage-manager/src/SyncTask.h b/storage-manager/src/SyncTask.h new file mode 100644 index 000000000..c6b3a4abe --- /dev/null +++ b/storage-manager/src/SyncTask.h @@ -0,0 +1,41 @@ +/* Copyright (C) 2019 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + + + +#ifndef SYNCTASK_H_ +#define SYNCTASK_H_ + +#include "PosixTask.h" + +namespace storagemanager +{ + +class SyncTask : public PosixTask +{ + public: + SyncTask(int sock, uint length); + virtual ~SyncTask(); + + bool run(); + + private: + SyncTask(); +}; + +} +#endif diff --git a/storage-manager/src/Synchronizer.cpp b/storage-manager/src/Synchronizer.cpp index 979c1ac94..017a78d61 100644 --- a/storage-manager/src/Synchronizer.cpp +++ b/storage-manager/src/Synchronizer.cpp @@ -185,6 +185,10 @@ void Synchronizer::deletedObjects(const bf::path &prefix, const vector & void Synchronizer::flushObject(const bf::path &prefix, const string &_key) { string key = (prefix/_key).string(); + + while (blockNewJobs) + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + boost::unique_lock s(mutex); // if there is something to do on key, it should be either in pendingOps or opsInProgress @@ -307,13 +311,45 @@ void Synchronizer::syncNow(const bf::path &prefix) if (job.first.find(prefix.string()) == 0) makeJob(job.first); uncommittedJournalSize[prefix] = 0; + lock.unlock(); threadPool.reset(new ThreadPool()); threadPool->setMaxThreads(maxUploads); + lock.lock(); blockNewJobs = false; } + +void Synchronizer::syncNow() +{ + boost::unique_lock lock(mutex); + + // this is pretty hacky. when time permits, implement something better. + // + // Issue all of the pendingOps for the given prefix + // recreate the threadpool (dtor returns once all jobs have finished) + // resume normal operation + + blockNewJobs = true; + while (pendingOps.size() != 0) + { + for (auto &job : pendingOps) + makeJob(job.first); + for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it) + it->second = 0; + lock.unlock(); + while (opsInProgress.size() > 0) + boost::this_thread::sleep_for(boost::chrono::seconds(1)); + if (pendingOps.size() != 0) + logger->log(LOG_DEBUG,"Synchronizer syncNow pendingOps not empty."); + lock.lock(); + } + blockNewJobs = false; +} + + void Synchronizer::forceFlush() { boost::unique_lock lock(mutex); + syncThread.interrupt(); } diff --git a/storage-manager/src/Synchronizer.h b/storage-manager/src/Synchronizer.h index 2ed4192a5..908a4e5e8 100644 --- a/storage-manager/src/Synchronizer.h +++ b/storage-manager/src/Synchronizer.h @@ -53,8 +53,8 @@ class Synchronizer : public boost::noncopyable , public ConfigListener void deletedObjects(const boost::filesystem::path &firstDir, const std::vector &keys); void flushObject(const boost::filesystem::path &firstDir, const std::string &key); void forceFlush(); // ideally, make a version of this that takes a firstDir parameter - - + void syncNow(); // synchronous version of force for SyncTask + void newPrefix(const boost::filesystem::path &p); void dropPrefix(const boost::filesystem::path &p); @@ -118,7 +118,7 @@ class Synchronizer : public boost::noncopyable , public ConfigListener bool blockNewJobs; void syncNow(const boost::filesystem::path &prefix); // a synchronous version of forceFlush() - + // some KPIs size_t numBytesRead, numBytesWritten, numBytesUploaded, numBytesDownloaded, flushesTriggeredBySize, flushesTriggeredByTimer, journalsMerged, objectsSyncedWithNoJournal, diff --git a/utils/cloudio/SMComm.cpp b/utils/cloudio/SMComm.cpp index 912596f83..59d93c78f 100644 --- a/utils/cloudio/SMComm.cpp +++ b/utils/cloudio/SMComm.cpp @@ -247,6 +247,20 @@ int SMComm::ping() common_exit(command, response, err); } +int SMComm::sync() +{ + ByteStream *command = buffers.getByteStream(); + ByteStream *response = buffers.getByteStream(); + ssize_t err; + + *command << (uint8_t) storagemanager::SYNC; + err = sockets.send_recv(*command, response); + if (err) + common_exit(command, response, err); + check_for_error(command, response, err); + common_exit(command, response, err); +} + int SMComm::copyFile(const string &file1, const string &file2) { ByteStream *command = buffers.getByteStream(); diff --git a/utils/cloudio/SMComm.h b/utils/cloudio/SMComm.h index 8ae7a853d..d7240e73a 100644 --- a/utils/cloudio/SMComm.h +++ b/utils/cloudio/SMComm.h @@ -59,6 +59,8 @@ class SMComm : public boost::noncopyable // the specified S3 bucket. Need to define specific error codes. int ping(); + int sync(); + int copyFile(const std::string &file1, const std::string &file2); virtual ~SMComm(); diff --git a/utils/cloudio/SMFileSystem.cpp b/utils/cloudio/SMFileSystem.cpp index 5cf1ec453..fcd12b073 100644 --- a/utils/cloudio/SMFileSystem.cpp +++ b/utils/cloudio/SMFileSystem.cpp @@ -110,4 +110,9 @@ bool SMFileSystem::filesystemIsUp() const return (comm->ping() == 0); } +bool SMFileSystem::filesystemSync() const +{ + SMComm *comm = SMComm::get(); + return (comm->sync() == 0); +} } diff --git a/utils/cloudio/SMFileSystem.h b/utils/cloudio/SMFileSystem.h index 80a189dfb..4b136a2fd 100644 --- a/utils/cloudio/SMFileSystem.h +++ b/utils/cloudio/SMFileSystem.h @@ -44,6 +44,8 @@ class SMFileSystem : public IDBFileSystem, boost::noncopyable bool isDir(const char* pathname) const; int copyFile(const char* srcPath, const char* destPath) const; bool filesystemIsUp() const; + bool filesystemSync() const; + }; } diff --git a/utils/idbdatafile/IDBFileSystem.h b/utils/idbdatafile/IDBFileSystem.h index 5979d822f..27ef09be7 100644 --- a/utils/idbdatafile/IDBFileSystem.h +++ b/utils/idbdatafile/IDBFileSystem.h @@ -129,6 +129,11 @@ public: return true; } + virtual bool filesystemSync() const + { + return true; + } + protected: IDBFileSystem( Types type );