1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-140 Add a mechanism to serialize transactions for a single table to prevent VSS clashes. Transactions for different tables will continue concurrently.

This commit is contained in:
David Hall
2016-07-28 09:19:21 -05:00
parent 5053716521
commit 020c0ed3f5
4 changed files with 341 additions and 21 deletions

View File

@ -150,7 +150,7 @@ namespace dmlpackageprocessor
{
int waitPeriod = 10;
int sleepTime = 100; // sleep 100 milliseconds between checks
int numTries = 10; // try 10 times per second
int numTries = 30; // try 30 times (3 seconds)
waitPeriod = Config::getWaitPeriod();
numTries = waitPeriod * 10;
struct timespec rm_ts;

View File

@ -99,6 +99,9 @@ void added_a_pm(int)
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
Dec->Setup();
// MCOL-140 clear the waiting queue as all transactions are probably going to fail
PackageHandler::clearTableAccess();
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
//WriteEngine::WEClients::instance(WriteEngine::WEClients::DMLPROC)->Setup();

View File

@ -63,6 +63,8 @@ using namespace querytele;
extern boost::mutex mute;
extern boost::condition_variable cond;
#define MCOL_140 // Undefine to test VSS for out of order transactions
namespace
{
const std::string myname = "DMLProc";
@ -79,6 +81,11 @@ boost::mutex DMLProcessor::packageHandlerMapLock;
std::map<uint32_t, BatchInsertProc*> DMLProcessor::batchinsertProcessorMap;
boost::mutex DMLProcessor::batchinsertProcessorMapLock;
// MCOL-140 Map to hold table oids for tables being changed.
std::map<uint32_t, PackageHandler::tableAccessQueue_t> PackageHandler::tableOidMap;
boost::condition_variable PackageHandler::tableOidCond;
boost::mutex PackageHandler::tableOidMutex;
//------------------------------------------------------------------------------
// A thread to periodically call dbrm to see if a user is
// shutting down the system or has put the system into write
@ -276,20 +283,25 @@ PackageHandler::PackageHandler(const messageqcpp::IOSocket& ios,
boost::shared_ptr<messageqcpp::ByteStream> bs,
uint8_t packageType,
joblist::DistributedEngineComm *ec,
bool concurrentSupport,
uint64_t maxDeleteRows,
uint32_t sessionID,
execplan::CalpontSystemCatalog::SCN txnId,
DBRM * aDbrm,
const QueryTeleClient& qtc) :
const QueryTeleClient& qtc,
boost::shared_ptr<execplan::CalpontSystemCatalog> csc) :
fIos(ios),
fByteStream(bs),
fPackageType(packageType),
fEC(ec),
fConcurrentSupport(concurrentSupport),
fMaxDeleteRows(maxDeleteRows),
fSessionID(sessionID),
fTableOid(0),
fTxnid(txnId),
fDbrm(aDbrm),
fQtc(qtc)
fQtc(qtc),
fcsc(csc)
{
}
@ -298,6 +310,168 @@ PackageHandler::~PackageHandler()
//cout << "In destructor" << endl;
}
// MCOL-140
// Blocks a thread if there is another trx working on the same fTableOid
// return 1 when thread should continue.
// return 0 if error. Right now, no error detection is implemented.
//
// txnid was being created before the call to this function. This caused race conditions
// so creation is delayed until we're inside the lock here. Nothing needs it before
// this point in the execution.
//
// The algorithm is this. When the first txn for a given fTableOid arrives, start a queue
// containing a list of waiting or working txnId. Put this txnId into the queue (working)
// Put the queue into a map keyed on fTableOid.
//
// When the next txn for this fTableOid arrives, it finds the queue in the map and adds itself,
// then waits for condition.
// When a thread finishes, it removes its txnId from the queue and notifies all. If the queue is
// empty, it removes the entry from the map.
// Upon wakeup from wait(), a thread checks to see if it's next in the queue. If so, it is released
// to do work. Otherwise it goes back to wait.
//
// There's a chance (CTRL+C) for instance, that the txn is no longer in the queue. Release it to work.
// Rollback will most likely be next.
//
// A tranasaction for one fTableOid is not blocked by a txn for a different fTableOid.
int PackageHandler::synchTableAccess()
{
// MCOL-140 Wait for any other DML using this table.
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
boost::unique_lock<boost::mutex> lock(tableOidMutex);
BRM::TxnID txnid;
if (fPackageType != dmlpackage::DML_COMMAND)
{
txnid = sessionManager.getTxnID(fSessionID);
if ( !txnid.valid )
{
txnid = sessionManager.newTxnID(fSessionID, true);
if (!txnid.valid)
{
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
}
}
}
else
{
txnid = sessionManager.getTxnID(fSessionID);
}
fTxnid = txnid.id;
if ((it=tableOidMap.find(fTableOid)) != tableOidMap.end())
{
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
// There's at least one working txn on this table. We may be the same txn.
if (fTxnid == tableOidQueue.front())
{
return 1; // We're next in line or the same as the last. Keep working
}
tableOidQueue.push(fTxnid); // Get on the waiting list.
// We need to wait
// tableOidQueue here is the queue holding the waitng transactions for this fTableOid
while (true)
{
tableOidCond.wait(lock);
if (tableOidQueue.front() == fTxnid)
{
break;
}
if (tableOidQueue.empty())
{
// If we had been the last txn waiting and CTRL+C was hit, then the queue is empty now.
// Empty queues must be erased from the map.
tableOidMap.erase(fTableOid);
break;
}
// If we're not in the queue at all, then continue. CTRL+C was probably hit.
PackageHandler::tableAccessQueue_t::container_type::iterator c_it = tableOidQueue.find(fTxnid);
if (c_it == tableOidQueue.end())
{
break;
}
// We're still in the queue and not on top. Go back and wait some more.
}
}
else
{
// We're the first for this tableoid. Start a new queue.
tableAccessQueue_t tableOidQueue;
tableOidQueue.push(fTxnid);
tableOidMap[fTableOid] = tableOidQueue;
}
return 1;
}
// MCOL-140 Called when it's time to release the next thread for this table OID.
//
// Takes this transaction off the front of the queue for fTableOid, removes the queue
// from the map once it is empty, and wakes every thread waiting on tableOidCond.
//
// Returns 2 when there is nothing to release (fTableOid is 0 or has no map entry),
// otherwise 1. For now, return codes are not used by the callers.
int PackageHandler::releaseTableAccess()
{
    // take us out of the queue
    std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
    boost::lock_guard<boost::mutex> lock(tableOidMutex);
    if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end())
    {
        // This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
        return 2; // For now, return codes are not used
    }

    PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
    if (tableOidQueue.empty())
    {
        // forceReleaseTableAccess() removes a txn without removing the map entry, so the
        // queue can be empty here. front() on an empty queue is undefined; there is
        // nothing left to pop, so just drop the stale entry.
        tableOidMap.erase(it);
    }
    else if (tableOidQueue.front() != fTxnid)
    {
        // This is a severe error. The front should be the working thread. If we're here,
        // we're the working thread and should be front().
        cout << fTxnid << " " << fTableOid << " We got to release and we're not on top " << tableOidQueue.front() << endl;
        LoggingID logid(21, fSessionID, fTxnid);
        logging::Message::Args args1;
        logging::Message msg(1);
        args1.add("ReleaseTableAccess: Txn being released is not the current txn in the tablOidQueue for tableid");
        args1.add((uint64_t)fTableOid);
        msg.format(args1);
        logging::Logger logger(logid.fSubsysID);
        logger.logMessage(LOG_TYPE_ERROR, msg, logid);
    }
    else
    {
        tableOidQueue.pop(); // Get off the waiting list.
        if (tableOidQueue.empty())
        {
            // remove the queue from the map.
            tableOidMap.erase(it);
        }
    }

    // release the condition
    tableOidCond.notify_all();
    return 1;
}
int PackageHandler::forceReleaseTableAccess()
{
// By removing the tcnid from the queue, the logic after the wait in
// synchTableAccess() will release the thread and clean up if needed.
std::map<uint32_t, PackageHandler::tableAccessQueue_t>::iterator it;
boost::lock_guard<boost::mutex> lock(tableOidMutex);
if (fTableOid == 0 || (it=tableOidMap.find(fTableOid)) == tableOidMap.end())
{
// This will happen for DML_COMMAND, as we never got the tableoid or called synchTableAccess
return 2;
}
PackageHandler::tableAccessQueue_t& tableOidQueue = it->second;
tableOidQueue.erase(fTxnid);
// release the condition
tableOidCond.notify_all();
return 1;
}
// static
// Called upon sighup, often because PrimProc crashed. We don't want to leave all the transactions hung,
// though some may be because they never returned from PrimProc and will leave the table lock on.
//
// Empties tableOidMap, discarding every per-table queue. Always returns 1.
//
// NOTE(review): unlike synchTableAccess(), releaseTableAccess() and
// forceReleaseTableAccess(), this does not take tableOidMutex and does not notify
// tableOidCond. Confirm whether that is deliberate given where it is called from.
int PackageHandler::clearTableAccess()
{
// Drop all queues at once; waiting threads are not signalled from here.
tableOidMap.clear();
return 1;
}
void PackageHandler::run()
{
ResourceManager frm;
@ -320,6 +494,23 @@ void PackageHandler::run()
//boost::shared_ptr<messageqcpp::ByteStream> insertBs (new messageqcpp::ByteStream);
messageqcpp::ByteStream bsSave = *(fByteStream.get());
insertPkg.read(*(fByteStream.get()));
#ifdef MCOL_140
if (fConcurrentSupport)
{
fTableOid = insertPkg.getTableOid();
// Single Insert has no start like bulk does, so insertPkg.getTableOid()
// isn't set. Go get it now.
if (fTableOid == 0)
{
CalpontSystemCatalog::TableName tableName;
tableName.schema = insertPkg.get_Table()->get_SchemaName();
tableName.table = insertPkg.get_Table()->get_TableName();
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum;
}
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
}
#endif
QueryTeleStats qts;
qts.query_uuid = QueryTeleClient::genUUID();
qts.msg_type = QueryTeleStats::QT_START;
@ -552,7 +743,7 @@ void PackageHandler::run()
}
else
{
//error occured. Receive all outstanding messages nefore erroring out.
//error occured. Receive all outstanding messages before erroring out.
batchProcessor->receiveOutstandingMsg();
batchProcessor->sendlastBatch(); //needs to flush files
batchProcessor->receiveAllMsg();
@ -637,7 +828,7 @@ void PackageHandler::run()
break;
}
}
else
else // Single Insert
{
//insertPkg.readTable(*(fByteStream.get()));
insertPkg.set_TxnID(fTxnid);
@ -666,6 +857,22 @@ void PackageHandler::run()
//cout << "an UPDATE package" << endl;
boost::scoped_ptr<dmlpackage::UpdateDMLPackage> updatePkg(new dmlpackage::UpdateDMLPackage());
updatePkg->read(*(fByteStream.get()));
#ifdef MCOL_140
if (fConcurrentSupport)
{
fTableOid = updatePkg->getTableOid();
// Update generally doesn't set fTableOid in updatePkg. Go get it now.
if (fTableOid == 0)
{
CalpontSystemCatalog::TableName tableName;
tableName.schema = updatePkg->get_Table()->get_SchemaName();
tableName.table = updatePkg->get_Table()->get_TableName();
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum;
}
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
}
#endif
updatePkg->set_TxnID(fTxnid);
QueryTeleStats qts;
qts.query_uuid = updatePkg->uuid();
@ -706,6 +913,22 @@ void PackageHandler::run()
{
boost::scoped_ptr<dmlpackage::DeleteDMLPackage> deletePkg(new dmlpackage::DeleteDMLPackage());
deletePkg->read(*(fByteStream.get()));
#ifdef MCOL_140
if (fConcurrentSupport)
{
fTableOid = deletePkg->getTableOid();
// Delete generally doesn't set fTableOid in updatePkg. Go get it now.
if (fTableOid == 0)
{
CalpontSystemCatalog::TableName tableName;
tableName.schema = deletePkg->get_Table()->get_SchemaName();
tableName.table = deletePkg->get_Table()->get_TableName();
CalpontSystemCatalog::ROPair roPair = fcsc->tableRID(tableName);
fTableOid = roPair.objnum;
}
synchTableAccess(); // Blocks if another DML thread is using this fTableOid
}
#endif
deletePkg->set_TxnID(fTxnid);
QueryTeleStats qts;
qts.query_uuid = deletePkg->uuid();
@ -766,6 +989,13 @@ void PackageHandler::run()
}
break;
}
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
//Log errors
if ( (result.result != dmlpackageprocessor::DMLPackageProcessor::NO_ERROR)
&& (result.result != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
@ -787,6 +1017,13 @@ void PackageHandler::run()
}
catch(std::exception& e)
{
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
cout << "dmlprocessor.cpp PackageHandler::run() package type("
<< fPackageType << ") exception: " << e.what() << endl;
logging::LoggingID lid(21);
@ -803,6 +1040,13 @@ void PackageHandler::run()
}
catch(...)
{
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're done. release the next waiting txn for this fTableOid
releaseTableAccess();
}
#endif
logging::LoggingID lid(21);
logging::MessageLog ml(lid);
logging::Message::Args args;
@ -835,6 +1079,16 @@ void PackageHandler::run()
void PackageHandler::rollbackPending()
{
// Force a release of the processing from MCOL-140
#ifdef MCOL_140
if (fConcurrentSupport)
{
// MCOL-140 We're not necessarily the next in line.
// This forces this thread to be released anyway.
forceReleaseTableAccess();
}
#endif
if (fProcessor.get() == NULL)
{
// This happens when batch insert
@ -854,6 +1108,8 @@ void added_a_pm(int)
ResourceManager rm;
dec = DistributedEngineComm::instance(rm);
dec->Setup();
// MCOL-140 clear the waiting queue as all transactions are probably going to fail
PackageHandler::clearTableAccess();
}
DMLServer::DMLServer(int packageMaxThreads, int packageWorkQueueSize, DBRM* dbrm) :
@ -923,12 +1179,12 @@ void DMLProcessor::operator()()
uint64_t maxDeleteRows = rm.getDMLMaxDeleteRows();
bool concurrentSupport = true;
fConcurrentSupport = true;
string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
if ( concurrentTranStr.length() != 0 )
{
if ((concurrentTranStr.compare("N") == 0) || (concurrentTranStr.compare("n") == 0))
concurrentSupport = false;
fConcurrentSupport = false;
}
#ifndef _MSC_VER
@ -1155,7 +1411,7 @@ void DMLProcessor::operator()()
//cout << " package" << endl;
BRM::TxnID txnid;
if (!concurrentSupport)
if (!fConcurrentSupport)
{
//Check if any other active transaction
bool anyOtherActiveTransaction = true;
@ -1294,7 +1550,7 @@ void DMLProcessor::operator()()
{
//cout << "starting processing package type " << (int) packageType << " for session " << sessionID << " with id " << txnid.id << endl;
boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc));
fConcurrentSupport, maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc, csc));
// We put the packageHandler into a map so that if we receive a
// message to affect the previous command, we can find it.
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock);
@ -1312,6 +1568,7 @@ void DMLProcessor::operator()()
}
else
{
#if 0
if (packageType != dmlpackage::DML_COMMAND)
{
txnid = sessionManager.getTxnID(sessionID);
@ -1327,9 +1584,9 @@ void DMLProcessor::operator()()
{
txnid = sessionManager.getTxnID(sessionID);
}
#endif
boost::shared_ptr<PackageHandler> php(new PackageHandler(fIos, bs1, packageType, fEC,
maxDeleteRows, sessionID, txnid.id, fDbrm, fQtc));
fConcurrentSupport, maxDeleteRows, sessionID, 0, fDbrm, fQtc, csc));
// We put the packageHandler into a map so that if we receive a
// message to affect the previous command, we can find it.
boost::mutex::scoped_lock lk2(DMLProcessor::packageHandlerMapLock, defer_lock);

