You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-66 - Make the DDL and DML parsers re-entrant.
Serialize all DDL because the VVS can't handle modifying the same block simultaneously Fix the CTRL+C logic in DML that caused COMMIT issues.
This commit is contained in:
@ -1,4 +1,5 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
Copyright (C) 2016 MariaDB Corporation
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
@ -123,7 +124,11 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
|
||||
uint64_t tableLockId = 0;
|
||||
OamCache* oamcache = OamCache::makeOamCache();
|
||||
std::vector<int> moduleIds = oamcache->getModuleIds();
|
||||
try
|
||||
|
||||
// MCOL-66 The DBRM can't handle concurrent DDL
|
||||
boost::mutex::scoped_lock lk(dbrmMutex);
|
||||
|
||||
try
|
||||
{
|
||||
//check table lock
|
||||
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(dropTableStmt.fSessionID);
|
||||
@ -197,6 +202,8 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
|
||||
if (i >= numTries) //error out
|
||||
{
|
||||
Message::Args args;
|
||||
string strOp("drop table");
|
||||
args.add(strOp);
|
||||
args.add(processName);
|
||||
args.add((uint64_t)processID);
|
||||
args.add(sessionId);
|
||||
@ -238,9 +245,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
|
||||
|
||||
//get a unique number
|
||||
VERBOSE_INFO("Removing the SYSTABLE meta data");
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Removing the SYSTABLEs meta data" << endl;
|
||||
#endif
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl;
|
||||
//#endif
|
||||
bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE;
|
||||
bytestream << uniqueId;
|
||||
bytestream << (uint32_t) dropTableStmt.fSessionID;
|
||||
@ -271,11 +278,11 @@ cout << "Removing the SYSTABLEs meta data" << endl;
|
||||
pmNum = (*dbRootPMMap)[dbRoot];
|
||||
try
|
||||
{
|
||||
// #ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
|
||||
//#endif
|
||||
//cout << "deleting systable entries with txnid " << txnID.id << endl;
|
||||
fWEClient->write(bytestream, (uint32_t)pmNum);
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
|
||||
#endif
|
||||
while (1)
|
||||
{
|
||||
bsIn.reset(new ByteStream());
|
||||
@ -297,22 +304,23 @@ cout << "Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
|
||||
}
|
||||
catch (runtime_error& ex) //write error
|
||||
{
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table got exception" << endl;
|
||||
#endif
|
||||
// #ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table got exception" << endl;
|
||||
// #endif
|
||||
rc = NETWORK_ERROR;
|
||||
errorMsg = ex.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
rc = NETWORK_ERROR;
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table got unknown exception" << endl;
|
||||
#endif
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table got unknown exception" << endl;
|
||||
//#endif
|
||||
}
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
cout << fTxnid.id << " Error in dropping table from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
|
||||
Message::Args args;
|
||||
Message message(9);
|
||||
args.add("Error in dropping table from systables.");
|
||||
@ -355,11 +363,10 @@ cout << "Drop table got unknown exception" << endl;
|
||||
pmNum = (*dbRootPMMap)[dbRoot];
|
||||
try
|
||||
{
|
||||
//cout << "deleting systable entries with txnid " << txnID.id << endl;
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl;
|
||||
//#endif
|
||||
fWEClient->write(bytestream, (unsigned)pmNum);
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
|
||||
#endif
|
||||
while (1)
|
||||
{
|
||||
bsIn.reset(new ByteStream());
|
||||
@ -381,25 +388,26 @@ cout << "Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
|
||||
}
|
||||
catch (runtime_error& ex) //write error
|
||||
{
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table got exception" << endl;
|
||||
#endif
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table got exception" << endl;
|
||||
//#endif
|
||||
rc = NETWORK_ERROR;
|
||||
errorMsg = ex.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
rc = NETWORK_ERROR;
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table got unknown exception" << endl;
|
||||
#endif
|
||||
// #ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table got unknown exception" << endl;
|
||||
//#endif
|
||||
}
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
cout << fTxnid.id << " Error in dropping column from systables(" << (int)rc << ") " << errorMsg.c_str() << endl;
|
||||
Message::Args args;
|
||||
Message message(9);
|
||||
args.add("Error in dropping table from systables.");
|
||||
args.add("Error in dropping column from systables.");
|
||||
args.add(errorMsg);
|
||||
message.format(args);
|
||||
result.result = (ResultCode)rc;
|
||||
@ -412,11 +420,16 @@ cout << "Drop table got unknown exception" << endl;
|
||||
}
|
||||
|
||||
rc = commitTransaction(uniqueId, txnID);
|
||||
//cout << "commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl;
|
||||
if (rc != 0)
|
||||
{
|
||||
cout << txnID.id << " rolledback transaction " << " and valid is " << txnID.valid << endl;
|
||||
fSessionManager.rolledback(txnID);
|
||||
}
|
||||
else
|
||||
{
|
||||
cout << txnID.id << " commiting transaction " << txnID.id << " and valid is " << txnID.valid << endl;
|
||||
fSessionManager.committed(txnID);
|
||||
}
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
@ -527,9 +540,9 @@ cout << "Drop table got unknown exception" << endl;
|
||||
{
|
||||
bytestream << (uint32_t) oidList[i];
|
||||
}
|
||||
#ifdef IDB_DDL_DEBUG
|
||||
cout << "Drop table removing column files" << endl;
|
||||
#endif
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table removing column files" << endl;
|
||||
//#endif
|
||||
uint32_t msgRecived = 0;
|
||||
try {
|
||||
fWEClient->write_to_all(bytestream);
|
||||
@ -592,6 +605,9 @@ cout << "Drop table removing column files" << endl;
|
||||
//Flush primProc cache
|
||||
rc = cacheutils::flushOIDsFromCache( oidList );
|
||||
//Delete extents from extent map
|
||||
//#ifdef IDB_DDL_DEBUG
|
||||
cout << fTxnid.id << " Drop table deleteOIDs" << endl;
|
||||
//#endif
|
||||
rc = fDbrm->deleteOIDs(oidList);
|
||||
|
||||
if (rc != 0)
|
||||
@ -879,7 +895,11 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(ddlpackage::T
|
||||
|
||||
ByteStream bytestream;
|
||||
ByteStream::byte tmp8;
|
||||
try {
|
||||
|
||||
// MCOL-66 The DBRM can't handle concurrent DDL
|
||||
boost::mutex::scoped_lock lk(dbrmMutex);
|
||||
|
||||
try {
|
||||
//Disable extents first
|
||||
int rc1 = fDbrm->markAllPartitionForDeletion( allOidList);
|
||||
if (rc1 != 0)
|
||||
|
Reference in New Issue
Block a user