From 020c0ed3f52309deed887483bc1e8c0d431784cd Mon Sep 17 00:00:00 2001 From: David Hall Date: Thu, 28 Jul 2016 09:19:21 -0500 Subject: [PATCH] MCOL-140 Add a mechanism to serialize transactions for a single table to prevent VSS clashes. Transactions for different tables will continue concurrently. --- .../dmlpackageproc/deletepackageprocessor.cpp | 2 +- dmlproc/dmlproc.cpp | 3 + dmlproc/dmlprocessor.cpp | 281 +++++++++++++++++- dmlproc/dmlprocessor.h | 76 ++++- 4 files changed, 341 insertions(+), 21 deletions(-) diff --git a/dbcon/dmlpackageproc/deletepackageprocessor.cpp b/dbcon/dmlpackageproc/deletepackageprocessor.cpp index 22420fc9a..dde256e31 100644 --- a/dbcon/dmlpackageproc/deletepackageprocessor.cpp +++ b/dbcon/dmlpackageproc/deletepackageprocessor.cpp @@ -150,7 +150,7 @@ namespace dmlpackageprocessor { int waitPeriod = 10; int sleepTime = 100; // sleep 100 milliseconds between checks - int numTries = 10; // try 10 times per second + int numTries = 30; // try 30 times (3 seconds) waitPeriod = Config::getWaitPeriod(); numTries = waitPeriod * 10; struct timespec rm_ts; diff --git a/dmlproc/dmlproc.cpp b/dmlproc/dmlproc.cpp index 5942d6cc4..accb35d3c 100644 --- a/dmlproc/dmlproc.cpp +++ b/dmlproc/dmlproc.cpp @@ -99,6 +99,9 @@ void added_a_pm(int) logger.logMessage(LOG_TYPE_DEBUG, msg, logid); Dec->Setup(); + // MCOL-140 clear the waiting queue as all transactions are probably going to fail + PackageHandler::clearTableAccess(); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); //WriteEngine::WEClients::instance(WriteEngine::WEClients::DMLPROC)->Setup(); diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index 4afa3127b..3ff861f6a 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -63,6 +63,8 @@ using namespace querytele; extern boost::mutex mute; extern boost::condition_variable cond; +#define MCOL_140 // Undefine to test VSS for out of order transactions + namespace { const std::string myname = "DMLProc"; @@ -79,6 +81,11 @@ 
boost::mutex DMLProcessor::packageHandlerMapLock; std::map DMLProcessor::batchinsertProcessorMap; boost::mutex DMLProcessor::batchinsertProcessorMapLock; +// MCOL-140 Map to hold table oids for tables being changed. +std::map PackageHandler::tableOidMap; +boost::condition_variable PackageHandler::tableOidCond; +boost::mutex PackageHandler::tableOidMutex; + //------------------------------------------------------------------------------ // A thread to periodically call dbrm to see if a user is // shutting down the system or has put the system into write @@ -276,20 +283,25 @@ PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios, boost::shared_ptr bs, uint8_t packageType, joblist::DistributedEngineComm *ec, + bool concurrentSupport, uint64_t maxDeleteRows, uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnId, DBRM * aDbrm, - const QueryTeleClient& qtc) : + const QueryTeleClient& qtc, + boost::shared_ptr csc) : fIos(ios), fByteStream(bs), fPackageType(packageType), fEC(ec), + fConcurrentSupport(concurrentSupport), fMaxDeleteRows(maxDeleteRows), fSessionID(sessionID), + fTableOid(0), fTxnid(txnId), fDbrm(aDbrm), - fQtc(qtc) + fQtc(qtc), + fcsc(csc) { } @@ -298,6 +310,168 @@ PackageHandler::~PackageHandler() //cout << "In destructor" << endl; } +// MCOL-140 +// Blocks a thread if there is another trx working on the same fTableOid +// return 1 when thread should continue. +// return 0 if error. Right now, no error detection is implemented. +// +// txnid was being created before the call to this function. This caused race conditions +// so creation is delayed until we're inside the lock here. Nothing needs it before +// this point in the execution. +// +// The algorithm is this. When the first txn for a given fTableOid arrives, start a queue +// containing a list of waiting or working txnId. Put this txnId into the queue (working) +// Put the queue into a map keyed on fTableOid. 
+// +// When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself, +// then waits for condition. +// When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is +// empty, it removes the entry from the map. +// Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released +// to do work. Otherwise it goes back to wait. +// +// There's a chance (CTRL+C, for instance) that the txn is no longer in the queue. Release it to work. +// Rollback will most likely be next. +// +// A transaction for one fTableOid is not blocked by a txn for a different fTableOid. +int PackageHandler::synchTableAccess() +{ + // MCOL-140 Wait for any other DML using this table. + std::map::iterator it; + boost::unique_lock lock(tableOidMutex); + BRM::TxnID txnid; + + if (fPackageType != dmlpackage::DML_COMMAND) + { + txnid = sessionManager.getTxnID(fSessionID); + if ( !txnid.valid ) + { + txnid = sessionManager.newTxnID(fSessionID, true); + if (!txnid.valid) + { + throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); + } + } + } + else + { + txnid = sessionManager.getTxnID(fSessionID); + } + fTxnid = txnid.id; + + if ((it=tableOidMap.find(fTableOid)) != tableOidMap.end()) + { + PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; + // There's at least one working txn on this table. We may be the same txn. + if (fTxnid == tableOidQueue.front()) + { + return 1; // We're next in line or the same as the last. Keep working + } + + tableOidQueue.push(fTxnid); // Get on the waiting list. + + // We need to wait + // tableOidQueue here is the queue holding the waiting transactions for this fTableOid + while (true) + { + tableOidCond.wait(lock); + if (tableOidQueue.front() == fTxnid) + { + break; + } + if (tableOidQueue.empty()) + { + // If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now. 
+ // Empty queues must be erased from the map. + tableOidMap.erase(fTableOid); + break; + } + // If we're not in the queue at all, then stop waiting. CTRL+C was probably hit. + PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid); + if (c_it == tableOidQueue.end()) + { + break; + } + // We're still in the queue and not on top. Go back and wait some more. + } + } + else + { + // We're the first for this tableoid. Start a new queue. + tableAccessQueue_t tableOidQueue; + tableOidQueue.push(fTxnid); + tableOidMap[fTableOid] = tableOidQueue; + } + return 1; +} + +// MCOL-140 Called when it's time to release the next thread for this tableOid +int PackageHandler::releaseTableAccess() +{ + // take us out of the queue + std::map::iterator it; + boost::lock_guard lock(tableOidMutex); + if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end()) + { + // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess + return 2; // For now, return codes are not used + } + PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; + if (tableOidQueue.front() != fTxnid) + { + // This is a severe error. The front should be the working thread. If we're here, + // we're the working thread and should be front(). + cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl; + LoggingID logid(21, fSessionID, fTxnid); + logging::Message::Args args1; + logging::Message msg(1); + args1.add("ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid"); + args1.add((uint64_t)fTableOid); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_ERROR, msg, logid); + } + else + { + tableOidQueue.pop(); // Get off the waiting list. + if (tableOidQueue.empty()) + { + // remove the queue from the map. 
+ tableOidMap.erase(fTableOid); + } + } + // release the condition + tableOidCond.notify_all(); + return 1; +} + +int PackageHandler::forceReleaseTableAccess() +{ + // By removing the txnid from the queue, the logic after the wait in + // synchTableAccess() will release the thread and clean up if needed. + std::map::iterator it; + boost::lock_guard lock(tableOidMutex); + if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end()) + { + // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess + return 2; + } + PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; + tableOidQueue.erase(fTxnid); + // release the condition + tableOidCond.notify_all(); + return 1; +} + +//static +// Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung, +// though some may be because they never returned from PrimProc and will leave the table lock on. +int PackageHandler::clearTableAccess() +{ + tableOidMap.clear(); + return 1; +} + void PackageHandler::run() { ResourceManager frm; @@ -320,6 +494,23 @@ void PackageHandler::run() //boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); insertPkg.read(*(fByteStream.get())); +#ifdef MCOL_140 + if (fConcurrentSupport) + { + fTableOid = insertPkg.getTableOid(); + // Single Insert has no start like bulk does, so insertPkg.getTableOid() + // isn't set. Go get it now. 
+ if (fTableOid == 0) + { + CalpontSystemCatalog::TableName tableName; + tableName.schema = insertPkg.get_Table()->get_SchemaName(); + tableName.table = insertPkg.get_Table()->get_TableName(); + CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + fTableOid = roPair.objnum; + } + synchTableAccess(); // Blocks if another DML thread is using this fTableOid + } +#endif QueryTeleStats qts; qts.query_uuid = QueryTeleClient::genUUID(); qts.msg_type = QueryTeleStats::QT_START; @@ -552,7 +743,7 @@ void PackageHandler::run() } else { - //error occured. Receive all outstanding messages nefore erroring out. + //error occured. Receive all outstanding messages before erroring out. batchProcessor->receiveOutstandingMsg(); batchProcessor->sendlastBatch(); //needs to flush files batchProcessor->receiveAllMsg(); @@ -637,9 +828,9 @@ void PackageHandler::run() break; } } - else + else // Single Insert { - //insertPkg.readTable(*(fByteStream.get())); + //insertPkg.readTable(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); @@ -666,7 +857,23 @@ void PackageHandler::run() //cout << "an UPDATE package" << endl; boost::scoped_ptr updatePkg(new dmlpackage::UpdateDMLPackage()); updatePkg->read(*(fByteStream.get())); - updatePkg->set_TxnID(fTxnid); +#ifdef MCOL_140 + if (fConcurrentSupport) + { + fTableOid = updatePkg->getTableOid(); + // Update generally doesn't set fTableOid in updatePkg. Go get it now. 
+ if (fTableOid == 0) + { + CalpontSystemCatalog::TableName tableName; + tableName.schema = updatePkg->get_Table()->get_SchemaName(); + tableName.table = updatePkg->get_Table()->get_TableName(); + CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + fTableOid = roPair.objnum; + } + synchTableAccess(); // Blocks if another DML thread is using this fTableOid + } +#endif + updatePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = updatePkg->uuid(); qts.msg_type = QueryTeleStats::QT_START; @@ -706,6 +913,22 @@ void PackageHandler::run() { boost::scoped_ptr deletePkg(new dmlpackage::DeleteDMLPackage()); deletePkg->read(*(fByteStream.get())); +#ifdef MCOL_140 + if (fConcurrentSupport) + { + fTableOid = deletePkg->getTableOid(); + // Delete generally doesn't set fTableOid in deletePkg. Go get it now. + if (fTableOid == 0) + { + CalpontSystemCatalog::TableName tableName; + tableName.schema = deletePkg->get_Table()->get_SchemaName(); + tableName.table = deletePkg->get_Table()->get_TableName(); + CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); + fTableOid = roPair.objnum; + } + synchTableAccess(); // Blocks if another DML thread is using this fTableOid + } +#endif deletePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = deletePkg->uuid(); @@ -766,6 +989,13 @@ void PackageHandler::run() } break; } +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're done. release the next waiting txn for this fTableOid + releaseTableAccess(); + } +#endif //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) @@ -787,6 +1017,13 @@ void PackageHandler::run() } catch(std::exception& e) { +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're done. 
release the next waiting txn for this fTableOid + releaseTableAccess(); + } +#endif cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); @@ -803,6 +1040,13 @@ void PackageHandler::run() } catch(...) { +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're done. release the next waiting txn for this fTableOid + releaseTableAccess(); + } +#endif logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; @@ -835,6 +1079,16 @@ void PackageHandler::run() void PackageHandler::rollbackPending() { + // Force a release of the processing from MCOL-140 +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're not necessarily the next in line. + // This forces this thread to be released anyway. + forceReleaseTableAccess(); + } +#endif + if (fProcessor.get() == NULL) { // This happens when batch insert @@ -854,6 +1108,8 @@ void added_a_pm(int) ResourceManager rm; dec = DistributedEngineComm::instance(rm); dec->Setup(); + // MCOL-140 clear the waiting queue as all transactions are probably going to fail + PackageHandler::clearTableAccess(); } DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) : @@ -923,12 +1179,12 @@ void DMLProcessor::operator()() uint64_t maxDeleteRows = rm.getDMLMaxDeleteRows(); - bool concurrentSupport = true; + fConcurrentSupport = true; string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions"); if ( concurrentTranStr.length() != 0 ) { if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0)) - concurrentSupport = false; + fConcurrentSupport = false; } #ifndef _MSC_VER @@ -1155,7 +1411,7 @@ void DMLProcessor::operator()() //cout << " package" << endl; BRM::TxnID txnid; - if (!concurrentSupport) + if (!fConcurrentSupport) { //Check if any other active transaction bool anyOtherActiveTransaction = true; @@ -1294,7 
+1550,7 @@ void DMLProcessor::operator()() { //cout << "starting processing package type " << (int) packageType << " for session " << sessionID << " with id " << txnid.id << endl; boost::shared_ptr php(new PackageHandler(fIos, bs1, packageType, fEC, - maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc)); + fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock); @@ -1312,6 +1568,7 @@ void DMLProcessor::operator()() } else { +#if 0 if (packageType != dmlpackage::DML_COMMAND) { txnid = sessionManager.getTxnID(sessionID); @@ -1327,9 +1584,9 @@ void DMLProcessor::operator()() { txnid = sessionManager.getTxnID(sessionID); } - +#endif boost::shared_ptr php(new PackageHandler(fIos, bs1, packageType, fEC, - maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc)); + fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. 
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock); diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index b3344323c..2faf0cfb2 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -45,6 +45,41 @@ #include "batchinsertprocessor.h" #include "querytele.h" +template > +class iterable_queue : public std::queue +{ +public: + typedef typename Container::iterator iterator; + typedef typename Container::const_iterator const_iterator; + + iterator begin() { return this->c.begin(); } + iterator end() { return this->c.end(); } + const_iterator begin() const { return this->c.begin(); } + const_iterator end() const { return this->c.end(); } + iterator find(T t) + { + iterator it; + for (it = begin(); it != end(); ++it) + { + if (*it == t) + { + break; + } + } + return it; + } + iterator erase(typename Container::iterator it) { return this->c.erase(it); } + iterator erase(T t) + { + iterator it = find(t); + if (it != end()) + { + erase(it); + } + return it; + } +}; + namespace dmlprocessor { @@ -103,9 +138,9 @@ class PackageHandler { public: PackageHandler(const messageqcpp::IOSocket& ios, boost::shared_ptr bs, - uint8_t packageType, joblist::DistributedEngineComm *ec, uint64_t maxDeleteRows, + uint8_t packageType, joblist::DistributedEngineComm *ec, bool concurrentSuport, uint64_t maxDeleteRows, uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnId, BRM::DBRM * aDbrm, - const querytele::QueryTeleClient& qtc); + const querytele::QueryTeleClient& qtc, boost::shared_ptr csc); ~PackageHandler(); void run(); @@ -113,18 +148,42 @@ public: execplan::CalpontSystemCatalog::SCN getTxnid() {return fTxnid;} uint32_t getSessionID() {return fSessionID;} + private: messageqcpp::IOSocket fIos; boost::shared_ptr fByteStream; - boost::scoped_ptr fProcessor; + boost::scoped_ptr fProcessor; messageqcpp::ByteStream::quadbyte fPackageType; joblist::DistributedEngineComm *fEC; + bool fConcurrentSupport; uint64_t fMaxDeleteRows; - uint32_t 
fSessionID; - execplan::CalpontSystemCatalog::SCN fTxnid; - execplan::SessionManager sessionManager; - BRM::DBRM *fDbrm; - querytele::QueryTeleClient fQtc; + uint32_t fSessionID; + uint32_t fTableOid; + execplan::CalpontSystemCatalog::SCN fTxnid; + execplan::SessionManager sessionManager; + BRM::DBRM *fDbrm; + querytele::QueryTeleClient fQtc; + boost::shared_ptr fcsc; + + // MCOL-140 A map to hold table oids so that we can wait for DML on a table. + // This effectively creates a table lock for transactions. + // Used to serialize operations because the VSS can't handle inserts + // or updates on the same block. + // When an Insert, Update or Delete command arrives, we look here + // for the table oid. If found, wait until it is no longer here. + // If this transactionID (SCN) is < the transactionID in the table, don't delay + // and hope for the best, as we're already out of order. + // When the VSS is engineered to handle transactions out of order, all MCOL-140 + // code is to be removed. + int synchTableAccess(); + int releaseTableAccess(); + int forceReleaseTableAccess(); + typedef iterable_queue tableAccessQueue_t; + static std::map tableOidMap; + static boost::condition_variable tableOidCond; + static boost::mutex tableOidMutex; +public: + static int clearTableAccess(); }; /** @brief processes dml packages as they arrive @@ -151,6 +210,7 @@ private: boost::shared_ptr csc; BRM::DBRM* fDbrm; querytele::QueryTeleClient fQtc; + bool fConcurrentSupport; // A map to hold pointers to all active PackageProcessors typedef std::map > PackageHandlerMap_t;