/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: dmlprocessor.cpp 1024 2013-07-26 16:23:59Z chao $ * * ***********************************************************************/ /** @file */ #include "configcpp.h" #include #include //#define SERIALIZE_DDL_DML_CPIMPORT 1 #include #include #include #include using namespace boost; #include "cacheutils.h" #include "vss.h" #include "dbrm.h" #include "brmtypes.h" #include "idberrorinfo.h" #include "errorids.h" #include "batchinsertprocessor.h" #include "tablelockdata.h" #include "oamcache.h" #include "messagelog.h" #include "sqllogger.h" #include "we_messages.h" #include "dmlprocessor.h" using namespace BRM; using namespace config; using namespace execplan; using namespace std; using namespace messageqcpp; using namespace dmlpackage; using namespace dmlpackageprocessor; using namespace joblist; using namespace logging; using namespace oam; using namespace WriteEngine; #include "querytele.h" using namespace querytele; extern boost::mutex mute; extern boost::condition_variable cond; #define MCOL_140 // Undefine to test VSS for out of order transactions namespace { const std::string myname = "DMLProc"; } namespace dmlprocessor { // Map to store the package handler objects so we can set flags during 
execution // for things like ctrl+c DMLProcessor::PackageHandlerMap_t DMLProcessor::packageHandlerMap; boost::mutex DMLProcessor::packageHandlerMapLock; //Map to store the BatchInsertProc object std::map DMLProcessor::batchinsertProcessorMap; boost::mutex DMLProcessor::batchinsertProcessorMapLock; // MCOL-140 Map to hold table oids for tables being changed. std::map PackageHandler::tableOidMap; boost::condition_variable PackageHandler::tableOidCond; boost::mutex PackageHandler::tableOidMutex; //------------------------------------------------------------------------------ // A thread to periodically call dbrm to see if a user is // shutting down the system or has put the system into write // suspend mode. DBRM has 2 flags to check in this case, the // ROLLBACK flag, and the FORCE flag. These flags will be // reported when we ask for the Shutdown Pending flag (which we // ignore at this point). Even if the user is putting the system // into write suspend mode, this call will return the flags we // are interested in. If ROLLBACK is set, we cancel normally. // If FORCE is set, we can't rollback. struct CancellationThread { CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer) {} void operator()() { bool bDoingRollback = false; bool bRollback = false; bool bForce = false; ostringstream oss; std::vector tableLocks; BRM::TxnID txnId; DMLProcessor::PackageHandlerMap_t::iterator phIter; uint32_t sessionID; int rc = 0; while (true) { usleep(1000000); // 1 seconds // Check to see if someone has ordered a shutdown or suspend with rollback. (void)fDbrm->getSystemShutdownPending(bRollback, bForce); if (bForce) break; if (bDoingRollback && bRollback) { continue; // We've already started the rollbacks. Don't start again. 
} bDoingRollback = false; if (bRollback) { RollbackTransactionProcessor rollbackProcessor(fDbrm); SessionManager sessionManager; uint64_t uniqueId = fDbrm->getUnique64(); std::string errorMsg; int activeTransCount = 0; int idleTransCount = 0; bDoingRollback = true; ostringstream oss; oss << "DMLProc has been told to rollback all DML transactions."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); // Tell any active processors to stop working and return an error // The front end will respond with a ROLLBACK command. // Mark all active processors to rollback boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock); for (phIter = DMLProcessor::packageHandlerMap.begin(); phIter != DMLProcessor::packageHandlerMap.end(); ++phIter) { ostringstream oss; oss << "DMLProc will rollback active session " << phIter->second->getSessionID() << " Transaction " << phIter->second->getTxnid(); DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); ++activeTransCount; phIter->second->rollbackPending(); } if (activeTransCount > 0) { ostringstream oss1; oss1 << "DMLProc is rolling back back " << activeTransCount << " active transactions."; DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO); } // WIP Need to set cluster to read-only via CMAPI before shutting the cluster down. if (fDbrm->isReadWrite()) { continue; } // Check for any open DML transactions that don't currently have // a processor tableLocks = fDbrm->getAllTableLocks(); if (tableLocks.size() > 0) { for (uint32_t i = 0; i < tableLocks.size(); ++i) { sessionID = tableLocks[i].ownerSessionID; phIter = DMLProcessor::packageHandlerMap.find(sessionID); if (phIter == DMLProcessor::packageHandlerMap.end()) { // We have found an active transaction without a packagehandler. // This means that a transaction is open with autocommit turned // off, but there's no current activity on the transaction. We // need to roll it back if it's a DML transaction. // If ownerName == "DMLProc" then it's a DML transaction. 
if (tableLocks[i].ownerName == "DMLProc") { // OK, we know this is an idle DML transaction, so roll it back. ++idleTransCount; txnId.id = tableLocks[i].ownerTxnID; txnId.valid = true; rc = rollbackProcessor.rollBackTransaction(uniqueId, txnId, sessionID, errorMsg); if ( rc == 0 ) { fDbrm->invalidateUncommittedExtentLBIDs(txnId.id); //@Bug 4524. In case it is batchinsert, call bulkrollback. rc = rollbackProcessor.rollBackBatchAutoOnTransaction(uniqueId, txnId, sessionID, tableLocks[i].tableOID, errorMsg); if (rc == 0) { logging::logCommand(0, tableLocks[i].ownerTxnID, "ROLLBACK;"); bool lockReleased = true; try { lockReleased = fDbrm->releaseTableLock(tableLocks[i].id); TablelockData::removeTablelockData(sessionID); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (lockReleased) { sessionManager.rolledback(txnId); ostringstream oss; oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and table lock id " << tableLocks[i].id << " is released."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } else { ostringstream oss; oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and tble lock id " << tableLocks[i].id << " is not released."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } } else { ostringstream oss; oss << " problem with bulk rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg; DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL); rc = fDbrm->setReadOnly(true); } } else { ostringstream oss; oss << " problem with rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg; DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL); rc = fDbrm->setReadOnly(true); } } } } } // If there are any abandonded transactions without locks // release them. 
int len; boost::shared_array activeTxns = sessionManager.SIDTIDMap(len); for (int i = 0; i < len; i++) { // If there isn't a table lock for this transaction, roll it back. Otherwise, assume // it has an active processor or is not DML initiated and leave it alone. It's someone // else's concern. bool bFoundit = false; for (uint32_t j = 0; j < tableLocks.size(); ++j) { if (tableLocks[j].ownerTxnID == activeTxns[i].txnid.id) { bFoundit = true; break; } } if (!bFoundit && activeTxns[i].txnid.valid) { rollbackProcessor.rollBackTransaction(uniqueId, activeTxns[i].txnid, activeTxns[i].sessionid, errorMsg); sessionManager.rolledback(activeTxns[i].txnid); ++idleTransCount; ostringstream oss; oss << "DMLProc rolled back idle transaction with no tablelock" << tableLocks[i].ownerTxnID; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } } if (idleTransCount > 0) { ostringstream oss2; oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions."; DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO); } // Here is the end of the rollback if so DMLProc rollbacks what it can. break; } } // Setting the flag to tell DMLServer to exit. fServer.startShutdown(); } DBRM* fDbrm; DMLServer& fServer; }; PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios, boost::shared_ptr bs, uint8_t packageType, joblist::DistributedEngineComm* ec, bool concurrentSupport, uint64_t maxDeleteRows, uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnId, DBRM* aDbrm, const QueryTeleClient& qtc, boost::shared_ptr csc) : fIos(ios), fByteStream(bs), fPackageType(packageType), fEC(ec), fConcurrentSupport(concurrentSupport), fMaxDeleteRows(maxDeleteRows), fSessionID(sessionID), fTableOid(0), fTxnid(txnId), fDbrm(aDbrm), fQtc(qtc), fcsc(csc) { } PackageHandler::~PackageHandler() { //cout << "In destructor" << endl; } // MCOL-140 // Blocks a thread if there is another trx working on the same fTableOid // return 1 when thread should continue. // return 0 if error. 
Right now, no error detection is implemented. // // txnid was being created before the call to this function. This caused race conditions // so creation is delayed until we're inside the lock here. Nothing needs it before // this point in the execution. // // The algorithm is this. When the first txn for a given fTableOid arrives, start a queue // containing a list of waiting or working txnId. Put this txnId into the queue (working) // Put the queue into a map keyed on fTableOid. // // When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself, // then waits for condition. // When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is // empty, it removes the entry from the map. // Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released // to do work. Otherwise it goes back to wait. // // There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work. // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. std::map::iterator it; boost::unique_lock lock(tableOidMutex); BRM::TxnID txnid; if (fPackageType != dmlpackage::DML_COMMAND) { txnid = sessionManager.getTxnID(fSessionID); if ( !txnid.valid ) { txnid = sessionManager.newTxnID(fSessionID, true); if (!txnid.valid) { throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); } } } else { txnid = sessionManager.getTxnID(fSessionID); } fTxnid = txnid.id; if ((it = tableOidMap.find(fTableOid)) != tableOidMap.end()) { PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; // There's at least one working txn on this table. We may be the same txn. 
if (fTxnid == tableOidQueue.front()) { return 1; // We're next in line or the same as the last. Keep working } tableOidQueue.push(fTxnid); // Get on the waiting list. // We need to wait // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { // Log something that we're waiting LoggingID logid(21, fSessionID, fTxnid); logging::Message::Args args1; logging::Message msg(1); ostringstream oss; oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|"; args1.add(oss.str()); args1.add((uint64_t)fTableOid); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); tableOidCond.wait(lock); // In case of CTRL+C, the tableOidQueue could be invalidated if ((tableOidMap.find(fTableOid))->second != tableOidQueue) { break; } if (tableOidQueue.front() == fTxnid) { // We're up next. Let's go do stuff. break; } if (tableOidQueue.empty()) { // If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now. // Empty queues must be erased from the map. tableOidMap.erase(fTableOid); break; } // If we're not in the queue at all, then continue. CTRL+C was probably hit. PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid); if (c_it == tableOidQueue.end()) { break; } // We're still in the queue and not on top. Go back and wait some more. } } else { // We're the first for this tableoid. Start a new queue. 
tableAccessQueue_t tableOidQueue; tableOidQueue.push(fTxnid); tableOidMap[fTableOid] = tableOidQueue; } return 1; } // MCOL-140 Called when it's time to release the next thread for this tablOid int PackageHandler::releaseTableAccess() { // take us out of the queue std::map::iterator it; boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { return 2; // For now, return codes are not used } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; if (tableOidQueue.front() != fTxnid) { // This is a severe error. The front should be the working thread. If we're here, // we're the working thread and should be front(). cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl; LoggingID logid(21, fSessionID, fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid"); args1.add((uint64_t)fTableOid); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_ERROR, msg, logid); } else { if (!tableOidQueue.empty()) tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { // remove the queue from the map. tableOidMap.erase(fTableOid); } } // release the condition tableOidCond.notify_all(); return 1; } int PackageHandler::forceReleaseTableAccess() { // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. std::map::iterator it; boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); if (tableOidQueue.empty()) { // remove the queue from the map. 
tableOidMap.erase(fTableOid); } // release the condition tableOidCond.notify_all(); return 1; } //static // Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung, // though some may be because they never returned from PrimProc and will leave the table lock on. int PackageHandler::clearTableAccess() { tableOidMap.clear(); return 1; } void PackageHandler::run() { ResourceManager* frm = ResourceManager::instance(); dmlpackageprocessor::DMLPackageProcessor::DMLResult result; result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR; //cout << "PackageHandler handling "; std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); SynchTable synchTable; try { switch ( fPackageType ) { case dmlpackage::DML_INSERT: { // build an InsertDMLPackage from the bytestream //cout << "an INSERT package" << endl; dmlpackage::InsertDMLPackage insertPkg; //boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); insertPkg.readMetaData(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = insertPkg.getTableOid(); // Single Insert has no start like bulk does, so insertPkg.getTableOid() // isn't set. Go get it now. 
if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = insertPkg.get_Table()->get_SchemaName(); tableName.table = insertPkg.get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; qts.query_uuid = QueryTeleClient::genUUID(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "INSERT"; qts.query = insertPkg.get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = insertPkg.get_SchemaName(); fQtc.postQueryTele(qts); //cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; if (insertPkg.get_isBatchInsert()) { fByteStream->reset(); //cout << "This is batch insert " << endl; BatchInsertProc* batchProcessor = NULL; { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter == DMLProcessor::batchinsertProcessorMap.end()) { batchProcessor = new BatchInsertProc(insertPkg.get_isAutocommitOn(), insertPkg.getTableOid(), fTxnid, fDbrm); DMLProcessor::batchinsertProcessorMap[fSessionID] = batchProcessor; //cout << "batchProcessor is created " << batchProcessor << endl; } else { batchProcessor = batchIter->second; //cout << "Found batchProcessor " << batchProcessor << endl; } } if ( insertPkg.get_Logging() ) { LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("Start SQL statement: "); if (!insertPkg.get_isCacheInsert()) { ostringstream oss; oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; args1.add(oss.str()); } msg.format( args1 ); logging::Logger 
logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); TablelockData* tablelockData = TablelockData::makeTablelockData(insertPkg.get_SessionID()); uint64_t tableLockId = tablelockData->getTablelockId(insertPkg.getTableOid()); //cout << "Processing table oid " << insertPkg.getTableOid() << " for transaction "<< (int)fTxnid << endl; if (tableLockId == 0) { //cout << "Grabing tablelock for batchProcessor " << batchProcessor << endl; tableLockId = batchProcessor->grabTableLock(insertPkg.get_SessionID()); if (tableLockId == 0) { BRM::TxnID brmTxnID; brmTxnID.id = fTxnid; brmTxnID.valid = true; sessionManager.rolledback(brmTxnID); string errMsg; int rc = 0; batchProcessor->getError(rc, errMsg); result.result = DMLPackageProcessor::TABLE_LOCK_ERROR; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; break; } if (tableLockId > 0) tablelockData->setTablelock(insertPkg.getTableOid(), tableLockId); } } if (insertPkg.get_Logending() && insertPkg.get_Logging()) //only one batch need to be processed. { //cout << "dmlprocessor add last pkg" << endl; //need to add error handling. batchProcessor->addPkg(bsSave); batchProcessor->sendFirstBatch(); batchProcessor->receiveOutstandingMsg(); //@Bug 5162. Get the correct error message before the last message. 
string errMsg; int rc = 0; batchProcessor->getError(rc, errMsg); batchProcessor->sendlastBatch(); batchProcessor->receiveAllMsg(); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with warnings"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); logging::Message::Args args; logging::Message message(1); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; } else if ( rc != 0) { result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; logging::Message::Args args; logging::Message message(1); cout << "Got error in the end of one batchinsert." 
<< endl; args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } else { // if (!insertPkg.get_isAutocommitOn()) // { // batchProcessor->setHwm(); // } LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } //remove the batch insert object { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } } else if (insertPkg.get_Logending()) //Last batch { int rc = 0; string errMsg; batchProcessor->getError(rc, errMsg); //cout <<"dmlprocessor received last pkg from mysql rc == " << rc << endl; if (( rc == 0) || (rc == DMLPackageProcessor::IDBRANGE_WARNING)) { //cout << " rc = " << rc << endl; batchProcessor->addPkg(bsSave); batchProcessor->sendNextBatch(); batchProcessor->receiveOutstandingMsg(); //@Bug 5162. Get the correct error message before the last message. 
batchProcessor->getError(rc, errMsg); batchProcessor->sendlastBatch(); batchProcessor->receiveAllMsg(); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with warnings"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); logging::Message::Args args; logging::Message message(1); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; } else if ( rc != 0) { //cout << "Got error in the end of last batchinsert. error message is " << errMsg << endl; result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } else { LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } //cout << "finished batch insert" << endl; } else { //error occured. 
Receive all outstanding messages before erroring out. batchProcessor->receiveOutstandingMsg(); batchProcessor->sendlastBatch(); //needs to flush files batchProcessor->receiveAllMsg(); result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; //cout << "Got error in the end of batchinsert2. error msg is " << errMsg<< endl; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } //remove from map { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl; delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } } else { int rc = 0; string errMsg; batchProcessor->getError(rc, errMsg); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; } else if ( rc != 0) { result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; //@Bug //cout << "Got error during batchinsert. with message " << errMsg << endl; logging::Message::Args args; logging::Message message(6); args.add( errMsg ); message.format( args ); result.message = message; batchProcessor->receiveOutstandingMsg(); batchProcessor->sendlastBatch(); //needs to flush files //cout << "Last batch is sent to WES." 
<< endl; batchProcessor->receiveAllMsg(); LoggingID logid( DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); //remove from map { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { //cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl; delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } break; } batchProcessor->addPkg(bsSave); batchProcessor->sendNextBatch(); break; } } else // Single Insert { // make sure insertPkg.readMetaData() is called before // this on fByteStream! // TODO: Similar to batch inserts, don't // deserialize the row data here for single inserts. 
insertPkg.readRowData(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); } qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_UPDATE: { // build an UpdateDMLPackage from the bytestream //cout << "an UPDATE package" << endl; boost::scoped_ptr updatePkg(new dmlpackage::UpdateDMLPackage()); updatePkg->read(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = updatePkg->getTableOid(); // Update generally doesn't set fTableOid in updatePkg. Go get it now. if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = updatePkg->get_Table()->get_SchemaName(); tableName.table = updatePkg->get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = updatePkg->uuid(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "UPDATE"; qts.query = updatePkg->get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = updatePkg->get_SchemaName(); fQtc.postQueryTele(qts); // process it //@Bug 1341. 
Don't remove calpontsystemcatalog from this //session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID())); fProcessor->setEngineComm(fEC); fProcessor->setRM( frm); idbassert( fTxnid != 0); result = fProcessor->processPackage(*(updatePkg.get())) ; qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_DELETE: { boost::scoped_ptr deletePkg(new dmlpackage::DeleteDMLPackage()); deletePkg->read(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = deletePkg->getTableOid(); // Delete generally doesn't set fTableOid in updatePkg. Go get it now. 
if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = deletePkg->get_Table()->get_SchemaName(); tableName.table = deletePkg->get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = deletePkg->uuid(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "DELETE"; qts.query = deletePkg->get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = deletePkg->get_SchemaName(); fQtc.postQueryTele(qts); // process it //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID())); fProcessor->setEngineComm(fEC); fProcessor->setRM( frm); idbassert( fTxnid != 0); result = fProcessor->processPackage(*(deletePkg.get())) ; qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_COMMAND: { // build a CommandDMLPackage from the bytestream //cout << "a COMMAND package" << endl; dmlpackage::CommandDMLPackage commandPkg; commandPkg.read(*(fByteStream.get())); stmt = commandPkg.get_DMLStatement(); boost::algorithm::to_upper(stmt); 
trim(stmt); if (stmt == "CLEANUP") { execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID()); execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID() | 0x80000000); } else { // process it //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::CommandPackageProcessor(fDbrm, commandPkg.get_SessionID())); //cout << "got command " << stmt << " for session " << commandPkg.get_SessionID() << endl; result = fProcessor->processPackage(commandPkg); } } break; } //Log errors if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && (result.result != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR) ) { logging::LoggingID lid(21); logging::MessageLog ml(lid); ml.logErrorMessage( result.message ); } else if (result.result == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) { logging::LoggingID lid(21); logging::MessageLog ml(lid); ml.logWarningMessage( result.message ); } } catch (std::exception& e) { cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() package type"); args.add((uint64_t)fPackageType); args.add(e.what()); message.format(args); ml.logErrorMessage(message); result.result = DMLPackageProcessor::COMMAND_ERROR; result.message = message; } catch (...) { logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() ... 
exception package type");
        args.add((uint64_t)fPackageType);
        message.format(args);
        ml.logErrorMessage(message);
        result.result = DMLPackageProcessor::COMMAND_ERROR;
        result.message = message;
    }

    // We put the packageHandler into a map so that if we receive a
    // message to affect the previous command, we can find it.
    // We need to remove it from the map before sending the response back.
    // If we removed it after sending the results, a commit or rollback could
    // be sent and processed before the entry is removed, and that would fail.
    boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock);
    DMLProcessor::packageHandlerMap.erase(getSessionID());
    lk2.unlock();

    // Send back the results. Reply layout, in write order: result code (byte),
    // row count (octbyte), message text, table-lock info, query stats,
    // extended stats, mini stats, then the serialized stats object.
    messageqcpp::ByteStream results;
    messageqcpp::ByteStream::octbyte rowCount = result.rowCount;
    messageqcpp::ByteStream::byte retval = result.result;
    results << retval;
    results << rowCount;
    results << result.message.msg();
    results << result.tableLockInfo;
    // NOTE(review): original comment reads "? connector does not get query
    // stats" -- presumably the connector ignores the next fields; confirm.
    results << result.queryStats;
    results << result.extendedStats;
    results << result.miniStats;
    result.stats.serialize(results);
    fIos.write(results);
    //Bug 5226. dmlprocessor thread will close the socket to mysqld.
    //if (stmt == "CLEANUP")
    //  fIos.close();
}

