diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index ca0707989..b2ef6e6f6 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -334,7 +334,7 @@ PackageHandler::~PackageHandler() // Rollback will most likely be next. // // A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid. -int PackageHandler::synchTableAccess() +int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage) { // MCOL-140 Wait for any other DML using this table. std::map::iterator it; @@ -374,9 +374,27 @@ int PackageHandler::synchTableAccess() // tableOidQueue here is the queue holding the waitng transactions for this fTableOid while (true) { - tableOidCond.wait(lock); + // Log something that we're waiting + LoggingID logid(21, fSessionID, fTxnid); + logging::Message::Args args1; + logging::Message msg(1); + ostringstream oss; + oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|"; + args1.add(oss.str()); + args1.add((uint64_t)fTableOid); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + + tableOidCond.wait(lock); + // In case of CTRL+C, the tableOidQueue could be invalidated + if ((tableOidMap.find(fTableOid))->second != tableOidQueue) + { + break; + } if (tableOidQueue.front() == fTxnid) { + // We're up next. Let's go do stuff. break; } if (tableOidQueue.empty()) @@ -458,6 +476,11 @@ int PackageHandler::forceReleaseTableAccess() } PackageHandler::tableAccessQueue_t& tableOidQueue = it->second; tableOidQueue.erase(fTxnid); + if (tableOidQueue.empty()) + { + // remove the queue from the map. + tableOidMap.erase(fTableOid); + } // release the condition tableOidCond.notify_all(); return 1; @@ -509,7 +532,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid } #endif QueryTeleStats qts; @@ -871,7 +894,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif updatePkg->set_TxnID(fTxnid); @@ -927,7 +950,7 @@ void PackageHandler::run() CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName); fTableOid = roPair.objnum; } - synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid + synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid } #endif deletePkg->set_TxnID(fTxnid); @@ -1070,16 +1093,6 @@ void PackageHandler::run() void PackageHandler::rollbackPending() { - // Force a release of the processing from MCOL-140 -#ifdef MCOL_140 - if (fConcurrentSupport) - { - // MCOL-140 We're not necessarily the next in line. - // This forces this thread to be released anyway. - forceReleaseTableAccess(); - } -#endif - if (fProcessor.get() == NULL) { // This happens when batch insert @@ -1088,6 +1101,16 @@ void PackageHandler::rollbackPending() fProcessor->setRollbackPending(true); + // Force a release of the processing from MCOL-140 +#ifdef MCOL_140 + if (fConcurrentSupport) + { + // MCOL-140 We're not necessarily the next in line. + // This forces this thread to be released anyway. + forceReleaseTableAccess(); + } +#endif + ostringstream oss; oss << "PackageHandler::rollbackPending: Processing DMLPackage."; DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG); diff --git a/dmlproc/dmlprocessor.h b/dmlproc/dmlprocessor.h index 02f993df6..978fbdda8 100644 --- a/dmlproc/dmlprocessor.h +++ b/dmlproc/dmlprocessor.h @@ -171,12 +171,12 @@ private: // Used to serialize operations because the VSS can't handle inserts // or updates on the same block. // When an Insert, Update or Delete command arrives, we look here - // for the table oid. If found, wait until it is no onger here. + // for the table oid. If found, wait until it is no longer here. // If this transactionID (SCN) is < the transactionID in the table, don't delay // and hope for the best, as we're already out of order. // When the VSS is engineered to handle transactions out of order, all MCOL-140 // code is to be removed. - int synchTableAccess(); + int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage); int releaseTableAccess(); int forceReleaseTableAccess(); typedef iterable_queue tableAccessQueue_t; @@ -192,22 +192,22 @@ public: { public: SynchTable() : fphp(NULL) {}; - SynchTable(PackageHandler* php) + SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) { - setPackage(php); + setPackage(php, dmlPackage); } ~SynchTable() { if (fphp) fphp->releaseTableAccess(); } - bool setPackage(PackageHandler* php) + bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage) { if (fphp) fphp->releaseTableAccess(); fphp = php; if (fphp) - fphp->synchTableAccess(); + fphp->synchTableAccess(dmlPackage); return true; } private: