You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			461 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			461 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*****************************************************************************
 | |
|  * $Id: sessionmanagerserver.cpp 1906 2013-06-14 19:15:32Z rdempsey $
 | |
|  *
 | |
|  ****************************************************************************/
 | |
| 
 | |
| /*
 | |
|  * This class issues Transaction ID and keeps track of the current version ID
 | |
|  */
 | |
| 
 | |
| #include <sys/types.h>
 | |
| #include <sys/stat.h>
 | |
| #include <cerrno>
 | |
| #include <fcntl.h>
 | |
| #include <mutex>
 | |
| #include <unistd.h>
 | |
| 
 | |
| #include <iostream>
 | |
| #include <string>
 | |
| #include <stdexcept>
 | |
| #include <limits>
 | |
| #include <unordered_set>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/thread/mutex.hpp>
 | |
| #include <boost/scoped_ptr.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "brmtypes.h"
 | |
| #include "calpontsystemcatalog.h"
 | |
| using namespace execplan;
 | |
| 
 | |
| #include "configcpp.h"
 | |
| #include "atomicops.h"
 | |
| 
 | |
| #define SESSIONMANAGERSERVER_DLLEXPORT
 | |
| #include "sessionmanagerserver.h"
 | |
| #undef SESSIONMANAGERSERVER_DLLEXPORT
 | |
| 
 | |
| #ifndef O_BINARY
 | |
| #define O_BINARY 0
 | |
| #endif
 | |
| #ifndef O_DIRECT
 | |
| #define O_DIRECT 0
 | |
| #endif
 | |
| #ifndef O_LARGEFILE
 | |
| #define O_LARGEFILE 0
 | |
| #endif
 | |
| #ifndef O_NOATIME
 | |
| #define O_NOATIME 0
 | |
| #endif
 | |
| 
 | |
| #include "IDBDataFile.h"
 | |
| #include "IDBPolicy.h"
 | |
| using namespace idbdatafile;
 | |
| 
 | |
| namespace BRM
 | |
| {
 | |
| const uint32_t SessionManagerServer::SS_READY = 1 << 0;  // Set by dmlProc one time when dmlProc is ready
 | |
| const uint32_t SessionManagerServer::SS_SUSPENDED =
 | |
|     1 << 1;  // Set by console when the system has been suspended by user.
 | |
| const uint32_t SessionManagerServer::SS_SUSPEND_PENDING =
 | |
|     1 << 2;  // Set by console when user wants to suspend, but writing is occuring.
 | |
| const uint32_t SessionManagerServer::SS_SHUTDOWN_PENDING =
 | |
|     1 << 3;  // Set by console when user wants to shutdown, but writing is occuring.
 | |
| const uint32_t SessionManagerServer::SS_ROLLBACK =
 | |
|     1 << 4;  // In combination with a PENDING flag, force a rollback as soom as possible.
 | |
| const uint32_t SessionManagerServer::SS_FORCE =
 | |
|     1 << 5;  // In combination with a PENDING flag, force a shutdown without rollback.
 | |
| const uint32_t SessionManagerServer::SS_QUERY_READY =
 | |
|     1 << 6;  // Set by ProcManager when system is ready for queries
 | |
| 
 | |
| SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0)
 | |
| {
 | |
|   config::Config* conf;
 | |
|   string stmp;
 | |
|   const char* ctmp;
 | |
| 
 | |
|   conf = config::Config::makeConfig();
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     stmp = conf->getConfig("SessionManager", "MaxConcurrentTransactions");
 | |
|   }
 | |
|   catch (const std::exception& e)
 | |
|   {
 | |
|     cout << e.what() << endl;
 | |
|     stmp.clear();
 | |
|   }
 | |
| 
 | |
|   if (stmp != "")
 | |
|   {
 | |
|     int64_t tmp;
 | |
|     ctmp = stmp.c_str();
 | |
|     tmp = config::Config::fromText(ctmp);
 | |
| 
 | |
|     if (tmp < 1)
 | |
|       maxTxns = 1;
 | |
|     else
 | |
|       maxTxns = static_cast<int>(tmp);
 | |
|   }
 | |
|   else
 | |
|     maxTxns = 1;
 | |
| 
 | |
|   txnidFilename = conf->getConfig("SessionManager", "TxnIDFile");
 | |
| 
 | |
|   semValue = maxTxns;
 | |
|   _verID = 0;
 | |
|   _sysCatVerID = 0;
 | |
|   systemState = 0;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     loadState();
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     // first-time run most likely, ignore the error
 | |
|   }
 | |
| }
 | |
| 
 | |
| SessionManagerServer::~SessionManagerServer()
 | |
| {
 | |
| }
 | |
| void SessionManagerServer::reset()
 | |
| {
 | |
|   mutex.try_lock();
 | |
|   semValue = maxTxns;
 | |
|   condvar.notify_all();
 | |
|   activeTxns.clear();
 | |
|   mutex.unlock();
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::loadState()
 | |
| {
 | |
|   int lastTxnID;
 | |
|   int err;
 | |
|   int lastSysCatVerId;
 | |
| 
 | |
| again:
 | |
| 
 | |
|   // There are now 3 pieces of info stored in the txnidfd file: last
 | |
|   // transaction id, last system catalog version id, and the
 | |
|   // system state flags. All these values are stored in shared, an
 | |
|   // instance of struct Overlay.
 | |
|   // If we fail to read a full four bytes for any value, then the
 | |
|   // value isn't in the file, and we start with the default.
 | |
| 
 | |
|   if (IDBPolicy::exists(txnidFilename.c_str()))
 | |
|   {
 | |
|     scoped_ptr<IDBDataFile> txnidfp(IDBDataFile::open(
 | |
|         IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "rb", 0));
 | |
| 
 | |
|     if (!txnidfp)
 | |
|     {
 | |
|       perror("SessionManagerServer(): open");
 | |
|       throw runtime_error("SessionManagerServer: Could not open the transaction ID file");
 | |
|     }
 | |
| 
 | |
|     // Last transaction id
 | |
|     txnidfp->seek(0, SEEK_SET);
 | |
|     err = txnidfp->read(&lastTxnID, 4);
 | |
| 
 | |
|     if (err < 0 && errno != EINTR)
 | |
|     {
 | |
|       perror("Sessionmanager::initSegment(): read");
 | |
|       throw runtime_error("SessionManagerServer: read failed, aborting");
 | |
|     }
 | |
|     else if (err < 0)
 | |
|       goto again;
 | |
|     else if (err == sizeof(int))
 | |
|       _verID = lastTxnID;
 | |
| 
 | |
|     // last system catalog version id
 | |
|     err = txnidfp->read(&lastSysCatVerId, 4);
 | |
| 
 | |
|     if (err < 0 && errno != EINTR)
 | |
|     {
 | |
|       perror("Sessionmanager::initSegment(): read");
 | |
|       throw runtime_error("SessionManagerServer: read failed, aborting");
 | |
|     }
 | |
|     else if (err < 0)
 | |
|       goto again;
 | |
|     else if (err == sizeof(int))
 | |
|       _sysCatVerID = lastSysCatVerId;
 | |
| 
 | |
|     // System state. Contains flags regarding the suspend state of the system.
 | |
|     err = txnidfp->read(&systemState, 4);
 | |
| 
 | |
|     if (err < 0 && errno == EINTR)
 | |
|     {
 | |
|       goto again;
 | |
|     }
 | |
|     else if (err == sizeof(int))
 | |
|     {
 | |
|       // Turn off the pending and force flags. They make no sense for a clean start.
 | |
|       // Turn off the ready flag. DMLProc will set it back on when
 | |
|       // initialized.
 | |
|       systemState &=
 | |
|           ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_ROLLBACK | SS_FORCE);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       // else no problem. System state wasn't saved. Might be an upgraded system.
 | |
|       systemState = 0;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| /* Save the systemState flags of the Overlay
 | |
|  * segment. This is saved in the third
 | |
|  * word of txnid File
 | |
|  */
 | |
| void SessionManagerServer::saveSystemState()
 | |
| {
 | |
|   saveSMTxnIDAndState();
 | |
| }
 | |
| 
 | |
| const QueryContext SessionManagerServer::verID()
 | |
| {
 | |
|   QueryContext ret;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
|   ret.currentScn = _verID;
 | |
| 
 | |
|   for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
 | |
|     ret.currentTxns->push_back(i->second);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| const QueryContext SessionManagerServer::sysCatVerID()
 | |
| {
 | |
|   QueryContext ret;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
|   ret.currentScn = _sysCatVerID;
 | |
| 
 | |
|   for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i)
 | |
|     ret.currentTxns->push_back(i->second);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| uint32_t SessionManagerServer::newCpimportJob()
 | |
| {
 | |
|   std::scoped_lock lk(cpimportMutex);
 | |
|   activeCpimportJobs.insert(cpimportJobId);
 | |
|   auto ret = cpimportJobId;
 | |
|   ++cpimportJobId;
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::finishCpimortJob(uint32_t jobId)
 | |
| {
 | |
|   std::scoped_lock lk(cpimportMutex);
 | |
|   if (activeCpimportJobs.count(jobId))
 | |
|     activeCpimportJobs.erase(jobId);
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::clearAllCpimportJobs()
 | |
| {
 | |
|   std::scoped_lock lk(cpimportMutex);
 | |
|   activeCpimportJobs.clear();
 | |
| }
 | |
| 
 | |
| const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL)
 | |
| {
 | |
|   TxnID ret;  // ctor must set valid = false
 | |
|   iterator it;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
| 
 | |
|   // if it already has a txn...
 | |
|   it = activeTxns.find(session);
 | |
| 
 | |
|   if (it != activeTxns.end())
 | |
|   {
 | |
|     ret.id = it->second;
 | |
|     ret.valid = true;
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   if (!block && semValue == 0)
 | |
|     return ret;
 | |
|   else
 | |
|     while (semValue == 0)
 | |
|       condvar.wait(lk);
 | |
| 
 | |
|   semValue--;
 | |
|   idbassert(semValue <= (uint32_t)maxTxns);
 | |
| 
 | |
|   ret.id = ++_verID;
 | |
|   ret.valid = true;
 | |
|   activeTxns[session] = ret.id;
 | |
| 
 | |
|   if (isDDL)
 | |
|     ++_sysCatVerID;
 | |
| 
 | |
|   saveSMTxnIDAndState();
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::finishTransaction(TxnID& txn)
 | |
| {
 | |
|   iterator it;
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
|   bool found = false;
 | |
| 
 | |
|   if (!txn.valid)
 | |
|     throw invalid_argument("SessionManagerServer::finishTransaction(): transaction is invalid");
 | |
| 
 | |
|   for (it = activeTxns.begin(); it != activeTxns.end();)
 | |
|   {
 | |
|     if (it->second == txn.id)
 | |
|     {
 | |
|       activeTxns.erase(it++);
 | |
|       txn.valid = false;
 | |
|       found = true;
 | |
|       // we could probably break at this point, but there won't be that many active txns, and,
 | |
|       // even though it'd be an error to have multiple entries for the same txn, we might
 | |
|       // well just get rid of them...
 | |
|     }
 | |
|     else
 | |
|       ++it;
 | |
|   }
 | |
| 
 | |
|   if (found)
 | |
|   {
 | |
|     semValue++;
 | |
|     idbassert(semValue <= (uint32_t)maxTxns);
 | |
|     condvar.notify_one();
 | |
|   }
 | |
|   else
 | |
|     throw invalid_argument("SessionManagerServer::finishTransaction(): transaction doesn't exist");
 | |
| }
 | |
| 
 | |
| const TxnID SessionManagerServer::getTxnID(const SID session)
 | |
| {
 | |
|   TxnID ret;
 | |
|   iterator it;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
| 
 | |
|   it = activeTxns.find(session);
 | |
| 
 | |
|   if (it != activeTxns.end())
 | |
|   {
 | |
|     ret.id = it->second;
 | |
|     ret.valid = true;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| std::shared_ptr<SIDTIDEntry[]> SessionManagerServer::SIDTIDMap(int& len)
 | |
| {
 | |
|   int j;
 | |
|   std::shared_ptr<SIDTIDEntry[]> ret;
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
|   iterator it;
 | |
| 
 | |
|   ret.reset(new SIDTIDEntry[activeTxns.size()]);
 | |
| 
 | |
|   len = activeTxns.size();
 | |
| 
 | |
|   for (it = activeTxns.begin(), j = 0; it != activeTxns.end(); ++it, ++j)
 | |
|   {
 | |
|     ret[j].sessionid = it->first;
 | |
|     ret[j].txnid.id = it->second;
 | |
|     ret[j].txnid.valid = true;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::setSystemState(uint32_t state)
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
| 
 | |
|   systemState |= state;
 | |
|   saveSystemState();
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::clearSystemState(uint32_t state)
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
| 
 | |
|   systemState &= ~state;
 | |
|   saveSystemState();
 | |
| }
 | |
| 
 | |
| uint32_t SessionManagerServer::getCpimportJobsCount()
 | |
| {
 | |
|   std::scoped_lock lk(cpimportMutex);
 | |
|   return activeCpimportJobs.size();
 | |
| }
 | |
| 
 | |
| uint32_t SessionManagerServer::getTxnCount()
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(mutex);
 | |
|   return activeTxns.size();
 | |
| }
 | |
| 
 | |
| void SessionManagerServer::saveSMTxnIDAndState()
 | |
| {
 | |
|   // caller holds the lock
 | |
|   scoped_ptr<IDBDataFile> txnidfp(IDBDataFile::open(
 | |
|       IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "wb", 0));
 | |
| 
 | |
|   if (!txnidfp)
 | |
|   {
 | |
|     perror("SessionManagerServer(): open");
 | |
|     throw runtime_error("SessionManagerServer: Could not open the transaction ID file");
 | |
|   }
 | |
| 
 | |
|   int filedata[2];
 | |
|   filedata[0] = _verID;
 | |
|   filedata[1] = _sysCatVerID;
 | |
| 
 | |
|   int err = txnidfp->write(filedata, 8);
 | |
| 
 | |
|   if (err < 0)
 | |
|   {
 | |
|     perror("SessionManagerServer::newTxnID(): write(verid)");
 | |
|     throw runtime_error("SessionManagerServer::newTxnID(): write(verid) failed");
 | |
|   }
 | |
| 
 | |
|   uint32_t lSystemState = systemState;
 | |
|   // We don't save the pending flags, the force flag or the ready flags.
 | |
|   lSystemState &= ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_FORCE);
 | |
|   err = txnidfp->write(&lSystemState, sizeof(int));
 | |
| 
 | |
|   if (err < 0)
 | |
|   {
 | |
|     perror("SessionManagerServer::saveSystemState(): write(systemState)");
 | |
|     throw runtime_error("SessionManagerServer::saveSystemState(): write(systemState) failed");
 | |
|   }
 | |
| 
 | |
|   txnidfp->flush();
 | |
| }
 | |
| 
 | |
| }  // namespace BRM
 |