/**
 * Ask the package processor currently working for this handler to abandon its
 * work (sets its rollback-pending flag).
 *
 * Invoked from DMLProcessor::operator() when a "CTRL+C" command arrives for a
 * session whose handler is still registered in packageHandlerMap.
 *
 * Returns immediately when fProcessor has not been created yet (the original
 * note says this happens for batch insert). When MCOL_140 is defined and
 * concurrent transactions are enabled, forceReleaseTableAccess() is called so
 * this handler is released even if it is not next in line for the table.
 * Otherwise finishes by writing a debug-level log line.
 */
void PackageHandler::rollbackPending()
{
    if (fProcessor.get() == NULL)
    {
        // This happens when batch insert
        return;
    }

    fProcessor->setRollbackPending(true);

    // Force a release of the processing from MCOL-140
#ifdef MCOL_140
    if (fConcurrentSupport)
    {
        // MCOL-140 We're not necessarily the next in line.
        // This forces this thread to be released anyway.
        forceReleaseTableAccess();
    }
#endif
    ostringstream oss;
    oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
    DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
}

/**
 * SIGHUP handler: DMLProcessor::operator() installs it via
 * sigaction(SIGHUP, ...) with sa_handler = added_a_pm. The name suggests the
 * signal is sent when a PM is added -- confirm with the sender.
 *
 * Re-runs Setup() on the DistributedEngineComm singleton and clears the
 * MCOL-140 table-access wait queue.
 *
 * NOTE(review): this executes in signal context but calls functions that are
 * unlikely to be async-signal-safe (engine setup, and presumably mutex use in
 * clearTableAccess()); consider deferring the work to a normal thread.
 */
