/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: dmlprocessor.cpp 1024 2013-07-26 16:23:59Z chao $ * * ***********************************************************************/ /** @file */ #include "configcpp.h" #include #include //#define SERIALIZE_DDL_DML_CPIMPORT 1 #include #include #include #include using namespace boost; #include "cacheutils.h" #include "vss.h" #include "dbrm.h" #include "brmtypes.h" #include "idberrorinfo.h" #include "errorids.h" #include "batchinsertprocessor.h" #include "tablelockdata.h" #include "oamcache.h" #include "messagelog.h" #include "sqllogger.h" #include "we_messages.h" #include "dmlprocessor.h" using namespace BRM; using namespace config; using namespace execplan; using namespace std; using namespace messageqcpp; using namespace dmlpackage; using namespace dmlpackageprocessor; using namespace joblist; using namespace logging; using namespace oam; using namespace WriteEngine; #include "querytele.h" using namespace querytele; extern boost::mutex mute; extern boost::condition_variable cond; #define MCOL_140 // Undefine to test VSS for out of order transactions namespace { [[maybe_unused]] const std::string myname = "DMLProc"; } namespace dmlprocessor { // Map to store the package handler objects so we can set flags during execution // for things like ctrl+c DMLProcessor::PackageHandlerMap_t DMLProcessor::packageHandlerMap; boost::mutex DMLProcessor::packageHandlerMapLock; // Map to store the BatchInsertProc object std::map DMLProcessor::batchinsertProcessorMap; boost::mutex DMLProcessor::batchinsertProcessorMapLock; // MCOL-140 Map to hold table oids for tables being changed. std::map PackageHandler::tableOidMap; boost::condition_variable PackageHandler::tableOidCond; boost::mutex PackageHandler::tableOidMutex; //------------------------------------------------------------------------------ // A thread to periodically call dbrm to see if a user is // shutting down the system or has put the system into write // suspend mode. DBRM has 2 flags to check in this case, the // ROLLBACK flag, and the FORCE flag. These flags will be // reported when we ask for the Shutdown Pending flag (which we // ignore at this point). Even if the user is putting the system // into write suspend mode, this call will return the flags we // are interested in. If ROLLBACK is set, we cancel normally. // If FORCE is set, we can't rollback. struct CancellationThread { CancellationThread(DBRM* aDbrm, DMLServer& aServer) : fDbrm(aDbrm), fServer(aServer) { } void operator()() { bool bDoingRollback = false; bool bRollback = false; bool bForce = false; ostringstream oss; std::vector tableLocks; BRM::TxnID txnId; DMLProcessor::PackageHandlerMap_t::iterator phIter; uint32_t sessionID; int rc = 0; while (true) { usleep(1000000); // 1 seconds // Check to see if someone has ordered a shutdown or suspend with rollback. (void)fDbrm->getSystemShutdownPending(bRollback, bForce); if (bForce) break; if (bDoingRollback && bRollback) { continue; // We've already started the rollbacks. Don't start again. } bDoingRollback = false; if (bRollback) { RollbackTransactionProcessor rollbackProcessor(fDbrm); SessionManager sessionManager; uint64_t uniqueId = fDbrm->getUnique64(); std::string errorMsg; int activeTransCount = 0; int idleTransCount = 0; bDoingRollback = true; ostringstream oss; oss << "DMLProc has been told to rollback all DML transactions."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); // Tell any active processors to stop working and return an error // The front end will respond with a ROLLBACK command. // Mark all active processors to rollback boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock); for (phIter = DMLProcessor::packageHandlerMap.begin(); phIter != DMLProcessor::packageHandlerMap.end(); ++phIter) { ostringstream oss; oss << "DMLProc will rollback active session " << phIter->second->getSessionID() << " Transaction " << phIter->second->getTxnid(); DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); ++activeTransCount; phIter->second->rollbackPending(); } if (activeTransCount > 0) { ostringstream oss1; oss1 << "DMLProc is rolling back back " << activeTransCount << " active transactions."; DMLProcessor::log(oss1.str(), logging::LOG_TYPE_INFO); } // WIP Need to set cluster to read-only via CMAPI before shutting the cluster down. if (fDbrm->isReadWrite()) { continue; } // Check for any open DML transactions that don't currently have // a processor tableLocks = fDbrm->getAllTableLocks(); if (tableLocks.size() > 0) { for (uint32_t i = 0; i < tableLocks.size(); ++i) { sessionID = tableLocks[i].ownerSessionID; phIter = DMLProcessor::packageHandlerMap.find(sessionID); if (phIter == DMLProcessor::packageHandlerMap.end()) { // We have found an active transaction without a packagehandler. // This means that a transaction is open with autocommit turned // off, but there's no current activity on the transaction. We // need to roll it back if it's a DML transaction. // If ownerName == "DMLProc" then it's a DML transaction. if (tableLocks[i].ownerName == "DMLProc") { // OK, we know this is an idle DML transaction, so roll it back. ++idleTransCount; txnId.id = tableLocks[i].ownerTxnID; txnId.valid = true; rc = rollbackProcessor.rollBackTransaction(uniqueId, txnId, sessionID, errorMsg); if (rc == 0) { fDbrm->invalidateUncommittedExtentLBIDs(txnId.id, false); //@Bug 4524. In case it is batchinsert, call bulkrollback. rc = rollbackProcessor.rollBackBatchAutoOnTransaction(uniqueId, txnId, sessionID, tableLocks[i].tableOID, errorMsg); if (rc == 0) { logging::logCommand(0, tableLocks[i].ownerTxnID, "ROLLBACK;"); bool lockReleased = true; try { lockReleased = fDbrm->releaseTableLock(tableLocks[i].id); TablelockData::removeTablelockData(sessionID); } catch (std::exception&) { throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (lockReleased) { sessionManager.rolledback(txnId); ostringstream oss; oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and table lock id " << tableLocks[i].id << " is released."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } else { ostringstream oss; oss << "DMLProc rolled back idle transaction " << tableLocks[i].ownerTxnID << " and tble lock id " << tableLocks[i].id << " is not released."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } } else { ostringstream oss; oss << " problem with bulk rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg; DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL); rc = fDbrm->setReadOnly(true); } } else { ostringstream oss; oss << " problem with rollback of idle transaction " << tableLocks[i].ownerTxnID << "and DBRM is setting to readonly and table lock is not released: " << errorMsg; DMLProcessor::log(oss.str(), logging::LOG_TYPE_CRITICAL); rc = fDbrm->setReadOnly(true); } } } } } // If there are any abandonded transactions without locks // release them. int len; std::shared_ptr activeTxns = sessionManager.SIDTIDMap(len); for (int i = 0; i < len; i++) { // If there isn't a table lock for this transaction, roll it back. Otherwise, assume // it has an active processor or is not DML initiated and leave it alone. It's someone // else's concern. bool bFoundit = false; for (uint32_t j = 0; j < tableLocks.size(); ++j) { if (tableLocks[j].ownerTxnID == activeTxns[i].txnid.id) { bFoundit = true; break; } } if (!bFoundit && activeTxns[i].txnid.valid) { rollbackProcessor.rollBackTransaction(uniqueId, activeTxns[i].txnid, activeTxns[i].sessionid, errorMsg); sessionManager.rolledback(activeTxns[i].txnid); ++idleTransCount; ostringstream oss; oss << "DMLProc rolled back idle transaction with no tablelock" << tableLocks[i].ownerTxnID; DMLProcessor::log(oss.str(), logging::LOG_TYPE_INFO); } } if (idleTransCount > 0) { ostringstream oss2; oss2 << "DMLProc has rolled back " << idleTransCount << " idle transactions."; DMLProcessor::log(oss2.str(), logging::LOG_TYPE_INFO); } // Here is the end of the rollback if so DMLProc rollbacks what it can. break; } } // Setting the flag to tell DMLServer to exit. fServer.startShutdown(); } DBRM* fDbrm; DMLServer& fServer; }; PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios, boost::shared_ptr bs, uint8_t packageType, joblist::DistributedEngineComm* ec, bool concurrentSupport, uint64_t maxDeleteRows, uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnId, DBRM* aDbrm, const QueryTeleClient& qtc, boost::shared_ptr csc) : fIos(ios) , fByteStream(bs) , fPackageType(packageType) , fEC(ec) , fConcurrentSupport(concurrentSupport) , fMaxDeleteRows(maxDeleteRows) , fSessionID(sessionID) , fTableOid(0) , fTxnid(txnId) , fDbrm(aDbrm) , fQtc(qtc) , fcsc(csc) { } PackageHandler::~PackageHandler() { // cout << "In destructor" << endl; } // MCOL-140 // Blocks a thread if there is another trx working on the same fTableOid // return 1 when thread should continue. // return 0 if error. Right now, no error detection is implemented. // // txnid was being created before the call to this function. This caused race conditions // so creation is delayed until we're inside the lock here. Nothing needs it before // this point in the execution. // // The algorithm is this. When the first txn for a given fTableOid arrives, start a queue // containing a list of waiting or working txnId. Put this txnId into the queue (working) // Put the queue into a map keyed on fTableOid. // // When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself, // then waits for condition. // When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is // empty, it removes the entry from the map. // Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released // to do work. Otherwise it goes back to wait. // // There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work. // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. std::map::iterator it; boost::unique_lock lock(tableOidMutex); BRM::TxnID txnid; if (fPackageType != dmlpackage::DML_COMMAND) { txnid = sessionManager.getTxnID(fSessionID); if (!txnid.valid) { txnid = sessionManager.newTxnID(fSessionID, true); if (!txnid.valid) { throw std::runtime_error(std::string("Unable to start a transaction. Check critical log.")); } } } else { txnid = sessionManager.getTxnID(fSessionID); } fTxnid = txnid.id; if ((it = tableOidMap.find(fTableOid)) != tableOidMap.end()) { PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; // There's at least one working txn on this table. We may be the same txn. if (fTxnid == tableOidQueue.front()) { return 1; // We're next in line or the same as the last. Keep working } tableOidQueue.push(fTxnid); // Get on the waiting list. // We need to wait // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { // Log something that we're waiting LoggingID logid(21, fSessionID, fTxnid); logging::Message::Args args1; logging::Message msg(1); ostringstream oss; oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() << "|"; args1.add(oss.str()); args1.add((uint64_t)fTableOid); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); tableOidCond.wait(lock); // In case of CTRL+C, the tableOidQueue could be invalidated if ((tableOidMap.find(fTableOid))->second != tableOidQueue) { break; } if (tableOidQueue.front() == fTxnid) { // We're up next. Let's go do stuff. break; } if (tableOidQueue.empty()) { // If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now. // Empty queues must be erased from the map. tableOidMap.erase(fTableOid); break; } // If we're not in the queue at all, then continue. CTRL+C was probably hit. PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid); if (c_it == tableOidQueue.end()) { break; } // We're still in the queue and not on top. Go back and wait some more. } } else { // We're the first for this tableoid. Start a new queue. tableAccessQueue_t tableOidQueue; tableOidQueue.push(fTxnid); tableOidMap[fTableOid] = tableOidQueue; } return 1; } // MCOL-140 Called when it's time to release the next thread for this tablOid int PackageHandler::releaseTableAccess() { // take us out of the queue std::map::iterator it; boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { return 2; // For now, return codes are not used } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; if (tableOidQueue.front() != fTxnid) { // This is a severe error. The front should be the working thread. If we're here, // we're the working thread and should be front(). cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl; LoggingID logid(21, fSessionID, fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add( "ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid"); args1.add((uint64_t)fTableOid); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_ERROR, msg, logid); } else { if (!tableOidQueue.empty()) tableOidQueue.pop(); // Get off the waiting list. if (tableOidQueue.empty()) { // remove the queue from the map. tableOidMap.erase(fTableOid); } } // release the condition tableOidCond.notify_all(); return 1; } int PackageHandler::forceReleaseTableAccess() { // By removing the txnid from the queue, the logic after the wait in // synchTableAccess() will release the thread and clean up if needed. std::map::iterator it; boost::lock_guard lock(tableOidMutex); if (fTableOid == 0 || (it = tableOidMap.find(fTableOid)) == tableOidMap.end()) { // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess return 2; } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); if (tableOidQueue.empty()) { // remove the queue from the map. tableOidMap.erase(fTableOid); } // release the condition tableOidCond.notify_all(); return 1; } // static // Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung, // though some may be because they never returned from PrimProc and will leave the table lock on. int PackageHandler::clearTableAccess() { tableOidMap.clear(); return 1; } CalpontSystemCatalog::ROPair PackageHandler::getTableRID( boost::shared_ptr fcsc, execplan::CalpontSystemCatalog::TableName& tableName) { execplan::CalpontSystemCatalog::ROPair roPair; try { roPair = fcsc->tableRID(tableName); } catch (...) { if (setupDec()) throw; roPair = fcsc->tableRID(tableName); } return roPair; } void PackageHandler::run() { ResourceManager* frm = ResourceManager::instance(); dmlpackageprocessor::DMLPackageProcessor::DMLResult result; result.result = dmlpackageprocessor::DMLPackageProcessor::NO_ERROR; // cout << "PackageHandler handling "; std::string stmt; unsigned DMLLoggingId = 21; oam::OamCache* oamCache = oam::OamCache::makeOamCache(); SynchTable synchTable; try { switch (fPackageType) { case dmlpackage::DML_INSERT: { // build an InsertDMLPackage from the bytestream // cout << "an INSERT package" << endl; dmlpackage::InsertDMLPackage insertPkg; // boost::shared_ptr insertBs (new messageqcpp::ByteStream); messageqcpp::ByteStream bsSave = *(fByteStream.get()); insertPkg.readMetaData(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = insertPkg.getTableOid(); // Single Insert has no start like bulk does, so insertPkg.getTableOid() // isn't set. Go get it now. if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = insertPkg.get_Table()->get_SchemaName(); tableName.table = insertPkg.get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; qts.query_uuid = QueryTeleClient::genUUID(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "INSERT"; qts.query = insertPkg.get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = insertPkg.get_SchemaName(); fQtc.postQueryTele(qts); // cout << "This is batch insert " << insertPkg->get_isBatchInsert() << endl; if (insertPkg.get_isBatchInsert()) { fByteStream->reset(); // cout << "This is batch insert " << endl; BatchInsertProc* batchProcessor = NULL; { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter == DMLProcessor::batchinsertProcessorMap.end()) { batchProcessor = new BatchInsertProc(insertPkg.get_isAutocommitOn(), insertPkg.getTableOid(), fTxnid, fDbrm); DMLProcessor::batchinsertProcessorMap[fSessionID] = batchProcessor; // cout << "batchProcessor is created " << batchProcessor << endl; } else { batchProcessor = batchIter->second; // cout << "Found batchProcessor " << batchProcessor << endl; } } if (insertPkg.get_Logging()) { LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("Start SQL statement: "); if (!insertPkg.get_isCacheInsert()) { ostringstream oss; oss << insertPkg.get_SQLStatement() << "; |" << insertPkg.get_SchemaName() << "|"; args1.add(oss.str()); } msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); TablelockData* tablelockData = TablelockData::makeTablelockData(insertPkg.get_SessionID()); uint64_t tableLockId = tablelockData->getTablelockId(insertPkg.getTableOid()); // cout << "Processing table oid " << insertPkg.getTableOid() << " for transaction "<< (int)fTxnid // << endl; if (tableLockId == 0) { // cout << "Grabing tablelock for batchProcessor " << batchProcessor << endl; tableLockId = batchProcessor->grabTableLock(insertPkg.get_SessionID()); if (tableLockId == 0) { BRM::TxnID brmTxnID; brmTxnID.id = fTxnid; brmTxnID.valid = true; sessionManager.rolledback(brmTxnID); string errMsg; int rc = 0; batchProcessor->getError(rc, errMsg); result.result = DMLPackageProcessor::TABLE_LOCK_ERROR; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; break; } if (tableLockId > 0) tablelockData->setTablelock(insertPkg.getTableOid(), tableLockId); } } if (insertPkg.get_Logending() && insertPkg.get_Logging()) // only one batch need to be processed. { // cout << "dmlprocessor add last pkg" << endl; // need to add error handling. batchProcessor->addPkg(bsSave); batchProcessor->sendFirstBatch(); batchProcessor->receiveOutstandingMsg(); //@Bug 5162. Get the correct error message before the last message. string errMsg; int rc = 0; batchProcessor->getError(rc, errMsg); batchProcessor->sendlastBatch(); batchProcessor->receiveAllMsg(); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with warnings"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); logging::Message::Args args; logging::Message message(1); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; } else if (rc != 0) { result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; logging::Message::Args args; logging::Message message(1); cout << "Got error in the end of one batchinsert." << endl; args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } else { // if (!insertPkg.get_isAutocommitOn()) // { // batchProcessor->setHwm(); // } LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } // remove the batch insert object { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } } else if (insertPkg.get_Logending()) // Last batch { int rc = 0; string errMsg; batchProcessor->getError(rc, errMsg); // cout <<"dmlprocessor received last pkg from mysql rc == " << rc << endl; if ((rc == 0) || (rc == DMLPackageProcessor::IDBRANGE_WARNING)) { // cout << " rc = " << rc << endl; batchProcessor->addPkg(bsSave); batchProcessor->sendNextBatch(); batchProcessor->receiveOutstandingMsg(); //@Bug 5162. Get the correct error message before the last message. batchProcessor->getError(rc, errMsg); batchProcessor->sendlastBatch(); batchProcessor->receiveAllMsg(); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with warnings"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); logging::Message::Args args; logging::Message message(1); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; } else if (rc != 0) { // cout << "Got error in the end of last batchinsert. error message is " << errMsg << endl; result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } else { LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } // cout << "finished batch insert" << endl; } else { // error occurred. Receive all outstanding messages before erroring out. batchProcessor->receiveOutstandingMsg(); batchProcessor->sendlastBatch(); // needs to flush files batchProcessor->receiveAllMsg(); result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; // cout << "Got error in the end of batchinsert2. error msg is " << errMsg<< endl; logging::Message::Args args; logging::Message message(1); args.add("Insert Failed: "); args.add(errMsg); args.add(""); args.add(""); message.format(args); result.message = message; LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); logging::logDML(insertPkg.get_SessionID(), fTxnid, insertPkg.get_SQLStatement() + ";", insertPkg.get_SchemaName()); } // remove from map { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { // cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl; delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } } else { int rc = 0; string errMsg; batchProcessor->getError(rc, errMsg); if (rc == DMLPackageProcessor::IDBRANGE_WARNING) { result.result = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; } else if (rc != 0) { result.result = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; //@Bug // cout << "Got error during batchinsert. with message " << errMsg << endl; logging::Message::Args args; logging::Message message(6); args.add(errMsg); message.format(args); result.message = message; batchProcessor->receiveOutstandingMsg(); batchProcessor->sendlastBatch(); // needs to flush files // cout << "Last batch is sent to WES." << endl; batchProcessor->receiveAllMsg(); LoggingID logid(DMLLoggingId, insertPkg.get_SessionID(), fTxnid); logging::Message::Args args1; logging::Message msg(1); args1.add("End SQL statement with error"); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); // remove from map { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(fSessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { // cout << "Batchinsertprcessor is deleted. " << batchIter->second << endl; delete batchIter->second; DMLProcessor::batchinsertProcessorMap.erase(fSessionID); } } break; } batchProcessor->addPkg(bsSave); batchProcessor->sendNextBatch(); break; } } else // Single Insert { // make sure insertPkg.readMetaData() is called before // this on fByteStream! // TODO: Similar to batch inserts, don't // deserialize the row data here for single inserts. insertPkg.readRowData(*(fByteStream.get())); insertPkg.set_TxnID(fTxnid); fProcessor.reset(new dmlpackageprocessor::InsertPackageProcessor(fDbrm, insertPkg.get_SessionID())); result = fProcessor->processPackage(insertPkg); } qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_UPDATE: { // build an UpdateDMLPackage from the bytestream // cout << "an UPDATE package" << endl; boost::scoped_ptr updatePkg(new dmlpackage::UpdateDMLPackage()); updatePkg->read(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = updatePkg->getTableOid(); // Update generally doesn't set fTableOid in updatePkg. Go get it now. if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = updatePkg->get_Table()->get_SchemaName(); tableName.table = updatePkg->get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = updatePkg->uuid(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "UPDATE"; qts.query = updatePkg->get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = updatePkg->get_SchemaName(); fQtc.postQueryTele(qts); // process it //@Bug 1341. Don't remove calpontsystemcatalog from this // session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::UpdatePackageProcessor(fDbrm, updatePkg->get_SessionID())); fProcessor->setEngineComm(fEC); fProcessor->setRM(frm); idbassert(fTxnid != 0); result = fProcessor->processPackage(*(updatePkg.get())); qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_DELETE: { boost::scoped_ptr deletePkg(new dmlpackage::DeleteDMLPackage()); deletePkg->read(*(fByteStream.get())); #ifdef MCOL_140 if (fConcurrentSupport) { fTableOid = deletePkg->getTableOid(); // Delete generally doesn't set fTableOid in updatePkg. Go get it now. if (fTableOid == 0) { CalpontSystemCatalog::TableName tableName; tableName.schema = deletePkg->get_Table()->get_SchemaName(); tableName.table = deletePkg->get_Table()->get_TableName(); CalpontSystemCatalog::ROPair roPair = getTableRID(fcsc, tableName); fTableOid = roPair.objnum; } synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); QueryTeleStats qts; qts.query_uuid = deletePkg->uuid(); qts.msg_type = QueryTeleStats::QT_START; qts.start_time = QueryTeleClient::timeNowms(); qts.session_id = fSessionID; qts.query_type = "DELETE"; qts.query = deletePkg->get_SQLStatement(); qts.system_name = oamCache->getSystemName(); qts.module_name = oamCache->getModuleName(); qts.schema_name = deletePkg->get_SchemaName(); fQtc.postQueryTele(qts); // process it //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache. fProcessor.reset(new dmlpackageprocessor::DeletePackageProcessor(fDbrm, deletePkg->get_SessionID())); fProcessor->setEngineComm(fEC); fProcessor->setRM(frm); idbassert(fTxnid != 0); result = fProcessor->processPackage(*(deletePkg.get())); qts.msg_type = QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = result.stats.fMaxMemPct; qts.num_files = result.stats.fNumFiles; qts.phy_io = result.stats.fPhyIO; qts.cache_io = result.stats.fCacheIO; qts.msg_rcv_cnt = result.stats.fMsgRcvCnt; qts.cp_blocks_skipped = result.stats.fCPBlocksSkipped; qts.msg_bytes_in = result.stats.fMsgBytesIn; qts.msg_bytes_out = result.stats.fMsgBytesOut; qts.rows = result.stats.fRows; qts.end_time = QueryTeleClient::timeNowms(); qts.blocks_changed = result.stats.fBlocksChanged; fQtc.postQueryTele(qts); } break; case dmlpackage::DML_COMMAND: { // build a CommandDMLPackage from the bytestream // cout << "a COMMAND package" << endl; dmlpackage::CommandDMLPackage commandPkg; commandPkg.read(*(fByteStream.get())); stmt = commandPkg.get_DMLStatement(); boost::algorithm::to_upper(stmt); trim(stmt); if (stmt == "CLEANUP") { execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID()); execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(commandPkg.get_SessionID() | 0x80000000); } else { // process it //@Bug 1341. Don't remove calpontsystemcatalog from this session to take advantage of cache. fProcessor.reset( new dmlpackageprocessor::CommandPackageProcessor(fDbrm, commandPkg.get_SessionID())); // cout << "got command " << stmt << " for session " << commandPkg.get_SessionID() << endl; result = fProcessor->processPackage(commandPkg); } } break; } // Log errors if ((result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && (result.result != dmlpackageprocessor::DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR) && (result.result != dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR)) { logging::LoggingID lid(21); logging::MessageLog ml(lid); ml.logErrorMessage(result.message); } else if (result.result == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) { logging::LoggingID lid(21); logging::MessageLog ml(lid); ml.logWarningMessage(result.message); } } catch (std::exception& e) { cout << "dmlprocessor.cpp PackageHandler::run() package type(" << fPackageType << ") exception: " << e.what() << endl; logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() package type"); args.add((uint64_t)fPackageType); args.add(e.what()); message.format(args); ml.logErrorMessage(message); result.result = DMLPackageProcessor::COMMAND_ERROR; result.message = message; } catch (...) { logging::LoggingID lid(21); logging::MessageLog ml(lid); logging::Message::Args args; logging::Message message(1); args.add("dmlprocessor.cpp PackageHandler::run() ... exception package type"); args.add((uint64_t)fPackageType); message.format(args); ml.logErrorMessage(message); result.result = DMLPackageProcessor::COMMAND_ERROR; result.message = message; } // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. // We need to remove it from the list before sending the response back. // If we remove it after sending the results, it's possible for a commit // or rollback be sent and get processed before it is removed, and that // will fail. boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock); DMLProcessor::packageHandlerMap.erase(getSessionID()); lk2.unlock(); // send back the results messageqcpp::ByteStream results; messageqcpp::ByteStream::octbyte rowCount = result.rowCount; messageqcpp::ByteStream::byte retval = result.result; results << retval; results << rowCount; results << result.message.msg(); results << result.tableLockInfo; // ? connector does not get // query stats results << result.queryStats; results << result.extendedStats; results << result.miniStats; result.stats.serialize(results); fIos.write(results); // Bug 5226. dmlprocessor thread will close the socket to mysqld. // if (stmt == "CLEANUP") // fIos.close(); } void PackageHandler::rollbackPending() { if (fProcessor.get() == NULL) { // This happens when batch insert return; } fProcessor->setRollbackPending(true); // Force a release of the processing from MCOL-140 #ifdef MCOL_140 if (fConcurrentSupport) { // MCOL-140 We're not necessarily the next in line. // This forces this thread to be released anyway. forceReleaseTableAccess(); } #endif ostringstream oss; oss << "PackageHandler::rollbackPending: Processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); } void added_a_pm(int) { DistributedEngineComm* dec; ResourceManager* rm = ResourceManager::instance(); dec = DistributedEngineComm::instance(rm); dec->Setup(); // MCOL-140 clear the waiting queue as all transactions are probably going to fail PackageHandler::clearTableAccess(); } DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) : fPackageMaxThreads(packageMaxThreads) , fPackageWorkQueueSize(packageWorkQueueSize) , fDbrm(dbrm) , fShutdownFlag(false) { fMqServer.reset(new MessageQueueServer("DMLProc")); fDmlPackagepool.setMaxThreads(fPackageMaxThreads); fDmlPackagepool.setQueueSize(fPackageWorkQueueSize); fDmlPackagepool.setName("DmlPackagepool"); } int DMLServer::start() { messageqcpp::IOSocket ios; uint32_t nextID = 1; try { // CancellationThread is for telling all active transactions // to quit working because the system is either going down // or going into write suspend mode CancellationThread cancelObject(fDbrm, *this); boost::thread cancelThread(cancelObject); cout << "DMLProc is ready..." << endl; const static struct timespec timeout = {1, 100}; // roughly 1 second TO for (;;) { ios = fMqServer->accept(&timeout); // MCS polls in a loop watching for a pending shutdown // that is signalled via fShutdownFlag set in a // CancellationThread. CT sets the flag if a cluster state // has SS_SHUTDOWNPENDING value set. while (!ios.hasSocketDescriptor() && !pendingShutdown()) ios = fMqServer->accept(&timeout); if (pendingShutdown()) break; ios.setSockID(nextID++); fDmlPackagepool.invoke(DMLProcessor(ios, fDbrm)); } cancelThread.join(); return EXIT_SUCCESS; } catch (std::exception& ex) { cerr << ex.what() << endl; logging::LoggingID lid(21); Message::Args args; Message message(8); args.add("DMLProc init caught exception: "); args.add(ex.what()); message.format(args); logging::Logger logger(lid.fSubsysID); logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); return EXIT_FAILURE; } catch (...) { cerr << "Caught unknown exception!" << endl; logging::LoggingID lid(21); Message::Args args; Message message(8); args.add("DMLProc init caught unknown exception"); message.format(args); logging::Logger logger(lid.fSubsysID); logger.logMessage(logging::LOG_TYPE_CRITICAL, message, lid); return EXIT_FAILURE; } } DMLProcessor::DMLProcessor(messageqcpp::IOSocket ios, BRM::DBRM* aDbrm) : fIos(ios) , fDbrm(aDbrm) , fConcurrentSupport(false) { csc = CalpontSystemCatalog::makeCalpontSystemCatalog(); csc->identity(CalpontSystemCatalog::EC); string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host")); if (!teleServerHost.empty()) { int teleServerPort = config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port")); if (teleServerPort > 0) { fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort)); } } } void DMLProcessor::operator()() { bool bIsDbrmUp = true; try { boost::shared_ptr bs1(new messageqcpp::ByteStream()); // messageqcpp::ByteStream bs; uint8_t packageType; ResourceManager* rm = ResourceManager::instance(); DistributedEngineComm* fEC = DistributedEngineComm::instance(rm); uint64_t maxDeleteRows = rm->getDMLMaxDeleteRows(); fConcurrentSupport = true; string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions"); if (concurrentTranStr.length() != 0) { if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0)) fConcurrentSupport = false; } struct sigaction ign; memset(&ign, 0, sizeof(ign)); ign.sa_handler = added_a_pm; sigaction(SIGHUP, &ign, 0); fEC->Open(); for (;;) { // cout << "DMLProc is waiting for a Calpont DML Package on " << fIos.getSockID() << endl; try { bs1.reset(new messageqcpp::ByteStream(fIos.read())); // cout << "received from mysql socket " << fIos.getSockID() << endl; } catch (std::exception& ex) { // This is an I/O error from InetStreamSocket::read(), just close and move on... cout << "runtime error during read on " << fIos.getSockID() << " " << ex.what() << endl; bs1->reset(); } catch (...) { cout << "... error during read " << fIos.getSockID() << endl; // all this throw does is cause this thread to silently go away. I doubt this is the right // thing to do... throw; } if (!bs1 || bs1->length() == 0) { cout << "Read 0 bytes. Closing connection " << fIos.getSockID() << endl; fIos.close(); break; } uint32_t sessionID; *bs1 >> sessionID; *bs1 >> packageType; // cout << "DMLProc received pkg. sessionid:type = " << sessionID <<":"<<(int)packageType << endl; uint32_t stateFlags; messageqcpp::ByteStream::byte status = 255; messageqcpp::ByteStream::octbyte rowCount = 0; if (fDbrm->getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents { messageqcpp::ByteStream results; std::string responseMsg; bool bReject = false; // Check to see if we're in write suspended mode // If so, we can't process the request. if (stateFlags & SessionManagerServer::SS_SUSPENDED) { status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES; responseMsg = "Writing to the database is disabled."; bReject = true; } // Check to see if we're in write suspend or shutdown pending mode if (packageType != dmlpackage::DML_COMMAND) // Not a commit or rollback { if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING) { if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING) { responseMsg = "Writing to the database is disabled."; } else { responseMsg = "The database is being shut down."; } // Refuse all non active tranasactions // Check the rollback flag // -- Set: Rollback active transactions. // -- Not set: Allow active transactions. if (sessionManager.isTransactionActive(sessionID, bIsDbrmUp)) { if (stateFlags & SessionManagerServer::SS_ROLLBACK) { status = DMLPackageProcessor::JOB_CANCELED; bReject = true; } } else { status = DMLPackageProcessor::NOT_ACCEPTING_PACKAGES; bReject = true; } } // MCOL-4988 Check if DBRM is in READ ONLY mode if (fDbrm->isReadWrite() == BRM::ERR_READONLY) { BRM::errString(BRM::ERR_READONLY, responseMsg); status = DMLPackageProcessor::DBRM_READ_ONLY; bReject = true; } if (bReject) { // For batch insert, we need to send a lastpkg message // to batchInsertProcessor so it can clean things up. if (packageType == dmlpackage::DML_INSERT) { // build an InsertDMLPackage from the bytestream // We need the flags from the package to know what // type of package we're dealing with before we can // take special action for the last package of a // batch insert. dmlpackage::InsertDMLPackage insertPkg; messageqcpp::ByteStream bsSave = *(bs1.get()); insertPkg.read(*(bs1.get())); BatchInsertProc* batchInsertProcessor = NULL; if (insertPkg.get_isBatchInsert() && insertPkg.get_Logending()) { { boost::mutex::scoped_lock lk(DMLProcessor::batchinsertProcessorMapLock); std::map::iterator batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) // The first batch, no need to do anything { batchInsertProcessor = batchIter->second; batchInsertProcessor->addPkg(bsSave); batchInsertProcessor->sendlastBatch(); batchInsertProcessor->receiveAllMsg(); if (!insertPkg.get_isAutocommitOn()) { batchInsertProcessor->setHwm(); } batchIter = DMLProcessor::batchinsertProcessorMap.find(sessionID); if (batchIter != DMLProcessor::batchinsertProcessorMap.end()) { DMLProcessor::batchinsertProcessorMap.erase(sessionID); } } } } } results << status; results << rowCount; logging::Message::Args args; logging::Message message(2); args.add(responseMsg); message.format(args); results << message.msg(); fIos.write(results); continue; } } } // This section is to check to see if the user hit CTRL+C while the // DML was processing If so, the sessionID will be found in // packageHandlerMap and we can set rollbackPending in the // associated packageHandler. Other than CTRL+C, we should never // find our own sessionID in the map. // This mechanism may prove useful for other things, so the above // comment may change. { boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock); DMLProcessor::PackageHandlerMap_t::iterator phIter = packageHandlerMap.find(sessionID); if (phIter != packageHandlerMap.end()) { if (packageType == dmlpackage::DML_COMMAND) { // MCOL-66 It's possible for a commit or rollback to get here if // the timing is just right. Don't destroy its data messageqcpp::ByteStream bsctrlc(bs1); dmlpackage::CommandDMLPackage commandPkg; commandPkg.read(bsctrlc); std::string stmt = commandPkg.get_DMLStatement(); boost::algorithm::to_upper(stmt); trim(stmt); if (stmt == "CTRL+C") { phIter->second->rollbackPending(); fIos.close(); break; } } else { // If there's a PackageHandler already working for this // sessionID, we have a problem. Reject this package messageqcpp::ByteStream results; ostringstream oss; oss << "Received a DML command for session " << sessionID << " while still processing a command for the same sessionID"; results << static_cast(DMLPackageProcessor::DEAD_LOCK_ERROR); results << static_cast(0); // rowcount logging::Message::Args args; logging::Message message(2); args.add(oss.str()); message.format(args); logging::LoggingID lid(20); logging::MessageLog ml(lid); ml.logErrorMessage(message); results << message.msg(); fIos.write(results); continue; } } } // cout << " got a "; switch (packageType) { case dmlpackage::DML_INSERT: // cout << "DML_INSERT"; break; case dmlpackage::DML_UPDATE: // cout << "DML_UPDATE"; break; case dmlpackage::DML_DELETE: // cout << "DML_DELETE"; break; case dmlpackage::DML_COMMAND: // cout << "DML_COMMAND"; break; case dmlpackage::DML_INVALID_TYPE: // cout << "DML_INVALID_TYPE"; break; default: // cout << "UNKNOWN"; break; } // cout << " package" << endl; BRM::TxnID txnid; if (!fConcurrentSupport) { // Check if any other active transaction bool anyOtherActiveTransaction = true; BRM::SIDTIDEntry blockingsid; // For logout commit trigger if (packageType == dmlpackage::DML_COMMAND) { anyOtherActiveTransaction = false; } int i = 0; int waitPeriod = 10; //@Bug 2487 Check transaction map every 1/10 second int sleepTime = 100; // sleep 100 milliseconds between checks int numTries = 10; // try 10 times per second string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod"); if (waitPeriodStr.length() != 0) waitPeriod = static_cast(config::Config::fromText(waitPeriodStr)); numTries = waitPeriod * 10; struct timespec rm_ts; rm_ts.tv_sec = sleepTime / 1000; rm_ts.tv_nsec = sleepTime % 1000 * 1000000; // cout << "starting i = " << i << endl; // txnid = sessionManager.getTxnID(sessionID); while (anyOtherActiveTransaction) { anyOtherActiveTransaction = sessionManager.checkActiveTransaction(sessionID, bIsDbrmUp, blockingsid); // cout << "session " << sessionID << " with package type " << (int)packageType << " got // anyOtherActiveTransaction " << anyOtherActiveTransaction << endl; if (anyOtherActiveTransaction) { for (; i < numTries; i++) { struct timespec abs_ts; // cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl; do { abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); anyOtherActiveTransaction = sessionManager.checkActiveTransaction(sessionID, bIsDbrmUp, blockingsid); if (!anyOtherActiveTransaction) { txnid = sessionManager.getTxnID(sessionID); // cout << "Ready to process type " << (int)packageType << " with txd " << txnid << endl; if (!txnid.valid) { txnid = sessionManager.newTxnID(sessionID, true); if (txnid.valid) { // cout << "Ready to process type " << (int)packageType << " for session "<< sessionID << // " with new txnid " << txnid.id << endl; anyOtherActiveTransaction = false; break; } else { anyOtherActiveTransaction = true; } } else { anyOtherActiveTransaction = false; // cout << "already have transaction to process type " << (int)packageType << " for session // "<< sessionID <<" with existing txnid " << txnid.id << endl; break; } } } // cout << "ending i = " << i << endl; } else { // cout << "Ready to process type " << (int)packageType << endl; txnid = sessionManager.getTxnID(sessionID); if (!txnid.valid) { txnid = sessionManager.newTxnID(sessionID, true); if (txnid.valid) { // cout << "later Ready to process type " << (int)packageType << " for session "<< sessionID // << " with new txnid " << txnid.id << endl; anyOtherActiveTransaction = false; } else { anyOtherActiveTransaction = true; // cout << "Cannot get txnid for process type " << (int)packageType << " for session "<< // sessionID << endl; } } else { anyOtherActiveTransaction = false; // cout << "already have transaction to process type " << (int)packageType << " for session "<< // sessionID <<" with txnid " << txnid.id << endl; break; } } if ((anyOtherActiveTransaction) && (i >= numTries)) { // cout << " Erroring out on package type " << (int)packageType << " for session " << sessionID << // endl; break; } } if (anyOtherActiveTransaction && (i >= numTries)) { // cout << " again Erroring out on package type " << (int)packageType << endl; messageqcpp::ByteStream results; //@Bug 2681 set error code for active transaction status = DMLPackageProcessor::ACTIVE_TRANSACTION_ERROR; rowCount = 0; results << status; results << rowCount; Message::Args args; args.add(static_cast(blockingsid.sessionid)); results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args); //@Bug 3854 Log to debug.log LoggingID logid(20, 0, 0); logging::Message::Args args1; logging::Message msg(1); args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args)); msg.format(args1); logging::Logger logger(logid.fSubsysID); logger.logMessage(LOG_TYPE_DEBUG, msg, logid); fIos.write(results); } else { // cout << "starting processing package type " << (int) packageType << " for session " << sessionID // << " with id " << txnid.id << endl; boost::shared_ptr php(new PackageHandler(fIos, bs1, packageType, fEC, fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; lk2.unlock(); php->run(); // Operates in this thread. // Move this to the end of PackageHandler so it is removed from the map before the response is sent // lk2.lock(); // packageHandlerMap.erase(sessionID); // lk2.unlock(); } } else { #if 0 if (packageType != dmlpackage::DML_COMMAND) { txnid = sessionManager.getTxnID(sessionID); if ( !txnid.valid ) { txnid = sessionManager.newTxnID(sessionID, true); if (!txnid.valid) { throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); } } } else { txnid = sessionManager.getTxnID(sessionID); } #endif boost::shared_ptr php(new PackageHandler( fIos, bs1, packageType, fEC, fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc)); // We put the packageHandler into a map so that if we receive a // message to affect the previous command, we can find it. boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, boost::defer_lock); lk2.lock(); packageHandlerMap[sessionID] = php; lk2.unlock(); php->run(); // Operates in this thread. // Move this to the end of PackageHandler so it is removed from the map before the response is sent // lk2.lock(); // packageHandlerMap.erase(sessionID); // lk2.unlock(); } } } catch (std::exception& ex) { ostringstream oss; oss << "DMLProcessor failed on: " << ex.what(); DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); fIos.close(); } catch (...) { ostringstream oss; oss << "DMLProcessor failed on: processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); cerr << "Caught unknown exception! " << oss.str(); fIos.close(); } } void RollbackTransactionProcessor::processBulkRollback(BRM::TableLockInfo lockInfo, BRM::DBRM* dbrm, uint64_t uniqueId, OamCache::dbRootPMMap_t& dbRootPMMap, bool& lockReleased) { // Take over ownership of stale lock. // Use "DMLProc" as process name, session id and transaction id -1 to distinguish from real DMLProc rollback int32_t sessionID = -1; int32_t txnid = -1; std::string processName("DMLProc"); uint32_t processID = ::getpid(); bool ownerChanged = true; lockReleased = true; try { ownerChanged = dbrm->changeOwner(lockInfo.id, processName, processID, sessionID, txnid); } catch (std::exception&) { lockReleased = false; throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE)); } if (!ownerChanged) { lockReleased = false; throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use.")); } // send to all PMs boost::shared_ptr bsIn; messageqcpp::ByteStream bsOut; string tableName(""); fWEClient->addQueue(uniqueId); // find the PMs need to send the message to std::set pmSet; int pmId; for (uint32_t i = 0; i < lockInfo.dbrootList.size(); i++) { pmId = (*dbRootPMMap)[lockInfo.dbrootList[i]]; pmSet.insert(pmId); } if (lockInfo.state == BRM::LOADING) { bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK; bsOut << uniqueId; bsOut << lockInfo.id; bsOut << lockInfo.tableOID; bsOut << tableName; bsOut << processName; std::set::const_iterator iter = pmSet.begin(); while (iter != pmSet.end()) { fWEClient->write(bsOut, *iter); iter++; } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; while (pmMsgCnt < pmSet.size()) { std::string rollbackErrMsg; bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { fWEClient->removeQueue(uniqueId); lockReleased = false; throw std::runtime_error("Network error, PM rollback; "); } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> rollbackErrMsg; *bsIn >> pmNum; if (rc != 0) { fWEClient->removeQueue(uniqueId); lockReleased = false; throw std::runtime_error(rollbackErrMsg); } } pmMsgCnt++; } // end of while loop to process all responses to bulk rollback // If no errors so far, then change state to CLEANUP state. // We ignore the return stateChange flag. dbrm->changeState(lockInfo.id, BRM::CLEANUP); } // end of (lockInfo.state == BRM::LOADING) // delete meta data backup rollback files bsOut.reset(); bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP; bsOut << uniqueId; bsOut << lockInfo.tableOID; std::set::const_iterator iter = pmSet.begin(); while (iter != pmSet.end()) { fWEClient->write(bsOut, *iter); iter++; } // Wait for "all" the responses, and accumulate any/all errors unsigned int pmMsgCnt = 0; //@Bug 4517 Release tablelock when it is in CLEANUP state uint32_t rcCleanup = 0; std::string fileDeleteErrMsg; while (pmMsgCnt < pmSet.size()) { bsIn.reset(new messageqcpp::ByteStream()); fWEClient->read(uniqueId, bsIn); if (bsIn->length() == 0) { fWEClient->removeQueue(uniqueId); rcCleanup = 1; fileDeleteErrMsg = "Network error, PM clean up; "; } else { messageqcpp::ByteStream::byte rc; uint16_t pmNum; *bsIn >> rc; *bsIn >> fileDeleteErrMsg; *bsIn >> pmNum; if ((rc != 0) && (rcCleanup == 0)) { fWEClient->removeQueue(uniqueId); rcCleanup = rc; } } pmMsgCnt++; } // end of while loop to process all responses to rollback cleanup fWEClient->removeQueue(uniqueId); // We ignore return release flag from releaseTableLock(). dbrm->releaseTableLock(lockInfo.id); if (rcCleanup != 0) throw std::runtime_error(fileDeleteErrMsg); } void DMLProcessor::log(const std::string& msg, logging::LOG_TYPE level) { logging::Message::Args args; logging::Message message(2); args.add(msg); message.format(args); logging::LoggingID lid(20); logging::MessageLog ml(lid); switch (level) { case LOG_TYPE_DEBUG: ml.logDebugMessage(message); break; case LOG_TYPE_INFO: ml.logInfoMessage(message); break; case LOG_TYPE_WARNING: ml.logWarningMessage(message); break; case LOG_TYPE_ERROR: ml.logErrorMessage(message); break; case LOG_TYPE_CRITICAL: ml.logCriticalMessage(message); break; } } } // namespace dmlprocessor