1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

[MCOL-4927] Fix DDLProc connection processing

This commit is contained in:
Alexey Antipovsky
2021-12-15 18:20:39 +03:00
parent 199d89f681
commit 6c31f105ec

View File

@ -73,17 +73,413 @@ void cleanPMSysCache()
cacheutils::flushOIDsFromCache ( oidList );
}
struct PackageHandler
class PackageHandler
{
void operator ()()
{
public:
PackageHandler(QueryTeleClient qtc, DBRM* dbrm, messageqcpp::IOSocket& ios, bool concurrentSupport)
: fIos(ios)
, fDbrm(dbrm)
, fQtc(qtc)
, fConcurrentSupport(concurrentSupport)
{}
void operator()()
{
try
{
fByteStream = fIos.read();
if (fByteStream.empty())
{
logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Empty package", true);
return;
}
fByteStream >> fSessionID;
fByteStream >> fPackageType;
uint32_t stateFlags;
if (fDbrm->getSystemState(stateFlags) > 0) // > 0 implies successful retrieval. It doesn't imply anything about the contents
{
messageqcpp::ByteStream results;
const char* responseMsg = nullptr;
messageqcpp::ByteStream::byte status;
bool bReject = false;
// Check to see if we're in write suspended or shutdown mode
// If so, we can't process the request.
if (stateFlags & SessionManagerServer::SS_SUSPENDED ||
stateFlags & SessionManagerServer::SS_SUSPEND_PENDING ||
stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
{
if (stateFlags & SessionManagerServer::SS_SUSPENDED ||
stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
{
responseMsg = "Writing to the database is disabled.";
}
else
{
responseMsg = "The database is being shut down.";
}
status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
bReject = true;
}
if (bReject)
{
results << status;
//@bug 266
MessageLog logger(LoggingID(27));
logging::Message::Args args;
logging::Message message(2);
args.add(responseMsg);
message.format( args );
results << message.msg();
fIos.write(results);
std::cout << responseMsg << endl;
std::cout << "Command rejected. Status " << (int)status << message.msg() << endl;
return;
}
}
//check whether the system is ready to process statement.
if (fDbrm->getSystemReady() < 1)
{
logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "System is not ready yet. Please try again.", true);
return;
}
int rc = 0;
if (!fConcurrentSupport)
{
//Check if any other active transaction
bool bIsDbrmUp = true;
bool anyOtherActiveTransaction = true;
execplan::SessionManager sessionManager;
BRM::SIDTIDEntry blockingsid;
int i = 0;
int waitPeriod = 10;
//@Bug 2487 Check transaction map every 1/10 second
int sleepTime = 100; // sleep 100 milliseconds between checks
int numTries = 10; // try 10 times per second
string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
if ( waitPeriodStr.length() != 0 )
waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
numTries = waitPeriod * 10;
struct timespec rm_ts;
rm_ts.tv_sec = sleepTime / 1000;
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
//cout << "starting i = " << i << endl;
//anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp);
while (anyOtherActiveTransaction)
{
anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp,
blockingsid);
if (anyOtherActiveTransaction)
{
for ( ; i < numTries; i++ )
{
#ifdef _MSC_VER
Sleep(rm_ts.tv_sec * 1000);
#else
struct timespec abs_ts;
//cout << "session " << fSessionID << " nanosleep on package type " << (int)packageType << endl;
do
{
abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec;
}
while (nanosleep(&abs_ts, &rm_ts) < 0);
#endif
anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp,
blockingsid);
if (!anyOtherActiveTransaction)
{
//cout << "Ready to process type " << (int)packageType << endl;
fTxnid = sessionManager.getTxnID(fSessionID);
if (!fTxnid.valid)
{
fTxnid = sessionManager.newTxnID(fSessionID, true, true);
if (fTxnid.valid)
{
//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
anyOtherActiveTransaction = false;
break;
}
else
{
anyOtherActiveTransaction = true;
}
}
else
{
string errorMsg;
rc = commitTransaction(fTxnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == fTxnid.id)
{
lockReleased = fDbrm->releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
fDbrm->committed(fTxnid);
fTxnid = fDbrm->newTxnID(fSessionID, true, true);
if (fTxnid.valid)
{
//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
anyOtherActiveTransaction = false;
break;
}
else
{
anyOtherActiveTransaction = true;
}
}
}
}
//cout << "ending i = " << i << endl;
}
else
{
//cout << "Ready to process type " << (int)packageType << endl;
fTxnid = sessionManager.getTxnID(fSessionID);
if ( !fTxnid.valid )
{
fTxnid = sessionManager.newTxnID(fSessionID, true, true);
if (!fTxnid.valid)
{
//cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
anyOtherActiveTransaction = true;
}
else
{
anyOtherActiveTransaction = false;
}
}
else
{
string errorMsg;
rc = commitTransaction(fTxnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == fTxnid.id)
{
lockReleased = fDbrm->releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
sessionManager.committed(fTxnid);
fTxnid = sessionManager.newTxnID(fSessionID, true, true);
if (!fTxnid.valid)
{
//cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
anyOtherActiveTransaction = true;
}
else
{
anyOtherActiveTransaction = false;
}
}
}
if ((anyOtherActiveTransaction) && (i >= numTries))
{
//cout << " Erroring out on package type " << (int)packageType << endl;
break;
}
}
if ((anyOtherActiveTransaction) && (i >= numTries))
{
messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
Message::Args args;
args.add(static_cast<uint64_t>(blockingsid.sessionid));
//@Bug 3854 Log to debug.log
LoggingID logid(15, 0, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
msg.format(args1);
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
logError(status, IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
}
else
{
processStatement();
}
}
else
{
fTxnid = fDbrm->getTxnID(fSessionID);
if ( !fTxnid.valid )
{
fTxnid = fDbrm->newTxnID(fSessionID, true, true);
if (!fTxnid.valid)
{
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
}
}
else
{
string errorMsg;
rc = commitTransaction(fTxnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == fTxnid.id)
{
lockReleased = fDbrm->releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
fDbrm->committed(fTxnid);
fTxnid = fDbrm->newTxnID(fSessionID, true, true);
if (!fTxnid.valid)
{
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
}
}
processStatement();
}
}
catch (const std::exception &ex)
{
logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, ex.what(), true);
throw;
}
}
private:
int commitTransaction(uint32_t txnID, std::string& errorMsg)
{
auto WEClient = std::unique_ptr<WriteEngine::WEClients>(new WriteEngine::WEClients(WriteEngine::WEClients::DDLPROC));
auto PMCount = WEClient->getPmCount();
ByteStream bytestream;
uint64_t uniqueId = fDbrm->getUnique64();
WEClient->addQueue(uniqueId);
bytestream << (ByteStream::byte)WE_SVR_COMMIT_VERSION;
bytestream << uniqueId;
bytestream << txnID;
uint32_t msgRecived = 0;
WEClient->write_to_all(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
int rc = 0;
ByteStream::byte tmp8;
while (true)
{
if (msgRecived == PMCount)
break;
WEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) //read error
{
rc = 1;
errorMsg = "DDL cannot communicate with WES";
WEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
WEClient->removeQueue(uniqueId);
break;
}
else
msgRecived++;
}
}
return rc;
}
void processStatement()
{
DDLPackageProcessor::DDLResult result;
result.result = DDLPackageProcessor::NO_ERROR;
//boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
try
{
if (!fOamCache)
fOamCache = oam::OamCache::makeOamCache();
//cout << "DDLProc received package " << fPackageType << endl;
switch ( fPackageType )
{
@ -301,35 +697,40 @@ struct PackageHandler
}
catch (quadbyte& /*foo*/)
{
fIos.close();
cout << "Unrecognized package type" << endl;
logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Unrecognized package type", true);
}
catch (logging::IDBExcept& idbEx)
{
cleanPMSysCache();
messageqcpp::ByteStream results;
messageqcpp::ByteStream::byte status = DDLPackageProcessor::CREATE_ERROR;
results << status;
results << string(idbEx.what());
fIos.write(results);
fIos.close();
logError(DDLPackageProcessor::CREATE_ERROR, idbEx.what(), true);
}
catch (...)
{
fIos.close();
logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Unknown error", true);
}
}
void logError(messageqcpp::ByteStream::byte status, const std::string& msg, bool closeSocket = false)
{
messageqcpp::ByteStream res;
res << status;
res << msg;
fIos.write(res);
cerr << "DDLProc error: " << msg << endl;
if (closeSocket)
fIos.close();
}
private:
messageqcpp::IOSocket fIos;
messageqcpp::ByteStream fByteStream;
messageqcpp::ByteStream::quadbyte fPackageType;
uint32_t fSessionID;
BRM::TxnID fTxnid;
BRM::DBRM* fDbrm;
QueryTeleClient fQtc;
oam::OamCache* fOamCache;
oam::OamCache* fOamCache = nullptr;
bool fConcurrentSupport;
};
}
@ -367,9 +768,6 @@ void DDLProcessor::process()
{
DBRM dbrm;
messageqcpp::IOSocket ios;
messageqcpp::ByteStream bs;
PackageHandler handler;
messageqcpp::ByteStream::quadbyte packageType;
bool concurrentSupport = true;
string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions");
@ -385,363 +783,14 @@ void DDLProcessor::process()
{
for (;;)
{
ios = fMqServer.accept();
bs = ios.read();
uint32_t sessionID;
bs >> sessionID;
bs >> packageType;
uint32_t stateFlags;
if (dbrm.getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents
try
{
messageqcpp::ByteStream results;
const char* responseMsg = 0;
messageqcpp::ByteStream::byte status;
bool bReject = false;
// Check to see if we're in write suspended mode
// If so, we can't process the request.
if (stateFlags & SessionManagerServer::SS_SUSPENDED)
{
status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
responseMsg = "Writing to the database is disabled.";
bReject = true;
}
// Check to see if we're in write suspend or shutdown pending mode
if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING
|| stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING)
{
if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING)
{
responseMsg = "Writing to the database is disabled.";
}
else
{
responseMsg = "The database is being shut down.";
}
status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
bReject = true;
}
if (bReject)
{
results << status;
//@bug 266
MessageLog logger(LoggingID(27));
logging::Message::Args args;
logging::Message message(2);
args.add(responseMsg);
message.format( args );
results << message.msg();
ios.write(results);
std::cout << responseMsg << endl;
std::cout << "Command rejected. Status " << (int)status << message.msg() << endl;
continue;
}
ios = fMqServer.accept();
fDdlPackagepool.invoke(PackageHandler(fQtc, &dbrm, ios, concurrentSupport));
}
//check whether the system is ready to process statement.
if (dbrm.getSystemReady() < 1)
catch (...)
{
messageqcpp::ByteStream results;
messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
results << status;
string msg ("System is not ready yet. Please try again." );
results << msg;
ios.write(results);
ios.close();
continue;
}
BRM::TxnID txnid;
int rc = 0;
if (!concurrentSupport)
{
//Check if any other active transaction
bool bIsDbrmUp = true;
bool anyOtherActiveTransaction = true;
execplan::SessionManager sessionManager;
BRM::SIDTIDEntry blockingsid;
int i = 0;
int waitPeriod = 10;
//@Bug 2487 Check transaction map every 1/10 second
int sleepTime = 100; // sleep 100 milliseconds between checks
int numTries = 10; // try 10 times per second
string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
if ( waitPeriodStr.length() != 0 )
waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
numTries = waitPeriod * 10;
struct timespec rm_ts;
rm_ts.tv_sec = sleepTime / 1000;
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
//cout << "starting i = " << i << endl;
//anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp );
while (anyOtherActiveTransaction)
{
anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
blockingsid );
if (anyOtherActiveTransaction)
{
for ( ; i < numTries; i++ )
{
#ifdef _MSC_VER
Sleep(rm_ts.tv_sec * 1000);
#else
struct timespec abs_ts;
//cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl;
do
{
abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec;
}
while (nanosleep(&abs_ts, &rm_ts) < 0);
#endif
anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp,
blockingsid );
if ( !anyOtherActiveTransaction )
{
//cout << "Ready to process type " << (int)packageType << endl;
txnid = sessionManager.getTxnID(sessionID);
if ( !txnid.valid )
{
txnid = sessionManager.newTxnID(sessionID, true, true);
if (txnid.valid)
{
//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
anyOtherActiveTransaction = false;
break;
}
else
{
anyOtherActiveTransaction = true;
}
}
else
{
string errorMsg;
rc = commitTransaction(txnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == txnid.id)
{
lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
dbrm.committed(txnid);
txnid = dbrm.newTxnID(sessionID, true, true);
if (txnid.valid)
{
//cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl;
anyOtherActiveTransaction = false;
break;
}
else
{
anyOtherActiveTransaction = true;
}
}
}
}
//cout << "ending i = " << i << endl;
}
else
{
//cout << "Ready to process type " << (int)packageType << endl;
txnid = sessionManager.getTxnID(sessionID);
if ( !txnid.valid )
{
txnid = sessionManager.newTxnID(sessionID, true, true);
if (!txnid.valid)
{
//cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
anyOtherActiveTransaction = true;
}
else
{
anyOtherActiveTransaction = false;
}
}
else
{
string errorMsg;
rc = commitTransaction(txnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == txnid.id)
{
lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
sessionManager.committed(txnid);
txnid = sessionManager.newTxnID(sessionID, true, true);
if (!txnid.valid)
{
//cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl;
anyOtherActiveTransaction = true;
}
else
{
anyOtherActiveTransaction = false;
}
}
}
if ((anyOtherActiveTransaction) && (i >= numTries))
{
//cout << " Erroring out on package type " << (int)packageType << endl;
break;
}
}
if ((anyOtherActiveTransaction) && (i >= numTries))
{
messageqcpp::ByteStream results;
messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES;
results << status;
Message::Args args;
args.add(static_cast<uint64_t>(blockingsid.sessionid));
results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args);
//@Bug 3854 Log to debug.log
LoggingID logid(15, 0, 0);
logging::Message::Args args1;
logging::Message msg(1);
args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args));
msg.format( args1 );
logging::Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
ios.write(results);
ios.close();
}
else
{
handler.fIos = ios;
handler.fByteStream = bs;
handler.fPackageType = packageType;
handler.fTxnid = txnid;
handler.fDbrm = &dbrm;
handler.fQtc = fQtc;
handler.fOamCache = oam::OamCache::makeOamCache();
fDdlPackagepool.invoke(handler);
}
}
else
{
txnid = dbrm.getTxnID(sessionID);
if ( !txnid.valid )
{
txnid = dbrm.newTxnID(sessionID, true, true);
if (!txnid.valid)
{
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
}
}
else
{
string errorMsg;
rc = commitTransaction(txnid.id, errorMsg);
if ( rc != 0)
throw std::runtime_error(errorMsg);
//need unlock the table.
std::vector<TableLockInfo> tableLocks = dbrm.getAllTableLocks();
bool lockReleased = true;
for (unsigned k = 0; k < tableLocks.size(); k++)
{
if (tableLocks[k].ownerTxnID == txnid.id)
{
lockReleased = dbrm.releaseTableLock(tableLocks[k].id);
if (!lockReleased)
{
ostringstream os;
os << "tablelock id " << tableLocks[k].id << " is not found";
throw std::runtime_error(os.str());
}
}
}
dbrm.committed(txnid);
txnid = dbrm.newTxnID(sessionID, true, true);
if (!txnid.valid)
{
throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") );
}
}
handler.fIos = ios;
handler.fByteStream = bs;
handler.fPackageType = packageType;
handler.fTxnid = txnid;
handler.fDbrm = &dbrm;
handler.fQtc = fQtc;
handler.fOamCache = oam::OamCache::makeOamCache();
fDdlPackagepool.invoke(handler);
throw;
}
}
}