From 4313288a859fde589b36a7df12689f0817e414b9 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Wed, 28 Dec 2022 21:15:39 +0300 Subject: [PATCH] Merge 22.08.7 (#2678) * fix C API includes ColumnStore used to include server's mysql.h but link all tools with libmariadb.so There's no guarantee that this would work, even with workarounds it had in dbcon/mysql/sm.cpp Fix: * tools (linked with libmariadb.so) *must* include libmariadb's mysql.h * as a hack prevent service_thd_timezone.h from being loaded into tools, as it conflicts with libmariadb's mysql.h * server plugin *must* include server's mysql.h * also don't link every tool with libmariadb.so, link the helper library (liblibmysqlclient.so) that actually needs it, tools use this helper library, not libmariadb.so directly * do *not* link ha_columnstore.so with libmariadb.so this means some libraries have to be compiled twice - for tools with libmariadb.so and for plugin, without. * use system boost, if possible boost 1.71.0 is what ubuntu focal has, so let's start with that version. boost 1.77.0 is the first that supports c++20 * add dependency for generated header files errorids.h messageids.h see 3edd51610 * bump the version * MCOL-5322 This patch replaces boost::mutex with std::mutex b/c IMHO std::unique_lock::lock is less troublesome comparing with the boost alternative * MCOL-5310 This patch replaces move-assignment with copy-assignment to avoid memory corruption (#2661) * Bump VERSION to 22.08.7-1 * MCOL-5306 Re-read the config (Columnstore.xml) file if it was updated. The existing implementation of Config::makeConfig() factory method was returning a possibly stale config to the caller, without checking if the config file was updated since the last read. This bug triggered a scenario as described in MCOL-5306 where after a failover in an MCS cluster, the controllernode coordinates changed in the config file after failover and the existing mariadbd process was still using the old controllernode coordinates. This lead to failed network connection between mariadbd and the new controllernode. The change in this fix, however, is more generic and not just limited to this above scenario. * MCOL-5264 This patch replaces boost mutex locks with std analogs boost::uniqie_lock dtor calls a fancy unlock logic that throws twice. First if the mutex is 0 and second lock doesn't own the mutex. The first condition failure causes unhandled exception for one of the clients in DEC::writeToClient(). I was unable to find out why Linux can have a 0 mutex and replaced boost::mutex with std::mutex b/c stdlibc++ should be more stable comparing with boost. * MCOL-5311 Add timezone to jobList in subquerytransformer TimeZone was uninitialized in this scenario and led to undefined behavior. * patch_out_of_band Some changes made to 10.6-enterprise make a build using the out-of-band method of compiling columnstore not work. Out-of band means the source for the engine is not in the storage subdir of server, but rather in a stand alone directory. This is used by developers for easier develop work. In the case of out-of-band, INSTALL_LAYOUT is false in CMakeLists.txt * MCOL-5346 This patch forces TreeNode::getIntValue to use conversion for dict-based CHAR/VARCHAR and TEXT columns (#2657) Co-authored-by: Roman Nozdrin * MCOL-5263 Add support to ROLLBACK when PP were restarted. DMLProc starts ROLLBACK when SELECT part of UPDATE fails b/c EM facility in PP were restarted. Unfortunately this ROLLBACK stuck if EM/PP are not yet available. DMLProc must have a t/o with re-try doing ROLLBACK. * MCOL-3561 This patch updates Connector code after MDEV-29988 * This commit applies the code style format Co-authored-by: Sergei Golubchik Co-authored-by: Roman Nozdrin Co-authored-by: David.Hall Co-authored-by: Gagan Goel Co-authored-by: Denis Khalikov --- VERSION | 2 +- dbcon/mysql/ha_mcs_execplan.cpp | 24 +- primitives/blockcache/stats.cpp | 10 +- primitives/primproc/sqlfrontsessionthread.cpp | 1728 +++++++++-------- 4 files changed, 887 insertions(+), 877 deletions(-) diff --git a/VERSION b/VERSION index 1bb40e0e7..4ac98053e 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=22 COLUMNSTORE_VERSION_MINOR=08 -COLUMNSTORE_VERSION_PATCH=4 +COLUMNSTORE_VERSION_PATCH=7 COLUMNSTORE_VERSION_RELEASE=1 diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 0dd954ad8..207086e4c 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -3471,6 +3471,10 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp break; default: + if (ref->ref_type() == Item_ref::DIRECT_REF) + { + return buildReturnedColumn(ref->real_item(), gwi, nonSupport); + } gwi.fatalParseError = true; gwi.parseErrorText = "Unknown REF item"; break; @@ -4053,15 +4057,14 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non ReturnedColumn* rc = NULL; - // Special treatment for json functions // All boolean arguments will be parsed as boolean string true(false) // E.g. the result of `SELECT JSON_ARRAY(true, false)` should be [true, false] instead of [1, 0] - bool mayHasBoolArg = ((funcName == "json_insert" || funcName == "json_replace" || - funcName == "json_set" || funcName == "json_array_append" || - funcName == "json_array_insert") && i != 0 && i % 2 == 0) || - (funcName == "json_array") || - (funcName == "json_object" && i % 2 == 1); + bool mayHasBoolArg = + ((funcName == "json_insert" || funcName == "json_replace" || funcName == "json_set" || + funcName == "json_array_append" || funcName == "json_array_insert") && + i != 0 && i % 2 == 0) || + (funcName == "json_array") || (funcName == "json_object" && i % 2 == 1); bool isBoolType = (ifp->arguments()[i]->const_item() && ifp->arguments()[i]->type_handler()->is_bool_type()); @@ -6408,10 +6411,15 @@ void parse_item(Item* item, vector& field_vec, bool& hasNonSupportI case Item::REF_ITEM: { + Item_ref* ref = (Item_ref*)item; + if (ref->ref_type() == Item_ref::DIRECT_REF) + { + parse_item(ref->real_item(), field_vec, hasNonSupportItem, parseInfo, gwi); + break; + } while (true) { - Item_ref* ref = (Item_ref*)item; - + ref = (Item_ref*)item; if ((*(ref->ref))->type() == Item::SUM_FUNC_ITEM) { parseInfo |= AGG_BIT; diff --git a/primitives/blockcache/stats.cpp b/primitives/blockcache/stats.cpp index 50372758e..ad5c3a6ed 100644 --- a/primitives/blockcache/stats.cpp +++ b/primitives/blockcache/stats.cpp @@ -29,7 +29,7 @@ typedef int pthread_t; #endif #include #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -178,7 +178,7 @@ typedef map TraceFileMap_t; TraceFileMap_t traceFileMap; // map mutex -boost::mutex traceFileMapMutex; +std::mutex traceFileMapMutex; class StatMon { @@ -197,7 +197,7 @@ class StatMon void operator()() const { // struct timespec ts = { 60 * 1, 0 }; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::unique_lock lk(traceFileMapMutex); TraceFileMap_t::iterator iter; TraceFileMap_t::iterator end; @@ -256,7 +256,7 @@ void Stats::touchedLBID(uint64_t lbid, pthread_t thdid, uint32_t session) if (session == 0) return; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::lock_guard lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) @@ -274,7 +274,7 @@ void Stats::markEvent(const uint64_t lbid, const pthread_t thdid, const uint32_t if (session == 0) return; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::lock_guard lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index d43cd83e6..48bb9577a 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -21,972 +21,974 @@ namespace exemgr { - uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->getMaxMemPct(sessionId); - } - void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->deleteMaxMemPct(sessionId); - } - void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->incThreadCntPerSession(sessionId); - } - void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->decThreadCntPerSession(sessionId); - } +uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->getMaxMemPct(sessionId); +} +void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->deleteMaxMemPct(sessionId); +} +void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) +{ + return globServiceExeMgr->incThreadCntPerSession(sessionId); +} +void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) +{ + return globServiceExeMgr->decThreadCntPerSession(sessionId); +} - void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->initMaxMemPct(sessionId); - } - //...Get and log query stats to specified output stream - const std::string SQLFrontSessionThread::formatQueryStats( - joblist::SJLP& jl, // joblist associated with query - const std::string& label, // header label to print in front of log output - bool includeNewLine, // include line breaks in query stats std::string - bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) - { - std::ostringstream os; +void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->initMaxMemPct(sessionId); +} +//...Get and log query stats to specified output stream +const std::string SQLFrontSessionThread::formatQueryStats( + joblist::SJLP& jl, // joblist associated with query + const std::string& label, // header label to print in front of log output + bool includeNewLine, // include line breaks in query stats std::string + bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) +{ + std::ostringstream os; - // Get stats if not already acquired for current query - if (!fStatsRetrieved) + // Get stats if not already acquired for current query + if (!fStatsRetrieved) + { + if (wantExtendedStats) { - if (wantExtendedStats) + // wait for the ei data to be written by another thread (brain-dead) + struct timespec req = {0, 250000}; // 250 usec + nanosleep(&req, 0); + } + + // Get % memory usage during current query for sessionId + jl->querySummary(wantExtendedStats); + fStats = jl->queryStats(); + fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); + fStats.fRows = rowsReturned; + fStatsRetrieved = true; + } + + std::string queryMode; + queryMode = (vtableModeOn ? "Distributed" : "Standard"); + + // Log stats to specified output stream + os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles + << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO + << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; + + if (includeNewLine) + os << std::endl << " "; // insert line break + else + os << "; "; // continue without line break + + os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" + << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" + << queryMode; + + return os.str(); +} + +//... Round off to human readable format (KB, MB, or GB). +const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const +{ + const char* units[] = {"B", "KB", "MB", "GB", "TB"}; + uint64_t i = 0, up = 0; + uint64_t roundedValue = value; + + while (roundedValue > 1024 && i < 4) + { + up = (roundedValue & 512); + roundedValue /= 1024; + i++; + } + + if (up) + roundedValue++; + + std::ostringstream oss; + oss << roundedValue << units[i]; + return oss.str(); +} + +//...Round off to nearest (1024*1024) MB +uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const +{ + uint64_t roundedValue = value >> 20; + + if (value & 524288) + roundedValue++; + + return roundedValue; +} + +void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) +{ + for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); it != parms.end(); + ++it) + { + switch (it->id) + { + case execplan::PMSMALLSIDEMEMORY: { - // wait for the ei data to be written by another thread (brain-dead) - struct timespec req = {0, 250000}; // 250 usec - nanosleep(&req, 0); + globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); + break; } - // Get % memory usage during current query for sessionId - jl->querySummary(wantExtendedStats); - fStats = jl->queryStats(); - fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); - fStats.fRows = rowsReturned; - fStatsRetrieved = true; - } - - std::string queryMode; - queryMode = (vtableModeOn ? "Distributed" : "Standard"); - - // Log stats to specified output stream - os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles - << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO - << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; - - if (includeNewLine) - os << std::endl << " "; // insert line break - else - os << "; "; // continue without line break - - os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" - << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" - << queryMode; - - return os.str(); - } - - //... Round off to human readable format (KB, MB, or GB). - const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const - { - const char* units[] = {"B", "KB", "MB", "GB", "TB"}; - uint64_t i = 0, up = 0; - uint64_t roundedValue = value; - - while (roundedValue > 1024 && i < 4) - { - up = (roundedValue & 512); - roundedValue /= 1024; - i++; - } - - if (up) - roundedValue++; - - std::ostringstream oss; - oss << roundedValue << units[i]; - return oss.str(); - } - - //...Round off to nearest (1024*1024) MB - uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const - { - uint64_t roundedValue = value >> 20; - - if (value & 524288) - roundedValue++; - - return roundedValue; - } - - void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) - { - for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); - it != parms.end(); ++it) - { - switch (it->id) + case execplan::UMSMALLSIDEMEMORY: { - case execplan::PMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - case execplan::UMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - default:; + globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); + break; } + + default:; + } + } +} + +void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, + boost::shared_ptr csc) +{ + const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); + std::string schemaName; + + for (auto it = colMap.begin(); it != colMap.end(); ++it) + { + const auto sc = dynamic_cast((it->second).get()); + + if (sc) + { + schemaName = sc->schemaName(); + + // only the first time a schema is got will actually query + // system catalog. System catalog keeps a schema name std::map. + // if a schema exists, the call getSchemaInfo returns without + // doing anything. + if (!schemaName.empty()) + csc->getSchemaInfo(schemaName); } } - void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, - boost::shared_ptr csc) + execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; + + for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) { - const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); - std::string schemaName; - - for (auto it = colMap.begin(); it != colMap.end(); ++it) - { - const auto sc = dynamic_cast((it->second).get()); - - if (sc) - { - schemaName = sc->schemaName(); - - // only the first time a schema is got will actually query - // system catalog. System catalog keeps a schema name std::map. - // if a schema exists, the call getSchemaInfo returns without - // doing anything. - if (!schemaName.empty()) - csc->getSchemaInfo(schemaName); - } - } - - execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; - - for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) - { - buildSysCache(*(dynamic_cast(subIt->get())), csc); - } + buildSysCache(*(dynamic_cast(subIt->get())), csc); } +} - void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) +void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) +{ + messageqcpp::ByteStream emsgBs; + messageqcpp::ByteStream tbs; + tbs << code; + fIos.write(tbs); + emsgBs << emsg; + fIos.write(emsgBs); +} + +void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, + bool& stmtCounted) +{ + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + messageqcpp::ByteStream::quadbyte qb; + execplan::MCSAnalyzeTableExecutionPlan caep; + + bs = fIos.read(); + caep.unserialize(bs); + + statementsRunningCount->incr(stmtCounted); + PrimitiveServerThreadPools primitiveServerThreadPools; + jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true); + + // Joblist is empty. + if (jl->status() == logging::statisticsJobListEmpty) { - messageqcpp::ByteStream emsgBs; - messageqcpp::ByteStream tbs; - tbs << code; - fIos.write(tbs); - emsgBs << emsg; - fIos.write(emsgBs); - } + if (caep.traceOn()) + std::cout << "JobList is empty " << std::endl; - void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, - bool& stmtCounted) - { - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - messageqcpp::ByteStream::quadbyte qb; - execplan::MCSAnalyzeTableExecutionPlan caep; - - bs = fIos.read(); - caep.unserialize(bs); - - statementsRunningCount->incr(stmtCounted); - PrimitiveServerThreadPools primitiveServerThreadPools; - jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true); - - // Joblist is empty. - if (jl->status() == logging::statisticsJobListEmpty) - { - if (caep.traceOn()) - std::cout << "JobList is empty " << std::endl; - - jl.reset(); - bs.restart(); - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - statementsRunningCount->decr(stmtCounted); - return; - } - - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - std::cout << "fEc setup " << std::endl; - fEc->Setup(); - } - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - - // Execute a joblist. - jl->doQuery(); - - FEMsgHandler msgHandler(jl, &fIos); - msgHandler.start(); - - // Seemls like this a legacy parameter, not really needed. - const uint32_t dummyTableOid = 100; - auto* statisticsManager = statistics::StatisticsManager::instance(); - // Process rowGroup by rowGroup. - auto rowCount = jl->projectTable(dummyTableOid, bs); - while (rowCount) - { - auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); - statisticsManager->collectSample(outRG); - rowCount = jl->projectTable(dummyTableOid, bs); - } - msgHandler.stop(); - - // Analyze collected samples. - statisticsManager->analyzeSample(caep.traceOn()); - statisticsManager->incEpoch(); - statisticsManager->saveToFile(); - - // Distribute statistics across all ExeMgr clients if possible. - statistics::StatisticsDistributor::instance()->distributeStatistics(); - - // Send the signal back to front-end. + jl.reset(); bs.restart(); qb = ANALYZE_TABLE_SUCCESS; bs << qb; fIos.write(bs); bs.reset(); statementsRunningCount->decr(stmtCounted); + return; } - void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) { - messageqcpp::ByteStream::quadbyte qb; -#ifdef DEBUG_STATISTICS - std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - bs = fIos.read(); -#ifdef DEBUG_STATISTICS - std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - uint64_t dataHashRec; - bs >> dataHashRec; + std::cout << "fEc setup " << std::endl; + fEc->Setup(); + } - uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); - // The stats are the same. - if (dataHash == dataHashRec) - { + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + + // Execute a joblist. + jl->doQuery(); + + FEMsgHandler msgHandler(jl, &fIos); + msgHandler.start(); + + // Seemls like this a legacy parameter, not really needed. + const uint32_t dummyTableOid = 100; + auto* statisticsManager = statistics::StatisticsManager::instance(); + // Process rowGroup by rowGroup. + auto rowCount = jl->projectTable(dummyTableOid, bs); + while (rowCount) + { + auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); + statisticsManager->collectSample(outRG); + rowCount = jl->projectTable(dummyTableOid, bs); + } + msgHandler.stop(); + + // Analyze collected samples. + statisticsManager->analyzeSample(caep.traceOn()); + statisticsManager->incEpoch(); + statisticsManager->saveToFile(); + + // Distribute statistics across all ExeMgr clients if possible. + statistics::StatisticsDistributor::instance()->distributeStatistics(); + + // Send the signal back to front-end. + bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); +} + +void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) +{ + messageqcpp::ByteStream::quadbyte qb; #ifdef DEBUG_STATISTICS - std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; + std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - return; - } - - bs.restart(); - qb = ANALYZE_TABLE_NEED_STATS; - bs << qb; - fIos.write(bs); - - bs.restart(); - bs = fIos.read(); + bs = fIos.read(); #ifdef DEBUG_STATISTICS - std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; + std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif - statistics::StatisticsManager::instance()->unserialize(bs); - statistics::StatisticsManager::instance()->saveToFile(); + uint64_t dataHashRec; + bs >> dataHashRec; + uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); + // The stats are the same. + if (dataHash == dataHashRec) + { #ifdef DEBUG_STATISTICS - std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; + std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif qb = ANALYZE_TABLE_SUCCESS; bs << qb; fIos.write(bs); bs.reset(); + return; } - void SQLFrontSessionThread::operator()() + bs.restart(); + qb = ANALYZE_TABLE_NEED_STATS; + bs << qb; + fIos.write(bs); + + bs.restart(); + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + statistics::StatisticsManager::instance()->unserialize(bs); + statistics::StatisticsManager::instance()->saveToFile(); + +#ifdef DEBUG_STATISTICS + std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); +} + +void SQLFrontSessionThread::operator()() +{ + messageqcpp::ByteStream bs, inbs; + execplan::CalpontSelectExecutionPlan csep; + csep.sessionID(0); + joblist::SJLP jl; + bool incSQLFrontSessionThreadCnt = true; + std::mutex jlMutex; + std::condition_variable jlCleanupDone; + int destructing = 0; + int gDebug = globServiceExeMgr->getDebugLevel(); + logging::Logger& msgLog = globServiceExeMgr->getLogger(); + + bool selfJoin = false; + bool tryTuples = false; + bool usingTuples = false; + bool stmtCounted = false; + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + + try { - messageqcpp::ByteStream bs, inbs; - execplan::CalpontSelectExecutionPlan csep; - csep.sessionID(0); - joblist::SJLP jl; - bool incSQLFrontSessionThreadCnt = true; - std::mutex jlMutex; - std::condition_variable jlCleanupDone; - int destructing = 0; - int gDebug = globServiceExeMgr->getDebugLevel(); - logging::Logger& msgLog = globServiceExeMgr->getLogger(); - - bool selfJoin = false; - bool tryTuples = false; - bool usingTuples = false; - bool stmtCounted = false; - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - - try + for (;;) { - for (;;) - { - selfJoin = false; - tryTuples = false; - usingTuples = false; + selfJoin = false; + tryTuples = false; + usingTuples = false; - if (jl) + if (jl) + { + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, &destructing] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this runs first + bgdtor.detach(); + } + + bs = fIos.read(); + + if (bs.length() == 0) + { + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + + // connection closed by client + fIos.close(); + break; + } + else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan + { + if (gDebug) + std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " + << bs.length() << std::endl; + + fIos.close(); + break; + } + else if (bs.length() == 4) // possible tuple flag + { + messageqcpp::ByteStream::quadbyte qb; + bs >> qb; + + if (qb == 4) // UM wants new tuple i/f { - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this runs first - bgdtor.detach(); + if (gDebug) + std::cout << "### UM wants tuples" << std::endl; + + tryTuples = true; + // now wait for the CSEP... + bs = fIos.read(); + } + else if (qb == 5) // somebody wants stats + { + bs.restart(); + qb = statementsRunningCount->cur(); + bs << qb; + qb = statementsRunningCount->waiting(); + bs << qb; + fIos.write(bs); + fIos.close(); + break; + } + else if (qb == ANALYZE_TABLE_EXECUTE) + { + analyzeTableExecute(bs, jl, stmtCounted); + continue; + } + else if (qb == ANALYZE_TABLE_REC_STATS) + { + analyzeTableHandleStats(bs); + continue; + } + else + { + if (gDebug) + std::cout << "### Got a not-a-plan value " << qb << std::endl; + + fIos.close(); + break; + } + } + + new_plan: + try + { + csep.unserialize(bs); + } + catch (logging::IDBExcept& ex) + { + // We can get here on illegal function parameter data type, e.g. + // SELECT blob_column|1 FROM t1; + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(ex.errorCode(), std::string(ex.what())); + continue; + } + + querytele::QueryTeleStats qts; + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.query_uuid = csep.uuid(); + qts.msg_type = querytele::QueryTeleStats::QT_START; + qts.start_time = querytele::QueryTeleClient::timeNowms(); + qts.query = csep.data(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + qts.schema_name = csep.schemaName(); + fTeleClient.postQueryTele(qts); + } + + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; + + setRMParms(csep.rmParms()); + // Re-establish lost PP connections. + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + fEc->Setup(); + } + // @bug 1021. try to get schema cache for a come in query. + // skip system catalog queries. + if (!csep.isInternal()) + { + boost::shared_ptr csc = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); + buildSysCache(csep, csc); + } + + // As soon as we have a session id for this thread, update the + // thread count per session; only do this once per thread. + // Mask 0x80000000 is for associate user query and csc query + if (incSQLFrontSessionThreadCnt) + { + // WIP + incThreadCntPerSession(csep.sessionID() | 0x80000000); + incSQLFrontSessionThreadCnt = false; + } + + bool needDbProfEndStatementMsg = false; + logging::Message::Args args; + std::string sqlText = csep.data(); + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + + // Initialize stats for this query, including + // init sessionMemMap entry for this session to 0 memory %. + // We will need this later for traceOn() or if we receive a + // table request with qb=3 (see below). This is also recorded + // as query start time. + initStats(csep.sessionID(), sqlText); + fStats.fQueryType = csep.queryType(); + + // Log start and end statement if tracing is enabled. Keep in + // mind the trace flag won't be set for system catalog queries. + if (csep.traceOn()) + { + args.reset(); + args.add((int)csep.statementID()); + args.add((int)csep.verID().currentScn); + args.add(sqlText); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); + needDbProfEndStatementMsg = true; + } + + // Don't log subsequent self joins after first. + if (selfJoin) + sqlText = ""; + + std::ostringstream oss; + oss << sqlText << "; |" << csep.schemaName() << "|"; + logging::SQLLogger sqlLog(oss.str(), li); + + statementsRunningCount->incr(stmtCounted); + + PrimitiveServerThreadPools primitiveServerThreadPools( + ServicePrimProc::instance()->getPrimitiveServerThreadPool()); + + if (tryTuples) + { + try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList + { + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); + // assign query stats + jl->queryStats(fStats); + + if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + { + usingTuples = true; + + // Tell the FE that we're sending tuples back, not TableBands + writeCodeAndError(0, "NOERROR"); + auto tjlp = dynamic_cast(jl.get()); + assert(tjlp); + messageqcpp::ByteStream tbs; + tbs << tjlp->getOutputRowGroup(); + fIos.write(tbs); + } + else + { + const std::string emsg = jl->errMsg(); + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(jl->status(), emsg); + std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; + continue; + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing makeJoblist " + "response; " + << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing makeJoblist " + "response; "; + throw std::runtime_error(errMsg.str()); } + if (!usingTuples) + { + if (gDebug) + std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; + } + else + { + if (gDebug) + std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; + } + } + else + { + usingTuples = false; + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + } + + jl->doQuery(); + + execplan::CalpontSystemCatalog::OID tableOID; + bool swallowRows = false; + joblist::DeliveredTableMap tm; + uint64_t totalBytesSent = 0; + uint64_t totalRowCount = 0; + + // Project each table as the FE asks for it + for (;;) + { bs = fIos.read(); if (bs.length() == 0) { if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - // connection closed by client - fIos.close(); break; } - else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan - { - if (gDebug) - std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " - << bs.length() << std::endl; - fIos.close(); - break; - } - else if (bs.length() == 4) // possible tuple flag + if (gDebug && bs.length() > 4) + std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() + << std::endl; + + // TODO: Holy crud! Can this be right? + //@bug 1444 Yes, if there is a self-join + if (bs.length() > 4) + { + selfJoin = true; + statementsRunningCount->decr(stmtCounted); + goto new_plan; + } + + assert(bs.length() == 4); + + messageqcpp::ByteStream::quadbyte qb; + + try // @bug2244: try/catch around fIos.write() calls responding to qb command { - messageqcpp::ByteStream::quadbyte qb; bs >> qb; - if (qb == 4) // UM wants new tuple i/f - { - if (gDebug) - std::cout << "### UM wants tuples" << std::endl; + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl; - tryTuples = true; - // now wait for the CSEP... - bs = fIos.read(); - } - else if (qb == 5) // somebody wants stats + if (qb == 0) { - bs.restart(); - qb = statementsRunningCount->cur(); - bs << qb; - qb = statementsRunningCount->waiting(); - bs << qb; - fIos.write(bs); - fIos.close(); + // No more tables, query is done break; } - else if (qb == ANALYZE_TABLE_EXECUTE) + else if (qb == 1) { - analyzeTableExecute(bs, jl, stmtCounted); + // super-secret flag indicating that the UM is going to scarf down all the rows in the + // query. + swallowRows = true; + tm = jl->deliveredTables(); continue; } - else if (qb == ANALYZE_TABLE_REC_STATS) + else if (qb == 2) { - analyzeTableHandleStats(bs); - continue; - } - else - { - if (gDebug) - std::cout << "### Got a not-a-plan value " << qb << std::endl; + // UM just wants any table + assert(swallowRows); + auto iter = tm.begin(); - fIos.close(); - break; - } - } - - new_plan: - try - { - csep.unserialize(bs); - } - catch (logging::IDBExcept& ex) - { - // We can get here on illegal function parameter data type, e.g. - // SELECT blob_column|1 FROM t1; - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(ex.errorCode(), std::string(ex.what())); - continue; - } - - querytele::QueryTeleStats qts; - - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { - qts.query_uuid = csep.uuid(); - qts.msg_type = querytele::QueryTeleStats::QT_START; - qts.start_time = querytele::QueryTeleClient::timeNowms(); - qts.query = csep.data(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - qts.schema_name = csep.schemaName(); - fTeleClient.postQueryTele(qts); - } - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; - - setRMParms(csep.rmParms()); - // Re-establish lost PP connections. - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - fEc->Setup(); - } - // @bug 1021. try to get schema cache for a come in query. - // skip system catalog queries. - if (!csep.isInternal()) - { - boost::shared_ptr csc = - execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); - buildSysCache(csep, csc); - } - - // As soon as we have a session id for this thread, update the - // thread count per session; only do this once per thread. - // Mask 0x80000000 is for associate user query and csc query - if (incSQLFrontSessionThreadCnt) - { - // WIP - incThreadCntPerSession(csep.sessionID() | 0x80000000); - incSQLFrontSessionThreadCnt = false; - } - - bool needDbProfEndStatementMsg = false; - logging::Message::Args args; - std::string sqlText = csep.data(); - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - - // Initialize stats for this query, including - // init sessionMemMap entry for this session to 0 memory %. - // We will need this later for traceOn() or if we receive a - // table request with qb=3 (see below). This is also recorded - // as query start time. - initStats(csep.sessionID(), sqlText); - fStats.fQueryType = csep.queryType(); - - // Log start and end statement if tracing is enabled. Keep in - // mind the trace flag won't be set for system catalog queries. - if (csep.traceOn()) - { - args.reset(); - args.add((int)csep.statementID()); - args.add((int)csep.verID().currentScn); - args.add(sqlText); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); - needDbProfEndStatementMsg = true; - } - - // Don't log subsequent self joins after first. - if (selfJoin) - sqlText = ""; - - std::ostringstream oss; - oss << sqlText << "; |" << csep.schemaName() << "|"; - logging::SQLLogger sqlLog(oss.str(), li); - - statementsRunningCount->incr(stmtCounted); - - PrimitiveServerThreadPools primitiveServerThreadPools( - ServicePrimProc::instance()->getPrimitiveServerThreadPool()); - - if (tryTuples) - { - try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList - { - jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); - // assign query stats - jl->queryStats(fStats); - - if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + if (iter == tm.end()) { - usingTuples = true; - - // Tell the FE that we're sending tuples back, not TableBands - writeCodeAndError(0, "NOERROR"); - auto tjlp = dynamic_cast(jl.get()); - assert(tjlp); - messageqcpp::ByteStream tbs; - tbs << tjlp->getOutputRowGroup(); - fIos.write(tbs); - } - else - { - const std::string emsg = jl->errMsg(); - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(jl->status(), emsg); - std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; - continue; - } - } - catch (std::exception& ex) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing makeJoblist " - "response; " - << ex.what(); - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing makeJoblist " - "response; "; - throw std::runtime_error(errMsg.str()); - } - - if (!usingTuples) - { - if (gDebug) - std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; - } - else - { - if (gDebug) - std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; - } - } - else - { - usingTuples = false; - jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - } - - jl->doQuery(); - - execplan::CalpontSystemCatalog::OID tableOID; - bool swallowRows = false; - joblist::DeliveredTableMap tm; - uint64_t totalBytesSent = 0; - uint64_t totalRowCount = 0; - - // Project each table as the FE asks for it - for (;;) - { - bs = fIos.read(); - - if (bs.length() == 0) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - - break; - } - - if (gDebug && bs.length() > 4) - std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() - << std::endl; - - // TODO: Holy crud! Can this be right? - //@bug 1444 Yes, if there is a self-join - if (bs.length() > 4) - { - selfJoin = true; - statementsRunningCount->decr(stmtCounted); - goto new_plan; - } - - assert(bs.length() == 4); - - messageqcpp::ByteStream::quadbyte qb; - - try // @bug2244: try/catch around fIos.write() calls responding to qb command - { - bs >> qb; - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb - << std::endl; - - if (qb == 0) - { - // No more tables, query is done - break; - } - else if (qb == 1) - { - // super-secret flag indicating that the UM is going to scarf down all the rows in the - // query. - swallowRows = true; - tm = jl->deliveredTables(); - continue; - } - else if (qb == 2) - { - // UM just wants any table - assert(swallowRows); - auto iter = tm.begin(); - - if (iter == tm.end()) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", returning end flag" - << std::endl; - - bs.restart(); - bs << (messageqcpp::ByteStream::byte)1; - fIos.write(bs); - continue; - } - - tableOID = iter->first; - } - else if (qb == 3) // special option-UM wants job stats std::string - { - std::string statsString; - - // Log stats std::string to be sent back to front end - statsString = formatQueryStats( - jl, "Query Stats", false, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl; bs.restart(); - bs << statsString; - - if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) - { - bs << jl->extendedInfo(); - bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); - } - else - { - std::string empty; - bs << empty; - bs << empty; - } - - // send stats to connector for inserting to the querystats table - fStats.serialize(bs); + bs << (messageqcpp::ByteStream::byte)1; fIos.write(bs); continue; } - // for table mode handling - else if (qb == 4) + + tableOID = iter->first; + } + else if (qb == 3) // special option-UM wants job stats std::string + { + std::string statsString; + + // Log stats std::string to be sent back to front end + statsString = formatQueryStats( + jl, "Query Stats", false, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); + + bs.restart(); + bs << statsString; + + if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) { - statementsRunningCount->decr(stmtCounted); - bs = fIos.read(); - goto new_plan; + bs << jl->extendedInfo(); + bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); } - else // (qb > 3) + else { - // Return table bands for the requested tableOID - tableOID = static_cast(qb); + std::string empty; + bs << empty; + bs << empty; + } + + // send stats to connector for inserting to the querystats table + fStats.serialize(bs); + fIos.write(bs); + continue; + } + // for table mode handling + else if (qb == 4) + { + statementsRunningCount->decr(stmtCounted); + bs = fIos.read(); + goto new_plan; + } + else // (qb > 3) + { + // Return table bands for the requested tableOID + tableOID = static_cast(qb); + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing qb response " + "for qb cmd " + << qb << "; " << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing qb response " + "for qb cmd " + << qb; + throw std::runtime_error(errMsg.str()); + } + + if (swallowRows) + tm.erase(tableOID); + + FEMsgHandler msgHandler(jl, &fIos); + + if (tableOID == 100) + msgHandler.start(); + + //...Loop serializing table bands projected for the tableOID + for (;;) + { + uint32_t rowCount; + + rowCount = jl->projectTable(tableOID, bs); + + msgHandler.stop(); + + if (jl->status()) + { + const auto errInfo = logging::IDBErrorInfo::instance(); + + if (jl->errMsg().length() != 0) + bs << jl->errMsg(); + else + bs << errInfo->errorMsg(jl->status()); + } + + try // @bug2244: try/catch around fIos.write() calls projecting rows + { + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. + if (tableOID < 3000 || rowCount == 0) + fIos.write(bs); + } + else + { + fIos.write(bs); } } catch (std::exception& ex) { + msgHandler.stop(); std::ostringstream errMsg; - errMsg << "ExeMgr: error writing qb response " - "for qb cmd " - << qb << "; " << ex.what(); + errMsg << "ExeMgr: error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + if (tableOID == 100 && msgHandler.aborted()) + { + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ + + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; + } + + // std::cout << "connection drop\n"; throw std::runtime_error(errMsg.str()); } catch (...) { std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing qb response " - "for qb cmd " - << qb; + msgHandler.stop(); + errMsg << "ExeMgr: unknown error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + throw std::runtime_error(errMsg.str()); } - if (swallowRows) - tm.erase(tableOID); + totalRowCount += rowCount; + totalBytesSent += bs.length(); - FEMsgHandler msgHandler(jl, &fIos); - - if (tableOID == 100) - msgHandler.start(); - - //...Loop serializing table bands projected for the tableOID - for (;;) + if (rowCount == 0) { - uint32_t rowCount; - - rowCount = jl->projectTable(tableOID, bs); - msgHandler.stop(); + // No more bands, table is done + bs.reset(); - if (jl->status()) - { - const auto errInfo = logging::IDBErrorInfo::instance(); + // @bug 2083 decr active statement count here for table mode. + if (!usingTuples) + statementsRunningCount->decr(stmtCounted); - if (jl->errMsg().length() != 0) - bs << jl->errMsg(); - else - bs << errInfo->errorMsg(jl->status()); - } - - try // @bug2244: try/catch around fIos.write() calls projecting rows - { - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) - { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) - fIos.write(bs); - } - else - { - fIos.write(bs); - } - } - catch (std::exception& ex) - { - msgHandler.stop(); - std::ostringstream errMsg; - errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - if (tableOID == 100 && msgHandler.aborted()) - { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ - - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; - } - - // std::cout << "connection drop\n"; - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - msgHandler.stop(); - errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - throw std::runtime_error(errMsg.str()); - } - - totalRowCount += rowCount; - totalBytesSent += bs.length(); - - if (rowCount == 0) - { - msgHandler.stop(); - // No more bands, table is done - bs.reset(); - - // @bug 2083 decr active statement count here for table mode. - if (!usingTuples) - statementsRunningCount->decr(stmtCounted); - - break; - } - else - { - bs.restart(); - } - } // End of loop to project and serialize table bands for a table - } // End of loop to process tables - - // @bug 828 - if (csep.traceOn()) - jl->graph(csep.sessionID()); - - if (needDbProfEndStatementMsg) - { - std::string ss; - std::ostringstream prefix; - prefix << "ses:" << csep.sessionID() << " Query Totals"; - - // Log stats std::string to standard out - ss = formatQueryStats(jl, prefix.str(), true, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), - totalRowCount); - //@Bug 1306. Added timing info for real time tracking. - std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; - - // log query stats to debug log file - args.reset(); - args.add((int)csep.statementID()); - args.add(fStats.fMaxMemPct); - args.add(fStats.fNumFiles); - args.add(fStats.fFileBytes); // log raw byte count instead of MB - args.add(fStats.fPhyIO); - args.add(fStats.fCacheIO); - args.add(fStats.fMsgRcvCnt); - args.add(fStats.fMsgBytesIn); - args.add(fStats.fMsgBytesOut); - args.add(fStats.fCPBlocksSkipped); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); - //@bug 1327 - deleteMaxMemPct(csep.sessionID()); - // Calling reset here, will cause joblist destructor to be - // called, which "joins" the threads. We need to do that - // here to make sure all syslogging from all the threads - // are complete; and that our logDbProfEndStatement will - // appear "last" in the syslog for this SQL statement. - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - int stmtID = csep.statementID(); - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - logging::Message::Args args; - args.add(stmtID); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this happens first - bgdtor.detach(); - } - else - // delete sessionMemMap entry for this session's memory % use - deleteMaxMemPct(csep.sessionID()); - - std::string endtime(globServiceExeMgr->timeNow()); - - if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) - { - std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " - << endtime << std::endl; - - // @bug 663 - Implemented caltraceon(16) to replace the - // $FIFO_SINK compiler definition in pColStep. - // This option consumes rows in the project steps. - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) - { - std::cout << std::endl; - std::cout << "**** No data returned to DM. Rows consumed " - "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." - " ****" - << std::endl; - std::cout << std::endl; + break; } - else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + else { - std::cout << std::endl; - std::cout << "**** No data returned to DM - caltrace(8) is " - "on (SWALLOW_ROWS_EXEMGR). ****" - << std::endl; - std::cout << std::endl; + bs.restart(); } - } + } // End of loop to project and serialize table bands for a table + } // End of loop to process tables - statementsRunningCount->decr(stmtCounted); + // @bug 828 + if (csep.traceOn()) + jl->graph(csep.sessionID()); - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + if (needDbProfEndStatementMsg) + { + std::string ss; + std::ostringstream prefix; + prefix << "ses:" << csep.sessionID() << " Query Totals"; + + // Log stats std::string to standard out + ss = formatQueryStats(jl, prefix.str(), true, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), + totalRowCount); + //@Bug 1306. Added timing info for real time tracking. + std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; + + // log query stats to debug log file + args.reset(); + args.add((int)csep.statementID()); + args.add(fStats.fMaxMemPct); + args.add(fStats.fNumFiles); + args.add(fStats.fFileBytes); // log raw byte count instead of MB + args.add(fStats.fPhyIO); + args.add(fStats.fCacheIO); + args.add(fStats.fMsgRcvCnt); + args.add(fStats.fMsgBytesIn); + args.add(fStats.fMsgBytesOut); + args.add(fStats.fCPBlocksSkipped); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); + //@bug 1327 + deleteMaxMemPct(csep.sessionID()); + // Calling reset here, will cause joblist destructor to be + // called, which "joins" the threads. We need to do that + // here to make sure all syslogging from all the threads + // are complete; and that our logDbProfEndStatement will + // appear "last" in the syslog for this SQL statement. + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + int stmtID = csep.statementID(); + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + logging::Message::Args args; + args.add(stmtID); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this happens first + bgdtor.detach(); + } + else + // delete sessionMemMap entry for this session's memory % use + deleteMaxMemPct(csep.sessionID()); + + std::string endtime(globServiceExeMgr->timeNow()); + + if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) + { + std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " + << endtime << std::endl; + + // @bug 663 - Implemented caltraceon(16) to replace the + // $FIFO_SINK compiler definition in pColStep. + // This option consumes rows in the project steps. + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) { - qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; - qts.max_mem_pct = fStats.fMaxMemPct; - qts.num_files = fStats.fNumFiles; - qts.phy_io = fStats.fPhyIO; - qts.cache_io = fStats.fCacheIO; - qts.msg_rcv_cnt = fStats.fMsgRcvCnt; - qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; - qts.msg_bytes_in = fStats.fMsgBytesIn; - qts.msg_bytes_out = fStats.fMsgBytesOut; - qts.rows = totalRowCount; - qts.end_time = querytele::QueryTeleClient::timeNowms(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.query = csep.data(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - fTeleClient.postQueryTele(qts); + std::cout << std::endl; + std::cout << "**** No data returned to DM. Rows consumed " + "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." + " ****" + << std::endl; + std::cout << std::endl; + } + else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + std::cout << std::endl; + std::cout << "**** No data returned to DM - caltrace(8) is " + "on (SWALLOW_ROWS_EXEMGR). ****" + << std::endl; + std::cout << std::endl; } } - // Release CSC object (for sessionID) that was added by makeJobList() - // Mask 0x80000000 is for associate user query and csc query. - // (actual joblist destruction happens at the top of this loop) - decThreadCntPerSession(csep.sessionID() | 0x80000000); - } - catch (std::exception& ex) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); statementsRunningCount->decr(stmtCounted); - std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add(ex.what()); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); - } - catch (...) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - std::cerr << "### Exception caught!" << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add("ExeMgr caught unknown exception"); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; + qts.max_mem_pct = fStats.fMaxMemPct; + qts.num_files = fStats.fNumFiles; + qts.phy_io = fStats.fPhyIO; + qts.cache_io = fStats.fCacheIO; + qts.msg_rcv_cnt = fStats.fMsgRcvCnt; + qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; + qts.msg_bytes_in = fStats.fMsgBytesIn; + qts.msg_bytes_out = fStats.fMsgBytesOut; + qts.rows = totalRowCount; + qts.end_time = querytele::QueryTeleClient::timeNowms(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.query = csep.data(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + fTeleClient.postQueryTele(qts); + } } - // make sure we don't leave scope while joblists are being destroyed - std::unique_lock scoped(jlMutex); - while (destructing > 0) - jlCleanupDone.wait(scoped); + // Release CSC object (for sessionID) that was added by makeJobList() + // Mask 0x80000000 is for associate user query and csc query. + // (actual joblist destruction happens at the top of this loop) + decThreadCntPerSession(csep.sessionID() | 0x80000000); } -}; // namespace exemgr + catch (std::exception& ex) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add(ex.what()); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + catch (...) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### Exception caught!" << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add("ExeMgr caught unknown exception"); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + + // make sure we don't leave scope while joblists are being destroyed + std::unique_lock scoped(jlMutex); + while (destructing > 0) + jlCleanupDone.wait(scoped); +} +}; // namespace exemgr