View File

@ -45,6 +45,41 @@
#include "batchinsertprocessor.h"
#include "querytele.h"
/** @brief A std::queue that also allows iteration, lookup and removal by value.
 *
 * std::queue exposes its underlying container as the protected member `c`;
 * this adapter forwards begin()/end()/erase() to it so that an element can be
 * located or removed from anywhere in the queue, not just the front.
 */
template<typename T, typename Container=std::deque<T> >
class iterable_queue : public std::queue<T,Container>
{
public:
    typedef typename Container::iterator iterator;
    typedef typename Container::const_iterator const_iterator;

    iterator begin() { return this->c.begin(); }
    iterator end() { return this->c.end(); }
    const_iterator begin() const { return this->c.begin(); }
    const_iterator end() const { return this->c.end(); }

    /** @brief Linear search for the first element equal to t.
     *  @return iterator to the element, or end() if there is none.
     */
    iterator find(T t)
    {
        iterator it;
        for (it = begin(); it != end(); ++it)
        {
            if (*it == t)
            {
                break;
            }
        }
        return it;
    }

    /** @brief Erase the element at it.
     *  @return iterator following the erased element (as returned by the container).
     */
    iterator erase(typename Container::iterator it) { return this->c.erase(it); }

    /** @brief Erase the first element equal to t, if any.
     *  @return iterator following the erased element, or end() if t was not found.
     */
    iterator erase(T t)
    {
        iterator it = find(t);
        if (it != end())
        {
            // Erasing invalidates `it`; hand back the valid iterator the container
            // returns rather than the stale one.
            return erase(it);
        }
        return it;
    }
};
namespace dmlprocessor
{
@ -103,9 +138,9 @@ class PackageHandler
{
public:
PackageHandler(const messageqcpp::IOSocket& ios, boost::shared_ptr<messageqcpp::ByteStream> bs,
uint8_t packageType, joblist::DistributedEngineComm *ec, uint64_t maxDeleteRows,
uint8_t packageType, joblist::DistributedEngineComm *ec, bool concurrentSuport, uint64_t maxDeleteRows,
uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnId, BRM::DBRM * aDbrm,
const querytele::QueryTeleClient& qtc);
const querytele::QueryTeleClient& qtc, boost::shared_ptr<execplan::CalpontSystemCatalog> csc);
~PackageHandler();
void run();
@ -113,18 +148,42 @@ public:
execplan::CalpontSystemCatalog::SCN getTxnid() {return fTxnid;}
uint32_t getSessionID() {return fSessionID;}
private:
messageqcpp::IOSocket fIos;
boost::shared_ptr<messageqcpp::ByteStream> fByteStream;
boost::scoped_ptr<dmlpackageprocessor::DMLPackageProcessor> fProcessor;
messageqcpp::ByteStream::quadbyte fPackageType;
joblist::DistributedEngineComm *fEC;
bool fConcurrentSupport;
uint64_t fMaxDeleteRows;
uint32_t fSessionID;
uint32_t fTableOid;
execplan::CalpontSystemCatalog::SCN fTxnid;
execplan::SessionManager sessionManager;
BRM::DBRM *fDbrm;
querytele::QueryTeleClient fQtc;
boost::shared_ptr<execplan::CalpontSystemCatalog> fcsc;
// MCOL-140 A map to hold table oids so that we can wait for DML on a table.
// This effectively creates a table lock for transactions.
// Used to serialize operations because the VSS can't handle inserts
// or updates on the same block.
// When an Insert, Update or Delete command arrives, we look here
// for the table oid. If found, wait until it is no longer here.
// If this transactionID (SCN) is < the transactionID in the table, don't delay
// and hope for the best, as we're already out of order.
// When the VSS is engineered to handle transactions out of order, all MCOL-140
// code is to be removed.
int synchTableAccess();
int releaseTableAccess();
int forceReleaseTableAccess();
typedef iterable_queue<execplan::CalpontSystemCatalog::SCN> tableAccessQueue_t;
static std::map<uint32_t, tableAccessQueue_t> tableOidMap;
static boost::condition_variable tableOidCond;
static boost::mutex tableOidMutex;
public:
static int clearTableAccess();
};
/** @brief processes dml packages as they arrive
@ -151,6 +210,7 @@ private:
boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
BRM::DBRM* fDbrm;
querytele::QueryTeleClient fQtc;
bool fConcurrentSupport;
// A map to hold pointers to all active PackageProcessors
typedef std::map<uint32_t, boost::shared_ptr<PackageHandler> > PackageHandlerMap_t;