You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-3296 Add logging when a dml is blocked because of another dml on the same table
This commit is contained in:
@ -334,7 +334,7 @@ PackageHandler::~PackageHandler()
|
|||||||
// Rollback will most likely be next.
|
// Rollback will most likely be next.
|
||||||
//
|
//
|
||||||
// A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
|
// A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
|
||||||
int PackageHandler::synchTableAccess()
|
int PackageHandler::synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage)
|
||||||
{
|
{
|
||||||
// MCOL-140 Wait for any other DML using this table.
|
// MCOL-140 Wait for any other DML using this table.
|
||||||
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
|
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
|
||||||
@ -374,9 +374,27 @@ int PackageHandler::synchTableAccess()
|
|||||||
// tableOidQueue here is the queue holding the waitng transactions for this fTableOid
|
// tableOidQueue here is the queue holding the waitng transactions for this fTableOid
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
tableOidCond.wait(lock);
|
// Log something that we're waiting
|
||||||
|
LoggingID logid(21, fSessionID, fTxnid);
|
||||||
|
logging::Message::Args args1;
|
||||||
|
logging::Message msg(1);
|
||||||
|
ostringstream oss;
|
||||||
|
oss << "Txn is waiting for" << tableOidQueue.front() << " " << dmlPackage->get_SQLStatement() << "; |" << dmlPackage->get_SchemaName() <<"|";
|
||||||
|
args1.add(oss.str());
|
||||||
|
args1.add((uint64_t)fTableOid);
|
||||||
|
msg.format(args1);
|
||||||
|
logging::Logger logger(logid.fSubsysID);
|
||||||
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
||||||
|
|
||||||
|
tableOidCond.wait(lock);
|
||||||
|
// In case of CTRL+C, the tableOidQueue could be invalidated
|
||||||
|
if ((tableOidMap.find(fTableOid))->second != tableOidQueue)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
if (tableOidQueue.front() == fTxnid)
|
if (tableOidQueue.front() == fTxnid)
|
||||||
{
|
{
|
||||||
|
// We're up next. Let's go do stuff.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (tableOidQueue.empty())
|
if (tableOidQueue.empty())
|
||||||
@ -458,6 +476,11 @@ int PackageHandler::forceReleaseTableAccess()
|
|||||||
}
|
}
|
||||||
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
|
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
|
||||||
tableOidQueue.erase(fTxnid);
|
tableOidQueue.erase(fTxnid);
|
||||||
|
if (tableOidQueue.empty())
|
||||||
|
{
|
||||||
|
// remove the queue from the map.
|
||||||
|
tableOidMap.erase(fTableOid);
|
||||||
|
}
|
||||||
// release the condition
|
// release the condition
|
||||||
tableOidCond.notify_all();
|
tableOidCond.notify_all();
|
||||||
return 1;
|
return 1;
|
||||||
@ -509,7 +532,7 @@ void PackageHandler::run()
|
|||||||
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
||||||
fTableOid = roPair.objnum;
|
fTableOid = roPair.objnum;
|
||||||
}
|
}
|
||||||
synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
|
synchTable.setPackage(this, &insertPkg); // Blocks if another DML thread is using this fTableOid
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
QueryTeleStats qts;
|
QueryTeleStats qts;
|
||||||
@ -871,7 +894,7 @@ void PackageHandler::run()
|
|||||||
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
||||||
fTableOid = roPair.objnum;
|
fTableOid = roPair.objnum;
|
||||||
}
|
}
|
||||||
synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
|
synchTable.setPackage(this, updatePkg.get()); // Blocks if another DML thread is using this fTableOid
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
updatePkg->set_TxnID(fTxnid);
|
updatePkg->set_TxnID(fTxnid);
|
||||||
@ -927,7 +950,7 @@ void PackageHandler::run()
|
|||||||
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
|
||||||
fTableOid = roPair.objnum;
|
fTableOid = roPair.objnum;
|
||||||
}
|
}
|
||||||
synchTable.setPackage(this); // Blocks if another DML thread is using this fTableOid
|
synchTable.setPackage(this, deletePkg.get()); // Blocks if another DML thread is using this fTableOid
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
deletePkg->set_TxnID(fTxnid);
|
deletePkg->set_TxnID(fTxnid);
|
||||||
@ -1070,16 +1093,6 @@ void PackageHandler::run()
|
|||||||
|
|
||||||
void PackageHandler::rollbackPending()
|
void PackageHandler::rollbackPending()
|
||||||
{
|
{
|
||||||
// Force a release of the processing from MCOL-140
|
|
||||||
#ifdef MCOL_140
|
|
||||||
if (fConcurrentSupport)
|
|
||||||
{
|
|
||||||
// MCOL-140 We're not necessarily the next in line.
|
|
||||||
// This forces this thread to be released anyway.
|
|
||||||
forceReleaseTableAccess();
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (fProcessor.get() == NULL)
|
if (fProcessor.get() == NULL)
|
||||||
{
|
{
|
||||||
// This happens when batch insert
|
// This happens when batch insert
|
||||||
@ -1088,6 +1101,16 @@ void PackageHandler::rollbackPending()
|
|||||||
|
|
||||||
fProcessor->setRollbackPending(true);
|
fProcessor->setRollbackPending(true);
|
||||||
|
|
||||||
|
// Force a release of the processing from MCOL-140
|
||||||
|
#ifdef MCOL_140
|
||||||
|
if (fConcurrentSupport)
|
||||||
|
{
|
||||||
|
// MCOL-140 We're not necessarily the next in line.
|
||||||
|
// This forces this thread to be released anyway.
|
||||||
|
forceReleaseTableAccess();
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
ostringstream oss;
|
ostringstream oss;
|
||||||
oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
|
oss << "PackageHandler::rollbackPending: Processing DMLPackage.";
|
||||||
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
|
DMLProcessor::log(oss.str(), logging::LOG_TYPE_DEBUG);
|
||||||
|
@ -171,12 +171,12 @@ private:
|
|||||||
// Used to serialize operations because the VSS can't handle inserts
|
// Used to serialize operations because the VSS can't handle inserts
|
||||||
// or updates on the same block.
|
// or updates on the same block.
|
||||||
// When an Insert, Update or Delete command arrives, we look here
|
// When an Insert, Update or Delete command arrives, we look here
|
||||||
// for the table oid. If found, wait until it is no onger here.
|
// for the table oid. If found, wait until it is no longer here.
|
||||||
// If this transactionID (SCN) is < the transactionID in the table, don't delay
|
// If this transactionID (SCN) is < the transactionID in the table, don't delay
|
||||||
// and hope for the best, as we're already out of order.
|
// and hope for the best, as we're already out of order.
|
||||||
// When the VSS is engineered to handle transactions out of order, all MCOL-140
|
// When the VSS is engineered to handle transactions out of order, all MCOL-140
|
||||||
// code is to be removed.
|
// code is to be removed.
|
||||||
int synchTableAccess();
|
int synchTableAccess(dmlpackage::CalpontDMLPackage* dmlPackage);
|
||||||
int releaseTableAccess();
|
int releaseTableAccess();
|
||||||
int forceReleaseTableAccess();
|
int forceReleaseTableAccess();
|
||||||
typedef iterable_queue<execplan::CalpontSystemCatalog::SCN> tableAccessQueue_t;
|
typedef iterable_queue<execplan::CalpontSystemCatalog::SCN> tableAccessQueue_t;
|
||||||
@ -192,22 +192,22 @@ public:
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
SynchTable() : fphp(NULL) {};
|
SynchTable() : fphp(NULL) {};
|
||||||
SynchTable(PackageHandler* php)
|
SynchTable(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage)
|
||||||
{
|
{
|
||||||
setPackage(php);
|
setPackage(php, dmlPackage);
|
||||||
}
|
}
|
||||||
~SynchTable()
|
~SynchTable()
|
||||||
{
|
{
|
||||||
if (fphp)
|
if (fphp)
|
||||||
fphp->releaseTableAccess();
|
fphp->releaseTableAccess();
|
||||||
}
|
}
|
||||||
bool setPackage(PackageHandler* php)
|
bool setPackage(PackageHandler* php, dmlpackage::CalpontDMLPackage* dmlPackage)
|
||||||
{
|
{
|
||||||
if (fphp)
|
if (fphp)
|
||||||
fphp->releaseTableAccess();
|
fphp->releaseTableAccess();
|
||||||
fphp = php;
|
fphp = php;
|
||||||
if (fphp)
|
if (fphp)
|
||||||
fphp->synchTableAccess();
|
fphp->synchTableAccess(dmlPackage);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
|
Reference in New Issue
Block a user