void added_a_pm(int)
{
    DistributedEngineComm* dec;
    ResourceManager* rm = ResourceManager::instance();
    dec = DistributedEngineComm::instance(rm);
    dec->Setup();
    // MCOL-140 clear the waiting queue as all transactions are probably going to fail
    PackageHandler::clearTableAccess();
}

/**
 * Create the "DMLProc" message-queue server and size/name the thread pool
 * that runs one DMLProcessor per accepted connection.
 *
 * @param packageMaxThreads    maximum threads in fDmlPackagepool
 * @param packageWorkQueueSize queue size of fDmlPackagepool
 * @param dbrm                 BRM handle shared with every DMLProcessor
 */
DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) :
    fPackageMaxThreads(packageMaxThreads), fPackageWorkQueueSize(packageWorkQueueSize), fDbrm(dbrm), fShutdownFlag(false)
{
    fMqServer.reset(new MessageQueueServer("DMLProc"));

    fDmlPackagepool.setMaxThreads(fPackageMaxThreads);
    fDmlPackagepool.setQueueSize(fPackageWorkQueueSize);
    fDmlPackagepool.setName("DmlPackagepool");
}

/**
 * Main accept loop of DMLProc.
 *
 * Starts a CancellationThread, then accepts connections with a timed accept()
 * until pendingShutdown() becomes true. Each accepted socket gets a sequential
 * socket id and is handed to fDmlPackagepool as a DMLProcessor.
 *
 * @return EXIT_SUCCESS after a shutdown; EXIT_FAILURE (after a critical log)
 *         if an exception escapes.
 * NOTE(review): cancelThread is joined only on the normal shutdown path; on the
 * exception paths it is destroyed while still joinable -- check what the
 * Boost.Thread version in use does in that case.
 */
