You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	Read DBRM unresponsive timeout from Columnstore.xml DBRMUnresponsiveTimeout Tag instead of hardcoded 300 sec value.
		
			
				
	
	
		
			269 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			269 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/******************************************************************************
 * $Id: masterdbrmnode.h 1823 2013-01-21 14:13:09Z rdempsey $
 *
 *****************************************************************************/

/** @file
 * class MasterDBRMNode interface
 */
 | |
| 
 | |
#pragma once

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdint.h>
#include <string>
#include <time.h>
#include <vector>

#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>

#include "brmtypes.h"
#include "lbidresourcegraph.h"
#include "messagequeue.h"
#include "bytestream.h"
#include "configcpp.h"
#include "sessionmanagerserver.h"
#include "oidserver.h"
#include "tablelockserver.h"
#include "autoincrementmanager.h"
 | |
| 
 | |
| namespace BRM
 | |
| {
 | |
| /** @brief The Master node of the DBRM system.
 | |
|  *
 | |
|  * There are 3 components of the Distributed BRM (DBRM).
 | |
|  * \li The interface
 | |
|  * \li The Master node
 | |
|  * \li Slave nodes
 | |
|  *
 | |
|  * The DBRM components effectively implement a networking & synchronization
 | |
|  * layer to the BlockResolutionManager class so that every node that needs
 | |
|  * BRM data always has an up-to-date copy of it locally.  An operation that changes
 | |
|  * BRM data is duplicated on all hosts that run a Slave node so that every
 | |
|  * node has identical copies.  All "read" operations are satisfied locally.
 | |
|  *
 | |
|  * The MasterDBRMNode class implements the Master node.  All changes to BRM
 | |
|  * data are serialized and distributed through the Master node.
 | |
|  *
 | |
|  * The Master node requires configuration file entries for itself and
 | |
|  * every slave it should connect to.
 | |
|  *
 | |
|  * \code
 | |
|  * <DBRM_Controller>
 | |
|  * 	<IPAddr>
 | |
|  *	<Port>
 | |
|  * 	<NumWorkers>N</NumWorkers>
 | |
|  * </DBRM_Controller>
 | |
|  * <DBRM_Worker1>
 | |
|  *	<IPAddr>
 | |
|  *	<Port>
 | |
|  * </DBRM_Worker1>
 | |
|  *	...
 | |
|  * <DBRM_WorkerN>
 | |
|  *	<IPAddr>
 | |
|  *	<Port>
 | |
|  * </DBRM_WorkerN>
 | |
|  * \endcode
 | |
|  */
 | |
| 
 | |
| constexpr size_t connectTimeoutStep = 50000;
 | |
| 
 | |
| class MasterDBRMNode
 | |
| {
 | |
|  public:
 | |
|   MasterDBRMNode();
 | |
|   ~MasterDBRMNode();
 | |
| 
 | |
|   /** @brief The primary function of the class.
 | |
|    *
 | |
|    * The main loop of the master node.  It accepts connections from the DBRM
 | |
|    * class, receives commands, and distributes them to each slave.  It returns
 | |
|    * only after stop() or the destructor is called by another thread.
 | |
|    */
 | |
|   void run();
 | |
| 
 | |
|   /** @brief Tells the Master to shut down cleanly.
 | |
|    *
 | |
|    * Tells the Master to shut down cleanly.
 | |
|    */
 | |
|   void stop();
 | |
| 
 | |
|   /** @brief Effectively makes the whole DBRM system stop.
 | |
|    *
 | |
|    * Grabs a lock that effectively halts all further BRM data changes.
 | |
|    * @warning Use with care.  It's basically an accessor to a raw pthread_mutex.
 | |
|    */
 | |
|   void lock();
 | |
| 
 | |
|   /** @brief Resumes DBRM functionality.
 | |
|    *
 | |
|    * Releases a lock that allows the DBRM to continue processing changes.
 | |
|    * @warning Use with care.  It's basically an accessor to a raw pthread_mutex.
 | |
|    */
 | |
|   void unlock();
 | |
| 
 | |
|   /** @brief Reload the config file and reconnect to all slaves.
 | |
|    *
 | |
|    * Drops all existing connections, reloads the config file and
 | |
|    * reconnects with all slaves.
 | |
|    * @note Doesn't work yet.  Redundant anyway.
 | |
|    */
 | |
|   void reload();
 | |
| 
 | |
|   /** @brief Sets either read/write or read-only mode
 | |
|    *
 | |
|    * Sets either read/write or read-only mode.  When in read-only mode
 | |
|    * all BRM change requests will return ERR_READONLY immediately.
 | |
|    * @param ro true specifies read-only, false specifies read/write
 | |
|    */
 | |
|   void setReadOnly(bool ro);
 | |
| 
 | |
|   /** @brief Returns true if the Master is in read-only mode, false if in RW mode.
 | |
|    *
 | |
|    * @returns true if the Master is in read-only mode, false if in read-write
 | |
|    * mode
 | |
|    */
 | |
|   bool isReadOnly() const
 | |
|   {
 | |
|     return readOnly;
 | |
|   }
 | |
|   /** @brief Connects to the all workers */
 | |
|   void connectToWorkers(const size_t connectTimeoutSecs);
 | |
| 
 | |
|   /** @brief Extracts number of workers and connection timeout from the config */
 | |
|   void getNumWorkersAndTimeout(size_t& connectTimeoutSecs, const std::string& methodName,
 | |
|                                config::Config* config);
 | |
| 
 | |
|  private:
 | |
|   class MsgProcessor
 | |
|   {
 | |
|    public:
 | |
|     MsgProcessor(MasterDBRMNode* master);
 | |
|     ~MsgProcessor();
 | |
|     void operator()();
 | |
| 
 | |
|    private:
 | |
|     MasterDBRMNode* m;
 | |
|   };
 | |
| 
 | |
|   struct ThreadParams
 | |
|   {
 | |
|     messageqcpp::IOSocket* sock;
 | |
|     boost::thread* t;
 | |
|   };
 | |
| 
 | |
|   MasterDBRMNode(const MasterDBRMNode& m) = delete;
 | |
|   MasterDBRMNode& operator=(const MasterDBRMNode& m) = delete;
 | |
| 
 | |
|   void initMsgQueues(config::Config* config);
 | |
|   void msgProcessor();
 | |
|   void distribute(messageqcpp::ByteStream* msg);
 | |
|   void undo() throw();
 | |
|   void confirm();
 | |
|   void sendError(messageqcpp::IOSocket* dest, uint8_t err) const throw();
 | |
|   int gatherResponses(uint8_t cmd, uint32_t msgCmdLength, std::vector<messageqcpp::ByteStream*>* responses,
 | |
|                       bool& readErrFlag) throw();
 | |
|   int compareResponses(uint8_t cmd, uint32_t msgCmdLength,
 | |
|                        const std::vector<messageqcpp::ByteStream*>& responses) const;
 | |
|   void finalCleanup();
 | |
| 
 | |
|   /* Commands the master executes */
 | |
|   void doHalt(messageqcpp::IOSocket* sock);
 | |
|   void doResume(messageqcpp::IOSocket* sock);
 | |
|   void doReload(messageqcpp::IOSocket* sock);
 | |
|   void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
 | |
|   void doGetReadOnly(messageqcpp::IOSocket* sock);
 | |
|   void doStartReadOnly(messageqcpp::IOSocket* sock);
 | |
| 
 | |
|   /* SessionManager interface */
 | |
|   SessionManagerServer sm;
 | |
|   void doVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doNewCpimportJob(ThreadParams* p);
 | |
|   void doFinishCpimportJob(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock);
 | |
|   void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doSIDTIDMap(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetShmContents(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetUniqueUint32(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetUniqueUint64(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doSetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doClearSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doSessionManagerReset(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetUncommittedLbids(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
| 
 | |
|   /* OID Manager interface */
 | |
|   OIDServer oids;
 | |
|   boost::mutex oidsMutex;
 | |
|   void doAllocOIDs(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doReturnOIDs(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doOidmSize(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doAllocVBOID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetDBRootOfVBOID(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetVBOIDToDBRootMap(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
| 
 | |
|   /* Table lock interface */
 | |
|   boost::scoped_ptr<TableLockServer> tableLockServer;
 | |
|   void doGetTableLock(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doReleaseTableLock(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doChangeTableLockState(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doChangeTableLockOwner(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doReleaseAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetTableLockInfo(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doOwnerCheck(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
| 
 | |
|   /* Autoincrement interface */
 | |
|   AutoincrementManager aiManager;
 | |
|   void doStartAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetAIRange(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doResetAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doGetAILock(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doReleaseAILock(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
|   void doDeleteAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
 | |
| 
 | |
|   messageqcpp::MessageQueueServer* dbrmServer;
 | |
|   std::vector<messageqcpp::MessageQueueClient*> slaves;
 | |
|   std::vector<messageqcpp::MessageQueueClient*>::iterator iSlave;
 | |
|   std::vector<messageqcpp::IOSocket*> activeSessions;
 | |
| 
 | |
|   LBIDResourceGraph* rg;
 | |
| 
 | |
|   boost::mutex mutex;
 | |
|   boost::mutex mutex2;      // protects params and the hand-off  TODO: simplify
 | |
|   boost::mutex slaveLock;   // syncs communication with the slaves
 | |
|   boost::mutex serverLock;  // kludge to synchronize reloading
 | |
|   std::mutex cpimportMutex;
 | |
|   std::condition_variable cpimportJobsCond;
 | |
|   int runners, NumWorkers;
 | |
|   ThreadParams* params;
 | |
|   volatile bool die, halting;
 | |
|   bool reloadCmd;
 | |
|   mutable bool readOnly;
 | |
|   // Maximum time to wait for worker responses/reconfigure before forcing read-only
 | |
|   // Loaded from Columnstore.xml: SystemConfig/DBRMUnresponsiveTimeout (default: 300 seconds)
 | |
|   struct timespec haltTimeout;
 | |
|   mutable bool waitToFinishJobs{false};
 | |
|   struct timespec MSG_TIMEOUT;
 | |
| };
 | |
| 
 | |
| }  // namespace BRM
 |