1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-4675 DMLProc now automatically and gracefully shutdowns when a cluster state is set to

SS_SHUTDOWN_PENDING | SS_ROLLBACK
This commit is contained in:
Roman Nozdrin
2021-05-18 18:46:35 +00:00
committed by Leonid Fedorov
parent 646ffb6f95
commit 90397dfed0
4 changed files with 44 additions and 4 deletions

View File

@ -686,6 +686,8 @@ int ServiceDMLProc::Child()
dmlserver.start(); dmlserver.start();
// WIP the rc looks misguiding b/c DMLProc now can legitimately quits from DMLServer::start()
// so Child() should return dmlserver.start().
return 1; return 1;
} }

View File

@ -98,7 +98,7 @@ boost::mutex PackageHandler::tableOidMutex;
// If FORCE is set, we can't rollback. // If FORCE is set, we can't rollback.
struct CancellationThread struct CancellationThread
{ {
CancellationThread(DBRM* aDbrm) : fDbrm(aDbrm) CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer)
{} {}
void operator()() void operator()()
{ {
@ -118,6 +118,9 @@ struct CancellationThread
// Check to see if someone has ordered a shutdown or suspend with rollback. // Check to see if someone has ordered a shutdown or suspend with rollback.
(void)fDbrm->getSystemShutdownPending(bRollback, bForce); (void)fDbrm->getSystemShutdownPending(bRollback, bForce);
if (bForce)
break;
if (bDoingRollback && bRollback) if (bDoingRollback && bRollback)
{ {
continue; continue;
@ -162,6 +165,7 @@ struct CancellationThread
DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO); DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO);
} }
// WIP Need to set cluster to read-only via CMAPI before shutting the cluster down.
if (fDbrm->isReadWrite()) if (fDbrm->isReadWrite())
{ {
continue; continue;
@ -288,10 +292,15 @@ struct CancellationThread
oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions."; oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions.";
DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO); DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO);
} }
// Here is the end of the rollback if so DMLProc rollbacks what it can.
break;
} }
} }
// Setting the flag to tell DMLServer to exit.
fServer.startShutdown();
} }
DBRM* fDbrm; DBRM* fDbrm;
DMLServer& fServer;
}; };
PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios, PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios,
@ -1195,7 +1204,7 @@ void added_a_pm(int)
} }
DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) : DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) :
fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm) fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm), fShutdownFlag(false)
{ {
fMqServer.reset(new MessageQueueServer("DMLProc")); fMqServer.reset(new MessageQueueServer("DMLProc"));
@ -1214,14 +1223,24 @@ void DMLServer::start()
// CancellationThread is for telling all active transactions // CancellationThread is for telling all active transactions
// to quit working because the system is either going down // to quit working because the system is either going down
// or going into write suspend mode // or going into write suspend mode
CancellationThread cancelObject(fDbrm); CancellationThread cancelObject(fDbrm, *this);
boost::thread cancelThread(cancelObject); boost::thread cancelThread(cancelObject);
cout << "DMLProc is ready..." << endl; cout << "DMLProc is ready..." << endl;
const static struct timespec timeout = {1, 100}; // roughly 1 second TO
for (;;) for (;;)
{ {
ios = fMqServer->accept(); ios = fMqServer->accept(&timeout);
// MCS polls in a loop watching for a pending shutdown
// that is signalled via fShutdownFlag set in a
// CancellationThread. CT sets the flag if a cluster state
// has SS_SHUTDOWNPENDING value set.
while (!ios.hasSocketDescriptor() && !pendingShutdown())
ios = fMqServer->accept(&timeout);
if (pendingShutdown())
break;
ios.setSockID(nextID++); ios.setSockID(nextID++);
fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm)); fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm));
} }

View File

@ -146,6 +146,16 @@ public:
fPackageWorkQueueSize = workQueueSize; fPackageWorkQueueSize = workQueueSize;
} }
inline bool pendingShutdown() const
{
return fShutdownFlag;
}
inline void startShutdown()
{
fShutdownFlag = true;
}
private: private:
//not copyable //not copyable
DMLServer(const DMLServer& rhs); DMLServer(const DMLServer& rhs);
@ -156,6 +166,7 @@ private:
boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer; boost::scoped_ptr<messageqcpp::MessageQueueServer> fMqServer;
BRM::DBRM* fDbrm; BRM::DBRM* fDbrm;
bool fShutdownFlag;
public: public:
/** @brief the thread pool for processing dml packages /** @brief the thread pool for processing dml packages

View File

@ -42,6 +42,7 @@
#include "socket.h" #include "socket.h"
#include "socketparms.h" #include "socketparms.h"
#include "bytestream.h" #include "bytestream.h"
#include "checks.h"
class MessageQTestSuite; class MessageQTestSuite;
@ -198,6 +199,7 @@ public:
inline virtual bool isConnected() const; inline virtual bool isConnected() const;
inline virtual bool hasData() const; inline virtual bool hasData() const;
inline bool hasSocketDescriptor() const;
friend class ::MessageQTestSuite; friend class ::MessageQTestSuite;
@ -217,6 +219,12 @@ inline const sockaddr IOSocket::sa() const
{ {
return fSa; return fSa;
} }
inline bool IOSocket::hasSocketDescriptor() const
{
return fSocket && utils::is_nonnegative(fSocket->socketParms().sd());
}
inline void IOSocket::sa(const sockaddr* sa) inline void IOSocket::sa(const sockaddr* sa)
{ {
fSa = *sa; fSa = *sa;