int DMLServer::start()
{
    messageqcpp::IOSocket ios;
    uint32_t nextID = 1;

    try
    {
        // CancellationThread is for telling all active transactions
        // to quit working because the system is either going down
        // or going into write suspend mode
        CancellationThread cancelObject(fDbrm, *this);
        boost::thread cancelThread(cancelObject);
        cout << "DMLProc is ready..." << endl;

        const static struct timespec timeout = {1, 100}; // roughly 1 second TO (1 s + 100 ns)

        for (;;)
        {
            ios = fMqServer->accept(&timeout);

            // MCS polls in a loop watching for a pending shutdown
            // that is signalled via fShutdownFlag set in a
            // CancellationThread. CT sets the flag if a cluster state
            // has SS_SHUTDOWNPENDING value set.
while (!ios.hasSocketDescriptor() && !pendingShutdown()) ios = fMqServer->accept(&timeout); if (pendingShutdown()) break; ios.setSockID(nextID++); fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm)); } cancelThread.join(); return EXIT_SUCCESS; } catch (std::exception& ex) { cerr << ex.what() << endl; logging::LoggingID lid(21); Message::Args args; Message message(8); args.add("DMLProc init caught exception: "); args.add(ex.what()); message.format(args); logging::Logger logger(lid.fSubsysID); logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); return EXIT_FAILURE; } catch (...) { cerr << "Caught unknown exception!" << endl; logging::LoggingID lid(21); Message::Args args; Message message(8); args.add("DMLProc init caught unknown exception"); message.format(args); logging::Logger logger(lid.fSubsysID); logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); return EXIT_FAILURE; } } DMLProcessor::DMLProcessor(messageqcpp::IOSocket ios, BRM::DBRM* aDbrm) : fIos(ios), fDbrm(aDbrm) { csc = CalpontSystemCatalog::makeCalpontSystemCatalog(); csc->identity(CalpontSystemCatalog::EC); string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host")); if (!teleServerHost.empty()) { int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port")); if (teleServerPort > 0) { fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort)); } } } void DMLProcessor::operator()() { bool bIsDbrmUp = true; try { boost::shared_ptr bs1 (new messageqcpp::ByteStream()); //messageqcpp::ByteStream bs; uint8_t packageType; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* fEC = DistributedEngineComm::instance(rm); uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows(); fConcurrentSupport = true; string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions"); if ( concurrentTranStr.length() != 0 ) { if ((concurrentTranStr.compare("N") == 0) || 
(concurrentTranStr.compare("n") == 0)) fConcurrentSupport = false; } #ifndef _MSC_VER struct sigaction ign; memset(&ign, 0, sizeof(ign)); ign.sa_handler = added_a_pm; sigaction(SIGHUP, &ign, 0); #endif fEC->Open(); for (;;) { //cout << "DMLProc is waiting for a Calpont DML Package on " << fIos.getSockID() << endl; try { bs1.reset(new messageqcpp::ByteStream(fIos.read())); //cout << "received from mysql socket " << fIos.getSockID() << endl; } catch (std::exception& ex) { //This is an I/O error from InetStreamSocket::read(), just close and move on... cout << "runtime error during read on " << fIos.getSockID() << " " << ex.what() << endl; bs1->reset(); } catch (...) { cout << "... error during read " << fIos.getSockID() << endl; // all this throw does is cause this thread to silently go away. I doubt this is the right // thing to do... throw; } if (!bs1 || bs1->length() == 0) { cout << "Read 0 bytes. Closing connection " << fIos.getSockID() << endl; fIos.close(); break; } uint32_t sessionID; *bs1 >> sessionID; *bs1 >> packageType; //cout << "DMLProc received pkg. sessionid:type = " << sessionID <<":"<<(int)packageType << endl; uint32_t stateFlags; messageqcpp::ByteStream::byte status = 255; messageqcpp::ByteStream::octbyte rowCount = 0; if (fDbrm->getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents { messageqcpp::ByteStream results; const char* responseMsg = 0; bool bReject = false; // Check to see if we're in write suspended mode // If so, we can't process the request. 
if (stateFlags & SessionManagerServer::SS_SUSPENDED) { status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES; responseMsg = "Writing to the database is disabled."; bReject = true; } // Check to see if we're in write suspend or shutdown pending mode if (packageType != dmlpackage::DML_COMMAND) // Not a commit or rollback { if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING) { if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING) { responseMsg = "Writing to the database is disabled."; } else { responseMsg = "The database is being shut down."; } // Refuse all non active tranasactions // Check the rollback flag // -- Set: Rollback active transactions. // -- Not set: Allow active transactions. if (sessionManager.isTransactionActive(sessionID, bIsDbrmUp)) { if (stateFlags & SessionManagerServer::SS_ROLLBACK) { status = DMLPackageProcessor::JOB_CANCELED; bReject = true; } } else { status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES; bReject = true; } } if (bReject) { // For batch insert, we need to send a lastpkg message // to batchInsertProcessor so it can clean things up. if (packageType == dmlpackage::DML_INSERT) { // build an InsertDMLPackage from the bytestream // We need the flags from the package to know what // type of package we're dealing with before we can // take special action for the last package of a // batch insert. 
dmlpackage::InsertDMLPackage insertPkg; messageqcpp::ByteStream bsSave = *(bs1.get()); insertPkg.read(*(bs1.get())); BatchInsertProc* batchInsertProcessor = NULL; if (insertPkg.get_isBatchInsert() && insertPkg.get_Logending()) { { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) //The first batch, no need to do anything { batchInsertProcessor = batchIter->second; batchInsertProcessor->addPkg(bsSave); batchInsertProcessor->sendlastBatch(); batchInsertProcessor->receiveAllMsg(); if (!insertPkg.get_isAutocommitOn()) { batchInsertProcessor->setHwm(); } batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { DMLProcessor::batchinsertProcessorMap.erase(sessionID); } } } } } results << status; results << rowCount; logging::Message::Args args; logging::Message message(2); args.add(responseMsg); message.format( args ); results << message.msg(); fIos.write(results); continue; } } } // This section is to check to see if the user hit CTRL+C while the // DML was processing If so, the sessionID will be found in // packageHandlerMap and we can set rollbackPending in the // associated packageHandler. Other than CTRL+C, we should never // find our own sessionID in the map. // This mechanism may prove useful for other things, so the above // comment may change. { boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock); DMLProcessor::PackageHandlerMap_t::iterator phIter = packageHandlerMap.find(sessionID); if (phIter != packageHandlerMap.end()) { if (packageType == dmlpackage::DML_COMMAND) { // MCOL-66 It's possible for a commit or rollback to get here if // the timing is just right. 
Don't destroy its data messageqcpp::ByteStream bsctrlc(bs1); dmlpackage::CommandDMLPackage commandPkg; commandPkg.read(bsctrlc); std::string stmt = commandPkg.get_DMLStatement(); boost::algorithm::to_upper(stmt); trim(stmt); if (stmt == "CTRL+C") { phIter->second->rollbackPending(); fIos.close(); break; } } else { // If there's a PackageHandler already working for this // sessionID, we have a problem. Reject this package messageqcpp::ByteStream results; ostringstream oss; oss << "Received a DML command for session " << sessionID << " while still processing a command for the same sessionID"; results << static_cast(DMLPackageProcessor::DEAD_LOCK_ERROR); results << static_cast(0); // rowcount logging::Message::Args args; logging::Message message(2); args.add(oss.str()); message.format( args ); logging::LoggingID lid(20); logging::MessageLog ml(lid); ml.logErrorMessage(message); results << message.msg(); fIos.write(results); continue; } } } //cout << " got a "; switch (packageType) { case dmlpackage::DML_INSERT: //cout << "DML_INSERT"; break; case dmlpackage::DML_UPDATE: //cout << "DML_UPDATE"; break; case dmlpackage::DML_DELETE: //cout << "DML_DELETE"; break; case dmlpackage::DML_COMMAND: //cout << "DML_COMMAND"; break; case dmlpackage::DML_INVALID_TYPE: //cout << "DML_INVALID_TYPE"; break; default: //cout << "UNKNOWN"; break; } //cout << " package" << endl; BRM::TxnID txnid; if (!fConcurrentSupport) { //Check if any other active transaction bool anyOtherActiveTransaction = true; BRM::SIDTIDEntry blockingsid; //For logout commit trigger if ( packageType == dmlpackage::DML_COMMAND ) { anyOtherActiveTransaction = false; } int i = 0; int waitPeriod = 10; //@Bug 2487 Check transaction map every 1/10 second int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod"); if ( waitPeriodStr.length() != 0 ) waitPeriod = 
static_cast(config::Config::fromText(waitPeriodStr)); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; //cout << "starting i = " << i << endl; //txnid = sessionManager.getTxnID(sessionID); while (anyOtherActiveTransaction) { anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp, blockingsid ); //cout << "session " << sessionID << " with package type " << (int)packageType << " got anyOtherActiveTransaction " << anyOtherActiveTransaction << endl; if (anyOtherActiveTransaction) { for ( ; i < numTries; i++ ) { #ifdef _MSC_VER Sleep(rm_ts.tv_sec * 1000); #else struct timespec abs_ts; //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); #endif anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp, blockingsid ); if ( !anyOtherActiveTransaction ) { txnid = sessionManager.getTxnID(sessionID); //cout << "Ready to process type " << (int)packageType << " with txd " << txnid << endl; if ( !txnid.valid ) { txnid = sessionManager.newTxnID(sessionID, true); if (txnid.valid) { //cout << "Ready to process type " << (int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl; anyOtherActiveTransaction = false; break; } else { anyOtherActiveTransaction = true; } } else { anyOtherActiveTransaction = false; //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with existing txnid " << txnid.id << endl; break; } } } //cout << "ending i = " << i << endl; } else { //cout << "Ready to process type " << (int)packageType << endl; txnid = sessionManager.getTxnID(sessionID); if ( !txnid.valid ) { txnid = sessionManager.newTxnID(sessionID, true); if (txnid.valid) { //cout << "later Ready to process type " << 
(int)packageType << " for session "<< sessionID << " with new txnid " << txnid.id << endl; anyOtherActiveTransaction = false; } else { anyOtherActiveTransaction = true; //cout << "Cannot get txnid for process type " << (int)packageType << " for session "<< sessionID << endl; } } else { anyOtherActiveTransaction = false; //cout << "already have transaction to process type " << (int)packageType << " for session "<< sessionID <<" with txnid " << txnid.id << endl; break; } } if ((anyOtherActiveTransaction) && (i >= numTries)) { //cout << " Erroring out on package type " << (int)packageType << " for session " << sessionID << endl; break; } } if (anyOtherActiveTransaction && (i >= numTries)) { //cout << " again Erroring out on package type " << (int)packageType << endl; messageqcpp::ByteStream results; //@Bug 2681 set error code for active transaction status = DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR; rowCount = 0; results << status; results << rowCount; Message::Args args; args.add(static_cast(blockingsid.sessionid)); results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args); //@Bug 3854 Log to debug.log LoggingID logid(20, 0, 0); logging::Message::Args args1; logging::Message msg(1); args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args)); msg.format( args1 ); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); fIos.write(results); } else { //cout << "starting processing package type " << (int) packageType << " for session " << sessionID << " with id " << txnid.id << endl; boost::shared_ptr php(new PackageHandler(fIos, bs1, packageType, fEC, fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. 
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; lk2.unlock(); php->run(); // Operates in this thread. // Move this to the end of PackageHandler so it is removed from the map before the response is sent // lk2.lock(); // packageHandlerMap.erase(sessionID); // lk2.unlock(); } } else { #if 0 if (packageType != dmlpackage::DML_COMMAND) { txnid = sessionManager.getTxnID(sessionID); if ( !txnid.valid ) { txnid = sessionManager.newTxnID(sessionID, true); if (!txnid.valid) { throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); } } } else { txnid = sessionManager.getTxnID(sessionID); } #endif boost::shared_ptr php(new PackageHandler(fIos, bs1, packageType, fEC, fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; lk2.unlock(); php->run(); // Operates in this thread. // Move this to the end of PackageHandler so it is removed from the map before the response is sent // lk2.lock(); // packageHandlerMap.erase(sessionID); // lk2.unlock(); } } } catch (std::exception& ex) { ostringstream oss; oss << "DMLProcessor failed on: " << ex.what(); DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); fIos.close(); } catch (...) { ostringstream oss; oss << "DMLProcessor failed on: processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); cerr << "Caught unknown exception! " << oss.str(); fIos.close(); } } void RollbackTransactionProcessor::processBulkRollback (BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased) { // Take over ownership of stale lock. 
// Use "DMLProc" as process name, session id and transaction id -1 to distinguish from real DMLProc rollback int32_t sessionID = -1; int32_t txnid = -1; std::string processName("DMLProc"); uint32_t processID = ::getpid(); bool ownerChanged = true; lockReleased = true; try { ownerChanged = dbrm->changeOwner(lockInfo.id, processName, processID, sessionID, txnid); } catch (std::exception&) { lockReleased = false; throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (!ownerChanged) { lockReleased = false; throw std::runtime_error( std::string("Unable to grab lock; lock not found or still in use.") ); } //send to all PMs boost::shared_ptr bsIn; messageqcpp::ByteStream bsOut; string tableName(""); fWEClient->addQueue(uniqueId); //find the PMs need to send the message to std::set pmSet; int pmId; for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++) { pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]]; pmSet.insert(pmId); } if (lockInfo.state == BRM::LOADING) { bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK; bsOut << uniqueId; bsOut << lockInfo.id; bsOut << lockInfo.tableOID; bsOut << tableName; bsOut << processName; std::set::const_iterator iter = pmSet.begin(); while (iter != pmSet.end()) { fWEClient->write(bsOut, *iter); iter++; } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; while (pmMsgCnt < pmSet.size()) { std::string rollbackErrMsg; bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { fWEClient->removeQueue(uniqueId); lockReleased = false; throw std::runtime_error("Network error, PM rollback; "); } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> rollbackErrMsg; *bsIn >> pmNum; if (rc != 0) { fWEClient->removeQueue(uniqueId); lockReleased = false; throw std::runtime_error(rollbackErrMsg); } } pmMsgCnt++; } // end of while loop to process all responses to bulk rollback // If no errors so far, then 
change state to CLEANUP state. // We ignore the return stateChange flag. dbrm->changeState( lockInfo.id, BRM::CLEANUP ); } // end of (lockInfo.state == BRM::LOADING) //delete meta data backup rollback files bsOut.reset(); bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP; bsOut << uniqueId; bsOut << lockInfo.tableOID; std::set::const_iterator iter = pmSet.begin(); while (iter != pmSet.end()) { fWEClient->write(bsOut, *iter); iter++; } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; //@Bug 4517 Release tablelock when it is in CLEANUP state uint32_t rcCleanup = 0; std::string fileDeleteErrMsg; while (pmMsgCnt < pmSet.size()) { bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { fWEClient->removeQueue(uniqueId); rcCleanup = 1; fileDeleteErrMsg = "Network error, PM clean up; "; } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> fileDeleteErrMsg; *bsIn >> pmNum; if ((rc != 0) && (rcCleanup == 0)) { fWEClient->removeQueue(uniqueId); rcCleanup = rc; } } pmMsgCnt++; } // end of while loop to process all responses to rollback cleanup fWEClient->removeQueue(uniqueId); // We ignore return release flag from releaseTableLock(). dbrm->releaseTableLock( lockInfo.id ); if (rcCleanup != 0) throw std::runtime_error(fileDeleteErrMsg); } void DMLProcessor::log(const std::string& msg, logging::LOG_TYPE level) { logging::Message::Args args; logging::Message message(2); args.add(msg); message.format(args); logging::LoggingID lid(20); logging::MessageLog ml(lid); switch (level) { case LOG_TYPE_DEBUG: ml.logDebugMessage(message); break; case LOG_TYPE_INFO: ml.logInfoMessage(message); break; case LOG_TYPE_WARNING: ml.logWarningMessage(message); break; case LOG_TYPE_ERROR: ml.logErrorMessage(message); break; case LOG_TYPE_CRITICAL: ml.logCriticalMessage(message); break; } } } //namespace dmlprocessor // vim:ts=4 sw=4: