/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /***************************************************************************** * $Id: sessionmanagerserver.cpp 1906 2013-06-14 19:15:32Z rdempsey $ * ****************************************************************************/ /* * This class issues Transaction ID and keeps track of the current version ID */ #include #include #include #include #include #include #include #include #include #include #include using namespace std; #include #include using namespace boost; #include "brmtypes.h" #include "calpontsystemcatalog.h" using namespace execplan; #include "configcpp.h" #include "atomicops.h" #define SESSIONMANAGERSERVER_DLLEXPORT #include "sessionmanagerserver.h" #undef SESSIONMANAGERSERVER_DLLEXPORT #ifndef O_BINARY #define O_BINARY 0 #endif #ifndef O_DIRECT #define O_DIRECT 0 #endif #ifndef O_LARGEFILE #define O_LARGEFILE 0 #endif #ifndef O_NOATIME #define O_NOATIME 0 #endif #include "IDBDataFile.h" #include "IDBPolicy.h" using namespace idbdatafile; namespace BRM { const uint32_t SessionManagerServer::SS_READY = 1 << 0; // Set by dmlProc one time when dmlProc is ready const uint32_t SessionManagerServer::SS_SUSPENDED = 1 << 1; // Set by console when the system has been suspended by user. const uint32_t SessionManagerServer::SS_SUSPEND_PENDING = 1 << 2; // Set by console when user wants to suspend, but writing is occuring. const uint32_t SessionManagerServer::SS_SHUTDOWN_PENDING = 1 << 3; // Set by console when user wants to shutdown, but writing is occuring. const uint32_t SessionManagerServer::SS_ROLLBACK = 1 << 4; // In combination with a PENDING flag, force a rollback as soom as possible. const uint32_t SessionManagerServer::SS_FORCE = 1 << 5; // In combination with a PENDING flag, force a shutdown without rollback. const uint32_t SessionManagerServer::SS_QUERY_READY = 1 << 6; // Set by ProcManager when system is ready for queries SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0) { config::Config* conf; string stmp; const char* ctmp; conf = config::Config::makeConfig(); try { stmp = conf->getConfig("SessionManager", "MaxConcurrentTransactions"); } catch (const std::exception& e) { cout << e.what() << endl; stmp.clear(); } if (stmp != "") { int64_t tmp; ctmp = stmp.c_str(); tmp = config::Config::fromText(ctmp); if (tmp < 1) maxTxns = 1; else maxTxns = static_cast(tmp); } else maxTxns = 1; txnidFilename = conf->getConfig("SessionManager", "TxnIDFile"); semValue = maxTxns; _verID = 0; _sysCatVerID = 0; systemState = 0; try { loadState(); } catch (...) { // first-time run most likely, ignore the error } } SessionManagerServer::~SessionManagerServer() { } void SessionManagerServer::reset() { mutex.try_lock(); semValue = maxTxns; condvar.notify_all(); activeTxns.clear(); mutex.unlock(); } void SessionManagerServer::loadState() { int lastTxnID; int err; int lastSysCatVerId; again: // There are now 3 pieces of info stored in the txnidfd file: last // transaction id, last system catalog version id, and the // system state flags. All these values are stored in shared, an // instance of struct Overlay. // If we fail to read a full four bytes for any value, then the // value isn't in the file, and we start with the default. if (IDBPolicy::exists(txnidFilename.c_str())) { scoped_ptr txnidfp(IDBDataFile::open( IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "rb", 0)); if (!txnidfp) { perror("SessionManagerServer(): open"); throw runtime_error("SessionManagerServer: Could not open the transaction ID file"); } // Last transaction id txnidfp->seek(0, SEEK_SET); err = txnidfp->read(&lastTxnID, 4); if (err < 0 && errno != EINTR) { perror("Sessionmanager::initSegment(): read"); throw runtime_error("SessionManagerServer: read failed, aborting"); } else if (err < 0) goto again; else if (err == sizeof(int)) _verID = lastTxnID; // last system catalog version id err = txnidfp->read(&lastSysCatVerId, 4); if (err < 0 && errno != EINTR) { perror("Sessionmanager::initSegment(): read"); throw runtime_error("SessionManagerServer: read failed, aborting"); } else if (err < 0) goto again; else if (err == sizeof(int)) _sysCatVerID = lastSysCatVerId; // System state. Contains flags regarding the suspend state of the system. err = txnidfp->read(&systemState, 4); if (err < 0 && errno == EINTR) { goto again; } else if (err == sizeof(int)) { // Turn off the pending and force flags. They make no sense for a clean start. // Turn off the ready flag. DMLProc will set it back on when // initialized. systemState &= ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_ROLLBACK | SS_FORCE); } else { // else no problem. System state wasn't saved. Might be an upgraded system. systemState = 0; } } } /* Save the systemState flags of the Overlay * segment. This is saved in the third * word of txnid File */ void SessionManagerServer::saveSystemState() { saveSMTxnIDAndState(); } const QueryContext SessionManagerServer::verID() { QueryContext ret; boost::mutex::scoped_lock lk(mutex); ret.currentScn = _verID; for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i) ret.currentTxns->push_back(i->second); return ret; } const QueryContext SessionManagerServer::sysCatVerID() { QueryContext ret; boost::mutex::scoped_lock lk(mutex); ret.currentScn = _sysCatVerID; for (iterator i = activeTxns.begin(); i != activeTxns.end(); ++i) ret.currentTxns->push_back(i->second); return ret; } uint32_t SessionManagerServer::newCpimportJob() { std::scoped_lock lk(cpimportMutex); activeCpimportJobs.insert(cpimportJobId); auto ret = cpimportJobId; ++cpimportJobId; return ret; } void SessionManagerServer::finishCpimortJob(uint32_t jobId) { std::scoped_lock lk(cpimportMutex); if (activeCpimportJobs.count(jobId)) activeCpimportJobs.erase(jobId); } void SessionManagerServer::clearAllCpimportJobs() { std::scoped_lock lk(cpimportMutex); activeCpimportJobs.clear(); } const TxnID SessionManagerServer::newTxnID(const SID session, bool block, bool isDDL) { TxnID ret; // ctor must set valid = false iterator it; boost::mutex::scoped_lock lk(mutex); // if it already has a txn... it = activeTxns.find(session); if (it != activeTxns.end()) { ret.id = it->second; ret.valid = true; return ret; } if (!block && semValue == 0) return ret; else while (semValue == 0) condvar.wait(lk); semValue--; idbassert(semValue <= (uint32_t)maxTxns); ret.id = ++_verID; ret.valid = true; activeTxns[session] = ret.id; if (isDDL) ++_sysCatVerID; saveSMTxnIDAndState(); return ret; } void SessionManagerServer::finishTransaction(TxnID& txn) { iterator it; boost::mutex::scoped_lock lk(mutex); bool found = false; if (!txn.valid) throw invalid_argument("SessionManagerServer::finishTransaction(): transaction is invalid"); for (it = activeTxns.begin(); it != activeTxns.end();) { if (it->second == txn.id) { activeTxns.erase(it++); txn.valid = false; found = true; // we could probably break at this point, but there won't be that many active txns, and, // even though it'd be an error to have multiple entries for the same txn, we might // well just get rid of them... } else ++it; } if (found) { semValue++; idbassert(semValue <= (uint32_t)maxTxns); condvar.notify_one(); } else throw invalid_argument("SessionManagerServer::finishTransaction(): transaction doesn't exist"); } const TxnID SessionManagerServer::getTxnID(const SID session) { TxnID ret; iterator it; boost::mutex::scoped_lock lk(mutex); it = activeTxns.find(session); if (it != activeTxns.end()) { ret.id = it->second; ret.valid = true; } return ret; } std::shared_ptr SessionManagerServer::SIDTIDMap(int& len) { int j; std::shared_ptr ret; boost::mutex::scoped_lock lk(mutex); iterator it; ret.reset(new SIDTIDEntry[activeTxns.size()]); len = activeTxns.size(); for (it = activeTxns.begin(), j = 0; it != activeTxns.end(); ++it, ++j) { ret[j].sessionid = it->first; ret[j].txnid.id = it->second; ret[j].txnid.valid = true; } return ret; } void SessionManagerServer::setSystemState(uint32_t state) { boost::mutex::scoped_lock lk(mutex); systemState |= state; saveSystemState(); } void SessionManagerServer::clearSystemState(uint32_t state) { boost::mutex::scoped_lock lk(mutex); systemState &= ~state; saveSystemState(); } uint32_t SessionManagerServer::getCpimportJobsCount() { std::scoped_lock lk(cpimportMutex); return activeCpimportJobs.size(); } uint32_t SessionManagerServer::getTxnCount() { boost::mutex::scoped_lock lk(mutex); return activeTxns.size(); } void SessionManagerServer::saveSMTxnIDAndState() { // caller holds the lock scoped_ptr txnidfp(IDBDataFile::open( IDBPolicy::getType(txnidFilename.c_str(), IDBPolicy::WRITEENG), txnidFilename.c_str(), "wb", 0)); if (!txnidfp) { perror("SessionManagerServer(): open"); throw runtime_error("SessionManagerServer: Could not open the transaction ID file"); } int filedata[2]; filedata[0] = _verID; filedata[1] = _sysCatVerID; int err = txnidfp->write(filedata, 8); if (err < 0) { perror("SessionManagerServer::newTxnID(): write(verid)"); throw runtime_error("SessionManagerServer::newTxnID(): write(verid) failed"); } uint32_t lSystemState = systemState; // We don't save the pending flags, the force flag or the ready flags. lSystemState &= ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_FORCE); err = txnidfp->write(&lSystemState, sizeof(int)); if (err < 0) { perror("SessionManagerServer::saveSystemState(): write(systemState)"); throw runtime_error("SessionManagerServer::saveSystemState(): write(systemState) failed"); } txnidfp->flush(); } } // namespace BRM