From 6c31f105ecfbf1a73127c3388ce0f8c29bc7b76f Mon Sep 17 00:00:00 2001 From: Alexey Antipovsky Date: Wed, 15 Dec 2021 18:20:39 +0300 Subject: [PATCH] [MCOL-4927] Fix DDLProc connection processing --- ddlproc/ddlprocessor.cpp | 795 +++++++++++++++++++++------------------ 1 file changed, 422 insertions(+), 373 deletions(-) diff --git a/ddlproc/ddlprocessor.cpp b/ddlproc/ddlprocessor.cpp index b2b800994..1872d4f30 100644 --- a/ddlproc/ddlprocessor.cpp +++ b/ddlproc/ddlprocessor.cpp @@ -73,17 +73,413 @@ void cleanPMSysCache() cacheutils::flushOIDsFromCache ( oidList ); } -struct PackageHandler +class PackageHandler { - void operator ()() - { +public: + PackageHandler(QueryTeleClient qtc, DBRM* dbrm, messageqcpp::IOSocket& ios, bool concurrentSupport) + : fIos(ios) + , fDbrm(dbrm) + , fQtc(qtc) + , fConcurrentSupport(concurrentSupport) + {} + void operator()() + { + try + { + fByteStream = fIos.read(); + if (fByteStream.empty()) + { + logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Empty package", true); + return; + } + fByteStream >> fSessionID; + fByteStream >> fPackageType; + + uint32_t stateFlags; + if (fDbrm->getSystemState(stateFlags) > 0) // > 0 implies successful retrieval. It doesn't imply anything about the contents + { + messageqcpp::ByteStream results; + const char* responseMsg = nullptr; + messageqcpp::ByteStream::byte status; + bool bReject = false; + + // Check to see if we're in write suspended or shutdown mode + // If so, we can't process the request. + if (stateFlags & SessionManagerServer::SS_SUSPENDED || + stateFlags & SessionManagerServer::SS_SUSPEND_PENDING || + stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING) + { + if (stateFlags & SessionManagerServer::SS_SUSPENDED || + stateFlags & SessionManagerServer::SS_SUSPEND_PENDING) + { + responseMsg = "Writing to the database is disabled."; + } + else + { + responseMsg = "The database is being shut down."; + } + + status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; + bReject = true; + } + + if (bReject) + { + results << status; + //@bug 266 + MessageLog logger(LoggingID(27)); + logging::Message::Args args; + logging::Message message(2); + args.add(responseMsg); + message.format( args ); + results << message.msg(); + fIos.write(results); + std::cout << responseMsg << endl; + std::cout << "Command rejected. Status " << (int)status << message.msg() << endl; + return; + } + } + + //check whether the system is ready to process statement. + if (fDbrm->getSystemReady() < 1) + { + logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "System is not ready yet. Please try again.", true); + return; + } + + int rc = 0; + + if (!fConcurrentSupport) + { + //Check if any other active transaction + bool bIsDbrmUp = true; + bool anyOtherActiveTransaction = true; + execplan::SessionManager sessionManager; + BRM::SIDTIDEntry blockingsid; + int i = 0; + int waitPeriod = 10; + //@Bug 2487 Check transaction map every 1/10 second + + int sleepTime = 100; // sleep 100 milliseconds between checks + int numTries = 10; // try 10 times per second + + string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod"); + + if ( waitPeriodStr.length() != 0 ) + waitPeriod = static_cast(config::Config::fromText(waitPeriodStr)); + + numTries = waitPeriod * 10; + struct timespec rm_ts; + + rm_ts.tv_sec = sleepTime / 1000; + rm_ts.tv_nsec = sleepTime % 1000 * 1000000; + + //cout << "starting i = " << i << endl; + //anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp); + while (anyOtherActiveTransaction) + { + anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp, + blockingsid); + + if (anyOtherActiveTransaction) + { + for ( ; i < numTries; i++ ) + { +#ifdef _MSC_VER + Sleep(rm_ts.tv_sec * 1000); +#else + struct timespec abs_ts; + + //cout << "session " << fSessionID << " nanosleep on package type " << (int)packageType << endl; + do + { + abs_ts.tv_sec = rm_ts.tv_sec; + abs_ts.tv_nsec = rm_ts.tv_nsec; + } + while (nanosleep(&abs_ts, &rm_ts) < 0); + +#endif + anyOtherActiveTransaction = sessionManager.checkActiveTransaction(fSessionID, bIsDbrmUp, + blockingsid); + + if (!anyOtherActiveTransaction) + { + //cout << "Ready to process type " << (int)packageType << endl; + fTxnid = sessionManager.getTxnID(fSessionID); + + if (!fTxnid.valid) + { + fTxnid = sessionManager.newTxnID(fSessionID, true, true); + + if (fTxnid.valid) + { + //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl; + anyOtherActiveTransaction = false; + break; + } + else + { + anyOtherActiveTransaction = true; + } + } + else + { + string errorMsg; + rc = commitTransaction(fTxnid.id, errorMsg); + + if ( rc != 0) + throw std::runtime_error(errorMsg); + + //need unlock the table. + std::vector tableLocks = fDbrm->getAllTableLocks(); + bool lockReleased = true; + + for (unsigned k = 0; k < tableLocks.size(); k++) + { + if (tableLocks[k].ownerTxnID == fTxnid.id) + { + lockReleased = fDbrm->releaseTableLock(tableLocks[k].id); + + if (!lockReleased) + { + ostringstream os; + os << "tablelock id " << tableLocks[k].id << " is not found"; + throw std::runtime_error(os.str()); + } + } + } + + fDbrm->committed(fTxnid); + fTxnid = fDbrm->newTxnID(fSessionID, true, true); + + if (fTxnid.valid) + { + //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl; + anyOtherActiveTransaction = false; + break; + } + else + { + anyOtherActiveTransaction = true; + } + } + } + } + + //cout << "ending i = " << i << endl; + } + else + { + //cout << "Ready to process type " << (int)packageType << endl; + fTxnid = sessionManager.getTxnID(fSessionID); + + if ( !fTxnid.valid ) + { + fTxnid = sessionManager.newTxnID(fSessionID, true, true); + + if (!fTxnid.valid) + { + //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl; + anyOtherActiveTransaction = true; + } + else + { + anyOtherActiveTransaction = false; + } + } + else + { + string errorMsg; + rc = commitTransaction(fTxnid.id, errorMsg); + + if ( rc != 0) + throw std::runtime_error(errorMsg); + + //need unlock the table. + std::vector tableLocks = fDbrm->getAllTableLocks(); + bool lockReleased = true; + + for (unsigned k = 0; k < tableLocks.size(); k++) + { + if (tableLocks[k].ownerTxnID == fTxnid.id) + { + lockReleased = fDbrm->releaseTableLock(tableLocks[k].id); + + if (!lockReleased) + { + ostringstream os; + os << "tablelock id " << tableLocks[k].id << " is not found"; + throw std::runtime_error(os.str()); + } + } + } + + sessionManager.committed(fTxnid); + fTxnid = sessionManager.newTxnID(fSessionID, true, true); + + if (!fTxnid.valid) + { + //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl; + anyOtherActiveTransaction = true; + } + else + { + anyOtherActiveTransaction = false; + } + } + } + + if ((anyOtherActiveTransaction) && (i >= numTries)) + { + //cout << " Erroring out on package type " << (int)packageType << endl; + break; + } + } + + if ((anyOtherActiveTransaction) && (i >= numTries)) + { + messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; + + Message::Args args; + args.add(static_cast(blockingsid.sessionid)); + + //@Bug 3854 Log to debug.log + LoggingID logid(15, 0, 0); + logging::Message::Args args1; + logging::Message msg(1); + args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args)); + msg.format(args1); + logging::Logger logger(logid.fSubsysID); + logger.logMessage(LOG_TYPE_DEBUG, msg, logid); + + logError(status, IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args)); + } + else + { + processStatement(); + } + } + else + { + fTxnid = fDbrm->getTxnID(fSessionID); + + if ( !fTxnid.valid ) + { + fTxnid = fDbrm->newTxnID(fSessionID, true, true); + + if (!fTxnid.valid) + { + throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); + } + } + else + { + string errorMsg; + rc = commitTransaction(fTxnid.id, errorMsg); + + if ( rc != 0) + throw std::runtime_error(errorMsg); + + //need unlock the table. + std::vector tableLocks = fDbrm->getAllTableLocks(); + bool lockReleased = true; + + for (unsigned k = 0; k < tableLocks.size(); k++) + { + if (tableLocks[k].ownerTxnID == fTxnid.id) + { + lockReleased = fDbrm->releaseTableLock(tableLocks[k].id); + + if (!lockReleased) + { + ostringstream os; + os << "tablelock id " << tableLocks[k].id << " is not found"; + throw std::runtime_error(os.str()); + } + } + } + + fDbrm->committed(fTxnid); + fTxnid = fDbrm->newTxnID(fSessionID, true, true); + + if (!fTxnid.valid) + { + throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); + } + } + + processStatement(); + } + } + catch (const std::exception &ex) + { + logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, ex.what(), true); + throw; + } + } + +private: + int commitTransaction(uint32_t txnID, std::string& errorMsg) + { + auto WEClient = std::unique_ptr(new WriteEngine::WEClients(WriteEngine::WEClients::DDLPROC)); + auto PMCount = WEClient->getPmCount(); + ByteStream bytestream; + uint64_t uniqueId = fDbrm->getUnique64(); + WEClient->addQueue(uniqueId); + bytestream << (ByteStream::byte)WE_SVR_COMMIT_VERSION; + bytestream << uniqueId; + bytestream << txnID; + uint32_t msgRecived = 0; + WEClient->write_to_all(bytestream); + boost::shared_ptr bsIn; + bsIn.reset(new ByteStream()); + int rc = 0; + ByteStream::byte tmp8; + + while (true) + { + if (msgRecived == PMCount) + break; + + WEClient->read(uniqueId, bsIn); + + if (bsIn->length() == 0) //read error + { + rc = 1; + errorMsg = "DDL cannot communicate with WES"; + WEClient->removeQueue(uniqueId); + break; + } + else + { + *bsIn >> tmp8; + rc = tmp8; + + if (rc != 0) + { + *bsIn >> errorMsg; + WEClient->removeQueue(uniqueId); + break; + } + else + msgRecived++; + } + } + + return rc; + } + + void processStatement() + { DDLPackageProcessor::DDLResult result; result.result = DDLPackageProcessor::NO_ERROR; //boost::shared_ptr systemCatalogPtr; try { + if (!fOamCache) + fOamCache = oam::OamCache::makeOamCache(); //cout << "DDLProc received package " << fPackageType << endl; switch ( fPackageType ) { @@ -301,35 +697,40 @@ struct PackageHandler } catch (quadbyte& /*foo*/) { - fIos.close(); - cout << "Unrecognized package type" << endl; + logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Unrecognized package type", true); } catch (logging::IDBExcept& idbEx) { cleanPMSysCache(); - messageqcpp::ByteStream results; - messageqcpp::ByteStream::byte status = DDLPackageProcessor::CREATE_ERROR; - results << status; - results << string(idbEx.what()); - - fIos.write(results); - - fIos.close(); + logError(DDLPackageProcessor::CREATE_ERROR, idbEx.what(), true); } catch (...) { - fIos.close(); + logError(DDLPackageProcessor::NOT_ACCEPTING_PACKAGES, "Unknown error", true); } - } + void logError(messageqcpp::ByteStream::byte status, const std::string& msg, bool closeSocket = false) + { + messageqcpp::ByteStream res; + res << status; + res << msg; + fIos.write(res); + cerr << "DDLProc error: " << msg << endl; + if (closeSocket) + fIos.close(); + } + +private: messageqcpp::IOSocket fIos; messageqcpp::ByteStream fByteStream; messageqcpp::ByteStream::quadbyte fPackageType; + uint32_t fSessionID; BRM::TxnID fTxnid; BRM::DBRM* fDbrm; QueryTeleClient fQtc; - oam::OamCache* fOamCache; + oam::OamCache* fOamCache = nullptr; + bool fConcurrentSupport; }; } @@ -367,9 +768,6 @@ void DDLProcessor::process() { DBRM dbrm; messageqcpp::IOSocket ios; - messageqcpp::ByteStream bs; - PackageHandler handler; - messageqcpp::ByteStream::quadbyte packageType; bool concurrentSupport = true; string concurrentTranStr = config::Config::makeConfig()->getConfig("SystemConfig", "ConcurrentTransactions"); @@ -385,363 +783,14 @@ void DDLProcessor::process() { for (;;) { - ios = fMqServer.accept(); - bs = ios.read(); - uint32_t sessionID; - bs >> sessionID; - bs >> packageType; - - uint32_t stateFlags; - - if (dbrm.getSystemState(stateFlags) > 0) // > 0 implies succesful retrieval. It doesn't imply anything about the contents + try { - messageqcpp::ByteStream results; - const char* responseMsg = 0; - messageqcpp::ByteStream::byte status; - bool bReject = false; - - // Check to see if we're in write suspended mode - // If so, we can't process the request. - if (stateFlags & SessionManagerServer::SS_SUSPENDED) - { - status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; - responseMsg = "Writing to the database is disabled."; - bReject = true; - } - - // Check to see if we're in write suspend or shutdown pending mode - if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING - || stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING) - { - if (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING) - { - responseMsg = "Writing to the database is disabled."; - } - else - { - responseMsg = "The database is being shut down."; - } - - status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; - bReject = true; - } - - if (bReject) - { - results << status; - //@bug 266 - MessageLog logger(LoggingID(27)); - logging::Message::Args args; - logging::Message message(2); - args.add(responseMsg); - message.format( args ); - results << message.msg(); - ios.write(results); - std::cout << responseMsg << endl; - std::cout << "Command rejected. Status " << (int)status << message.msg() << endl; - continue; - } + ios = fMqServer.accept(); + fDdlPackagepool.invoke(PackageHandler(fQtc, &dbrm, ios, concurrentSupport)); } - - //check whether the system is ready to process statement. - if (dbrm.getSystemReady() < 1) + catch (...) { - messageqcpp::ByteStream results; - messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; - - results << status; - string msg ("System is not ready yet. Please try again." ); - - results << msg; - ios.write(results); - - ios.close(); - continue; - } - - BRM::TxnID txnid; - int rc = 0; - - if (!concurrentSupport) - { - //Check if any other active transaction - bool bIsDbrmUp = true; - bool anyOtherActiveTransaction = true; - execplan::SessionManager sessionManager; - BRM::SIDTIDEntry blockingsid; - int i = 0; - int waitPeriod = 10; - //@Bug 2487 Check transaction map every 1/10 second - - int sleepTime = 100; // sleep 100 milliseconds between checks - int numTries = 10; // try 10 times per second - - string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod"); - - if ( waitPeriodStr.length() != 0 ) - waitPeriod = static_cast(config::Config::fromText(waitPeriodStr)); - - numTries = waitPeriod * 10; - struct timespec rm_ts; - - rm_ts.tv_sec = sleepTime / 1000; - rm_ts.tv_nsec = sleepTime % 1000 * 1000000; - - //cout << "starting i = " << i << endl; - //anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp ); - while (anyOtherActiveTransaction) - { - anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp, - blockingsid ); - - if (anyOtherActiveTransaction) - { - for ( ; i < numTries; i++ ) - { -#ifdef _MSC_VER - Sleep(rm_ts.tv_sec * 1000); -#else - struct timespec abs_ts; - - //cout << "session " << sessionID << " nanosleep on package type " << (int)packageType << endl; - do - { - abs_ts.tv_sec = rm_ts.tv_sec; - abs_ts.tv_nsec = rm_ts.tv_nsec; - } - while (nanosleep(&abs_ts, &rm_ts) < 0); - -#endif - anyOtherActiveTransaction = sessionManager.checkActiveTransaction( sessionID, bIsDbrmUp, - blockingsid ); - - if ( !anyOtherActiveTransaction ) - { - //cout << "Ready to process type " << (int)packageType << endl; - txnid = sessionManager.getTxnID(sessionID); - - if ( !txnid.valid ) - { - txnid = sessionManager.newTxnID(sessionID, true, true); - - if (txnid.valid) - { - //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl; - anyOtherActiveTransaction = false; - break; - } - else - { - anyOtherActiveTransaction = true; - } - } - else - { - string errorMsg; - rc = commitTransaction(txnid.id, errorMsg); - - if ( rc != 0) - throw std::runtime_error(errorMsg); - - //need unlock the table. - std::vector tableLocks = dbrm.getAllTableLocks(); - bool lockReleased = true; - - for (unsigned k = 0; k < tableLocks.size(); k++) - { - if (tableLocks[k].ownerTxnID == txnid.id) - { - lockReleased = dbrm.releaseTableLock(tableLocks[k].id); - - if (!lockReleased) - { - ostringstream os; - os << "tablelock id " << tableLocks[k].id << " is not found"; - throw std::runtime_error(os.str()); - } - } - } - - dbrm.committed(txnid); - txnid = dbrm.newTxnID(sessionID, true, true); - - if (txnid.valid) - { - //cout << "Ready to process type " << (int)packageType << " with txnid " << txnid.id << endl; - anyOtherActiveTransaction = false; - break; - } - else - { - anyOtherActiveTransaction = true; - } - } - } - } - - //cout << "ending i = " << i << endl; - } - else - { - //cout << "Ready to process type " << (int)packageType << endl; - txnid = sessionManager.getTxnID(sessionID); - - if ( !txnid.valid ) - { - txnid = sessionManager.newTxnID(sessionID, true, true); - - if (!txnid.valid) - { - //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl; - anyOtherActiveTransaction = true; - } - else - { - anyOtherActiveTransaction = false; - } - } - else - { - string errorMsg; - rc = commitTransaction(txnid.id, errorMsg); - - if ( rc != 0) - throw std::runtime_error(errorMsg); - - //need unlock the table. - std::vector tableLocks = dbrm.getAllTableLocks(); - bool lockReleased = true; - - for (unsigned k = 0; k < tableLocks.size(); k++) - { - if (tableLocks[k].ownerTxnID == txnid.id) - { - lockReleased = dbrm.releaseTableLock(tableLocks[k].id); - - if (!lockReleased) - { - ostringstream os; - os << "tablelock id " << tableLocks[k].id << " is not found"; - throw std::runtime_error(os.str()); - } - } - } - - sessionManager.committed(txnid); - txnid = sessionManager.newTxnID(sessionID, true, true); - - if (!txnid.valid) - { - //cout << "cannot get txnid " << (int)packageType << " for session " << sessionID << endl; - anyOtherActiveTransaction = true; - } - else - { - anyOtherActiveTransaction = false; - } - } - } - - if ((anyOtherActiveTransaction) && (i >= numTries)) - { - //cout << " Erroring out on package type " << (int)packageType << endl; - break; - } - } - - if ((anyOtherActiveTransaction) && (i >= numTries)) - { - messageqcpp::ByteStream results; - messageqcpp::ByteStream::byte status = DDLPackageProcessor::NOT_ACCEPTING_PACKAGES; - - results << status; - Message::Args args; - args.add(static_cast(blockingsid.sessionid)); - - results << IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args); - //@Bug 3854 Log to debug.log - LoggingID logid(15, 0, 0); - logging::Message::Args args1; - logging::Message msg(1); - args1.add(IDBErrorInfo::instance()->errorMsg(ERR_ACTIVE_TRANSACTION, args)); - msg.format( args1 ); - logging::Logger logger(logid.fSubsysID); - logger.logMessage(LOG_TYPE_DEBUG, msg, logid); - - ios.write(results); - - ios.close(); - } - else - { - handler.fIos = ios; - handler.fByteStream = bs; - handler.fPackageType = packageType; - handler.fTxnid = txnid; - handler.fDbrm = &dbrm; - handler.fQtc = fQtc; - handler.fOamCache = oam::OamCache::makeOamCache(); - fDdlPackagepool.invoke(handler); - - } - } - else - { - txnid = dbrm.getTxnID(sessionID); - - if ( !txnid.valid ) - { - txnid = dbrm.newTxnID(sessionID, true, true); - - if (!txnid.valid) - { - throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); - } - } - else - { - string errorMsg; - rc = commitTransaction(txnid.id, errorMsg); - - if ( rc != 0) - throw std::runtime_error(errorMsg); - - //need unlock the table. - std::vector tableLocks = dbrm.getAllTableLocks(); - bool lockReleased = true; - - for (unsigned k = 0; k < tableLocks.size(); k++) - { - if (tableLocks[k].ownerTxnID == txnid.id) - { - lockReleased = dbrm.releaseTableLock(tableLocks[k].id); - - if (!lockReleased) - { - ostringstream os; - os << "tablelock id " << tableLocks[k].id << " is not found"; - throw std::runtime_error(os.str()); - } - } - } - - dbrm.committed(txnid); - txnid = dbrm.newTxnID(sessionID, true, true); - - if (!txnid.valid) - { - throw std::runtime_error( std::string("Unable to start a transaction. Check critical log.") ); - } - } - - handler.fIos = ios; - handler.fByteStream = bs; - handler.fPackageType = packageType; - handler.fTxnid = txnid; - handler.fDbrm = &dbrm; - handler.fQtc = fQtc; - handler.fOamCache = oam::OamCache::makeOamCache(); - fDdlPackagepool.invoke(handler); + throw; } } }