diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index b4a930617..dee3fb423 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -686,6 +686,8 @@ int ServiceDMLProc::Child() dmlserver.start(); + // WIP the rc looks misguiding b/c DMLProc now can legitimately quits from DMLServer::start() + // so Child() should return dmlserver.start(). return 1; } diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 112b23241..da7906be7 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -98,7 +98,7 @@ boost::mutex PackageHandler::tableOidMutex; // If FORCE is set, we can't rollback. struct CancellationThread { - CancellationThread(DBRM* aDbrm) : fDbrm(aDbrm) + CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer) {} void operator()() { @@ -118,6 +118,9 @@ struct CancellationThread // Check to see if someone has ordered a shutdown or suspend with rollback. (void)fDbrm->getSystemShutdownPending(bRollback, bForce); + if (bForce) + break; + if (bDoingRollback && bRollback) { continue; @@ -162,6 +165,7 @@ struct CancellationThread DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO); } + // WIP Need to set cluster to read-only via CMAPI before shutting the cluster down. if (fDbrm->isReadWrite()) { continue; @@ -288,10 +292,15 @@ struct CancellationThread oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions."; DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO); } + // Here is the end of the rollback if so DMLProc rollbacks what it can. + break; } } + // Setting the flag to tell DMLServer to exit. + fServer.startShutdown(); } DBRM* fDbrm; + DMLServer& fServer; }; PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios, @@ -1195,7 +1204,7 @@ void added_a_pm(int) } DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) : - fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm) + fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm), fShutdownFlag(false) { fMqServer.reset(new MessageQueueServer("DMLProc")); @@ -1214,14 +1223,24 @@ void DMLServer::start() // CancellationThread is for telling all active transactions // to quit working because the system is either going down // or going into write suspend mode - CancellationThread cancelObject(fDbrm); + CancellationThread cancelObject(fDbrm, *this); boost::thread cancelThread(cancelObject); cout << "DMLProc is ready..." << endl; + const static struct timespec timeout = {1, 100}; // roughly 1 second TO for (;;) { - ios = fMqServer->accept(); + ios = fMqServer->accept(&timeout); + // MCS polls in a loop watching for a pending shutdown + // that is signalled via fShutdownFlag set in a + // CancellationThread. CT sets the flag if a cluster state + // has SS_SHUTDOWNPENDING value set. + while (!ios.hasSocketDescriptor() && !pendingShutdown()) + ios = fMqServer->accept(&timeout); + + if (pendingShutdown()) + break; ios.setSockID(nextID++); fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm)); } diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 6a18493b1..888cadf08 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -146,6 +146,16 @@ public: fPackageWorkQueueSize = workQueueSize; } + inline bool pendingShutdown() const + { + return fShutdownFlag; + } + + inline void startShutdown() + { + fShutdownFlag = true; + } + private: //not copyable DMLServer(const DMLServer& rhs); @@ -156,6 +166,7 @@ private: boost::scoped_ptr fMqServer; BRM::DBRM* fDbrm; + bool fShutdownFlag; public: /** @brief the thread pool for processing dml packages diff --git a/utils/messageqcpp/iosocket.h b/utils/messageqcpp/iosocket.h index 0d09fa161..6ec946c81 100644 --- a/utils/messageqcpp/iosocket.h +++ b/utils/messageqcpp/iosocket.h @@ -42,6 +42,7 @@ #include "socket.h" #include "socketparms.h" #include "bytestream.h" +#include "checks.h" class MessageQTestSuite; @@ -198,6 +199,7 @@ public: inline virtual bool isConnected() const; inline virtual bool hasData() const; + inline bool hasSocketDescriptor() const; friend class ::MessageQTestSuite; @@ -217,6 +219,12 @@ inline const sockaddr IOSocket::sa() const { return fSa; } + +inline bool IOSocket::hasSocketDescriptor() const +{ + return fSocket && utils::is_nonnegative(fSocket->socketParms().sd()); +} + inline void IOSocket::sa(const sockaddr* sa) { fSa = *sa;