diff --git a/VERSION b/VERSION index 1bb40e0e7..4ac98053e 100644 --- a/VERSION +++ b/VERSION @@ -1,4 +1,4 @@ COLUMNSTORE_VERSION_MAJOR=22 COLUMNSTORE_VERSION_MINOR=08 -COLUMNSTORE_VERSION_PATCH=4 +COLUMNSTORE_VERSION_PATCH=7 COLUMNSTORE_VERSION_RELEASE=1 diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 0dd954ad8..207086e4c 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -3471,6 +3471,10 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp break; default: + if (ref->ref_type() == Item_ref::DIRECT_REF) + { + return buildReturnedColumn(ref->real_item(), gwi, nonSupport); + } gwi.fatalParseError = true; gwi.parseErrorText = "Unknown REF item"; break; @@ -4053,15 +4057,14 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non ReturnedColumn* rc = NULL; - // Special treatment for json functions // All boolean arguments will be parsed as boolean string true(false) // E.g. the result of `SELECT JSON_ARRAY(true, false)` should be [true, false] instead of [1, 0] - bool mayHasBoolArg = ((funcName == "json_insert" || funcName == "json_replace" || - funcName == "json_set" || funcName == "json_array_append" || - funcName == "json_array_insert") && i != 0 && i % 2 == 0) || - (funcName == "json_array") || - (funcName == "json_object" && i % 2 == 1); + bool mayHasBoolArg = + ((funcName == "json_insert" || funcName == "json_replace" || funcName == "json_set" || + funcName == "json_array_append" || funcName == "json_array_insert") && + i != 0 && i % 2 == 0) || + (funcName == "json_array") || (funcName == "json_object" && i % 2 == 1); bool isBoolType = (ifp->arguments()[i]->const_item() && ifp->arguments()[i]->type_handler()->is_bool_type()); @@ -6408,10 +6411,15 @@ void parse_item(Item* item, vector& field_vec, bool& hasNonSupportI case Item::REF_ITEM: { + Item_ref* ref = (Item_ref*)item; + if (ref->ref_type() == Item_ref::DIRECT_REF) + { + parse_item(ref->real_item(), field_vec, hasNonSupportItem, parseInfo, gwi); + break; + } while (true) { - Item_ref* ref = (Item_ref*)item; - + ref = (Item_ref*)item; if ((*(ref->ref))->type() == Item::SUM_FUNC_ITEM) { parseInfo |= AGG_BIT; diff --git a/primitives/blockcache/stats.cpp b/primitives/blockcache/stats.cpp index 50372758e..ad5c3a6ed 100644 --- a/primitives/blockcache/stats.cpp +++ b/primitives/blockcache/stats.cpp @@ -29,7 +29,7 @@ typedef int pthread_t; #endif #include #include -//#define NDEBUG +// #define NDEBUG #include #include #include @@ -178,7 +178,7 @@ typedef map TraceFileMap_t; TraceFileMap_t traceFileMap; // map mutex -boost::mutex traceFileMapMutex; +std::mutex traceFileMapMutex; class StatMon { @@ -197,7 +197,7 @@ class StatMon void operator()() const { // struct timespec ts = { 60 * 1, 0 }; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::unique_lock lk(traceFileMapMutex); TraceFileMap_t::iterator iter; TraceFileMap_t::iterator end; @@ -256,7 +256,7 @@ void Stats::touchedLBID(uint64_t lbid, pthread_t thdid, uint32_t session) if (session == 0) return; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::lock_guard lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) @@ -274,7 +274,7 @@ void Stats::markEvent(const uint64_t lbid, const pthread_t thdid, const uint32_t if (session == 0) return; - boost::mutex::scoped_lock lk(traceFileMapMutex); + std::lock_guard lk(traceFileMapMutex); TraceFileMap_t::iterator iter = traceFileMap.find(session); if (iter == traceFileMap.end()) diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index d43cd83e6..48bb9577a 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -21,972 +21,974 @@ namespace exemgr { - uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->getMaxMemPct(sessionId); - } - void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->deleteMaxMemPct(sessionId); - } - void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->incThreadCntPerSession(sessionId); - } - void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) - { - return globServiceExeMgr->decThreadCntPerSession(sessionId); - } +uint64_t SQLFrontSessionThread::getMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->getMaxMemPct(sessionId); +} +void SQLFrontSessionThread::deleteMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->deleteMaxMemPct(sessionId); +} +void SQLFrontSessionThread::incThreadCntPerSession(uint32_t sessionId) +{ + return globServiceExeMgr->incThreadCntPerSession(sessionId); +} +void SQLFrontSessionThread::decThreadCntPerSession(uint32_t sessionId) +{ + return globServiceExeMgr->decThreadCntPerSession(sessionId); +} - void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) - { - return globServiceExeMgr->initMaxMemPct(sessionId); - } - //...Get and log query stats to specified output stream - const std::string SQLFrontSessionThread::formatQueryStats( - joblist::SJLP& jl, // joblist associated with query - const std::string& label, // header label to print in front of log output - bool includeNewLine, // include line breaks in query stats std::string - bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) - { - std::ostringstream os; +void SQLFrontSessionThread::initMaxMemPct(uint32_t sessionId) +{ + return globServiceExeMgr->initMaxMemPct(sessionId); +} +//...Get and log query stats to specified output stream +const std::string SQLFrontSessionThread::formatQueryStats( + joblist::SJLP& jl, // joblist associated with query + const std::string& label, // header label to print in front of log output + bool includeNewLine, // include line breaks in query stats std::string + bool vtableModeOn, bool wantExtendedStats, uint64_t rowsReturned) +{ + std::ostringstream os; - // Get stats if not already acquired for current query - if (!fStatsRetrieved) + // Get stats if not already acquired for current query + if (!fStatsRetrieved) + { + if (wantExtendedStats) { - if (wantExtendedStats) + // wait for the ei data to be written by another thread (brain-dead) + struct timespec req = {0, 250000}; // 250 usec + nanosleep(&req, 0); + } + + // Get % memory usage during current query for sessionId + jl->querySummary(wantExtendedStats); + fStats = jl->queryStats(); + fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); + fStats.fRows = rowsReturned; + fStatsRetrieved = true; + } + + std::string queryMode; + queryMode = (vtableModeOn ? "Distributed" : "Standard"); + + // Log stats to specified output stream + os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles + << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO + << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; + + if (includeNewLine) + os << std::endl << " "; // insert line break + else + os << "; "; // continue without line break + + os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" + << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" + << queryMode; + + return os.str(); +} + +//... Round off to human readable format (KB, MB, or GB). +const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const +{ + const char* units[] = {"B", "KB", "MB", "GB", "TB"}; + uint64_t i = 0, up = 0; + uint64_t roundedValue = value; + + while (roundedValue > 1024 && i < 4) + { + up = (roundedValue & 512); + roundedValue /= 1024; + i++; + } + + if (up) + roundedValue++; + + std::ostringstream oss; + oss << roundedValue << units[i]; + return oss.str(); +} + +//...Round off to nearest (1024*1024) MB +uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const +{ + uint64_t roundedValue = value >> 20; + + if (value & 524288) + roundedValue++; + + return roundedValue; +} + +void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) +{ + for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); it != parms.end(); + ++it) + { + switch (it->id) + { + case execplan::PMSMALLSIDEMEMORY: { - // wait for the ei data to be written by another thread (brain-dead) - struct timespec req = {0, 250000}; // 250 usec - nanosleep(&req, 0); + globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); + break; } - // Get % memory usage during current query for sessionId - jl->querySummary(wantExtendedStats); - fStats = jl->queryStats(); - fStats.fMaxMemPct = getMaxMemPct(fStats.fSessionID); - fStats.fRows = rowsReturned; - fStatsRetrieved = true; - } - - std::string queryMode; - queryMode = (vtableModeOn ? "Distributed" : "Standard"); - - // Log stats to specified output stream - os << label << ": MaxMemPct-" << fStats.fMaxMemPct << "; NumTempFiles-" << fStats.fNumFiles - << "; TempFileSpace-" << roundBytes(fStats.fFileBytes) << "; ApproxPhyI/O-" << fStats.fPhyIO - << "; CacheI/O-" << fStats.fCacheIO << "; BlocksTouched-" << fStats.fMsgRcvCnt; - - if (includeNewLine) - os << std::endl << " "; // insert line break - else - os << "; "; // continue without line break - - os << "PartitionBlocksEliminated-" << fStats.fCPBlocksSkipped << "; MsgBytesIn-" - << roundBytes(fStats.fMsgBytesIn) << "; MsgBytesOut-" << roundBytes(fStats.fMsgBytesOut) << "; Mode-" - << queryMode; - - return os.str(); - } - - //... Round off to human readable format (KB, MB, or GB). - const std::string SQLFrontSessionThread::roundBytes(uint64_t value) const - { - const char* units[] = {"B", "KB", "MB", "GB", "TB"}; - uint64_t i = 0, up = 0; - uint64_t roundedValue = value; - - while (roundedValue > 1024 && i < 4) - { - up = (roundedValue & 512); - roundedValue /= 1024; - i++; - } - - if (up) - roundedValue++; - - std::ostringstream oss; - oss << roundedValue << units[i]; - return oss.str(); - } - - //...Round off to nearest (1024*1024) MB - uint64_t SQLFrontSessionThread::roundMB(uint64_t value) const - { - uint64_t roundedValue = value >> 20; - - if (value & 524288) - roundedValue++; - - return roundedValue; - } - - void SQLFrontSessionThread::setRMParms(const execplan::CalpontSelectExecutionPlan::RMParmVec& parms) - { - for (execplan::CalpontSelectExecutionPlan::RMParmVec::const_iterator it = parms.begin(); - it != parms.end(); ++it) - { - switch (it->id) + case execplan::UMSMALLSIDEMEMORY: { - case execplan::PMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJPmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - case execplan::UMSMALLSIDEMEMORY: - { - globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); - break; - } - - default:; + globServiceExeMgr->getRm().addHJUmMaxSmallSideMap(it->sessionId, it->value); + break; } + + default:; + } + } +} + +void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, + boost::shared_ptr csc) +{ + const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); + std::string schemaName; + + for (auto it = colMap.begin(); it != colMap.end(); ++it) + { + const auto sc = dynamic_cast((it->second).get()); + + if (sc) + { + schemaName = sc->schemaName(); + + // only the first time a schema is got will actually query + // system catalog. System catalog keeps a schema name std::map. + // if a schema exists, the call getSchemaInfo returns without + // doing anything. + if (!schemaName.empty()) + csc->getSchemaInfo(schemaName); } } - void SQLFrontSessionThread::buildSysCache(const execplan::CalpontSelectExecutionPlan& csep, - boost::shared_ptr csc) + execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; + + for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) { - const execplan::CalpontSelectExecutionPlan::ColumnMap& colMap = csep.columnMap(); - std::string schemaName; - - for (auto it = colMap.begin(); it != colMap.end(); ++it) - { - const auto sc = dynamic_cast((it->second).get()); - - if (sc) - { - schemaName = sc->schemaName(); - - // only the first time a schema is got will actually query - // system catalog. System catalog keeps a schema name std::map. - // if a schema exists, the call getSchemaInfo returns without - // doing anything. - if (!schemaName.empty()) - csc->getSchemaInfo(schemaName); - } - } - - execplan::CalpontSelectExecutionPlan::SelectList::const_iterator subIt; - - for (subIt = csep.derivedTableList().begin(); subIt != csep.derivedTableList().end(); ++subIt) - { - buildSysCache(*(dynamic_cast(subIt->get())), csc); - } + buildSysCache(*(dynamic_cast(subIt->get())), csc); } +} - void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) +void SQLFrontSessionThread::writeCodeAndError(messageqcpp::ByteStream::quadbyte code, const std::string emsg) +{ + messageqcpp::ByteStream emsgBs; + messageqcpp::ByteStream tbs; + tbs << code; + fIos.write(tbs); + emsgBs << emsg; + fIos.write(emsgBs); +} + +void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, + bool& stmtCounted) +{ + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + messageqcpp::ByteStream::quadbyte qb; + execplan::MCSAnalyzeTableExecutionPlan caep; + + bs = fIos.read(); + caep.unserialize(bs); + + statementsRunningCount->incr(stmtCounted); + PrimitiveServerThreadPools primitiveServerThreadPools; + jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true); + + // Joblist is empty. + if (jl->status() == logging::statisticsJobListEmpty) { - messageqcpp::ByteStream emsgBs; - messageqcpp::ByteStream tbs; - tbs << code; - fIos.write(tbs); - emsgBs << emsg; - fIos.write(emsgBs); - } + if (caep.traceOn()) + std::cout << "JobList is empty " << std::endl; - void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, - bool& stmtCounted) - { - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - messageqcpp::ByteStream::quadbyte qb; - execplan::MCSAnalyzeTableExecutionPlan caep; - - bs = fIos.read(); - caep.unserialize(bs); - - statementsRunningCount->incr(stmtCounted); - PrimitiveServerThreadPools primitiveServerThreadPools; - jl = joblist::JobListFactory::makeJobList(&caep, fRm, primitiveServerThreadPools, false, true); - - // Joblist is empty. - if (jl->status() == logging::statisticsJobListEmpty) - { - if (caep.traceOn()) - std::cout << "JobList is empty " << std::endl; - - jl.reset(); - bs.restart(); - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - statementsRunningCount->decr(stmtCounted); - return; - } - - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - std::cout << "fEc setup " << std::endl; - fEc->Setup(); - } - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - - // Execute a joblist. - jl->doQuery(); - - FEMsgHandler msgHandler(jl, &fIos); - msgHandler.start(); - - // Seemls like this a legacy parameter, not really needed. - const uint32_t dummyTableOid = 100; - auto* statisticsManager = statistics::StatisticsManager::instance(); - // Process rowGroup by rowGroup. - auto rowCount = jl->projectTable(dummyTableOid, bs); - while (rowCount) - { - auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); - statisticsManager->collectSample(outRG); - rowCount = jl->projectTable(dummyTableOid, bs); - } - msgHandler.stop(); - - // Analyze collected samples. - statisticsManager->analyzeSample(caep.traceOn()); - statisticsManager->incEpoch(); - statisticsManager->saveToFile(); - - // Distribute statistics across all ExeMgr clients if possible. - statistics::StatisticsDistributor::instance()->distributeStatistics(); - - // Send the signal back to front-end. + jl.reset(); bs.restart(); qb = ANALYZE_TABLE_SUCCESS; bs << qb; fIos.write(bs); bs.reset(); statementsRunningCount->decr(stmtCounted); + return; } - void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) { - messageqcpp::ByteStream::quadbyte qb; -#ifdef DEBUG_STATISTICS - std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - bs = fIos.read(); -#ifdef DEBUG_STATISTICS - std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; -#endif - uint64_t dataHashRec; - bs >> dataHashRec; + std::cout << "fEc setup " << std::endl; + fEc->Setup(); + } - uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); - // The stats are the same. - if (dataHash == dataHashRec) - { + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + + // Execute a joblist. + jl->doQuery(); + + FEMsgHandler msgHandler(jl, &fIos); + msgHandler.start(); + + // Seemls like this a legacy parameter, not really needed. + const uint32_t dummyTableOid = 100; + auto* statisticsManager = statistics::StatisticsManager::instance(); + // Process rowGroup by rowGroup. + auto rowCount = jl->projectTable(dummyTableOid, bs); + while (rowCount) + { + auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); + statisticsManager->collectSample(outRG); + rowCount = jl->projectTable(dummyTableOid, bs); + } + msgHandler.stop(); + + // Analyze collected samples. + statisticsManager->analyzeSample(caep.traceOn()); + statisticsManager->incEpoch(); + statisticsManager->saveToFile(); + + // Distribute statistics across all ExeMgr clients if possible. + statistics::StatisticsDistributor::instance()->distributeStatistics(); + + // Send the signal back to front-end. + bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); +} + +void SQLFrontSessionThread::analyzeTableHandleStats(messageqcpp::ByteStream& bs) +{ + messageqcpp::ByteStream::quadbyte qb; #ifdef DEBUG_STATISTICS - std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; + std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif - qb = ANALYZE_TABLE_SUCCESS; - bs << qb; - fIos.write(bs); - bs.reset(); - return; - } - - bs.restart(); - qb = ANALYZE_TABLE_NEED_STATS; - bs << qb; - fIos.write(bs); - - bs.restart(); - bs = fIos.read(); + bs = fIos.read(); #ifdef DEBUG_STATISTICS - std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; + std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif - statistics::StatisticsManager::instance()->unserialize(bs); - statistics::StatisticsManager::instance()->saveToFile(); + uint64_t dataHashRec; + bs >> dataHashRec; + uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); + // The stats are the same. + if (dataHash == dataHashRec) + { #ifdef DEBUG_STATISTICS - std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; + std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " << std::endl; #endif qb = ANALYZE_TABLE_SUCCESS; bs << qb; fIos.write(bs); bs.reset(); + return; } - void SQLFrontSessionThread::operator()() + bs.restart(); + qb = ANALYZE_TABLE_NEED_STATS; + bs << qb; + fIos.write(bs); + + bs.restart(); + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + statistics::StatisticsManager::instance()->unserialize(bs); + statistics::StatisticsManager::instance()->saveToFile(); + +#ifdef DEBUG_STATISTICS + std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); +} + +void SQLFrontSessionThread::operator()() +{ + messageqcpp::ByteStream bs, inbs; + execplan::CalpontSelectExecutionPlan csep; + csep.sessionID(0); + joblist::SJLP jl; + bool incSQLFrontSessionThreadCnt = true; + std::mutex jlMutex; + std::condition_variable jlCleanupDone; + int destructing = 0; + int gDebug = globServiceExeMgr->getDebugLevel(); + logging::Logger& msgLog = globServiceExeMgr->getLogger(); + + bool selfJoin = false; + bool tryTuples = false; + bool usingTuples = false; + bool stmtCounted = false; + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + + try { - messageqcpp::ByteStream bs, inbs; - execplan::CalpontSelectExecutionPlan csep; - csep.sessionID(0); - joblist::SJLP jl; - bool incSQLFrontSessionThreadCnt = true; - std::mutex jlMutex; - std::condition_variable jlCleanupDone; - int destructing = 0; - int gDebug = globServiceExeMgr->getDebugLevel(); - logging::Logger& msgLog = globServiceExeMgr->getLogger(); - - bool selfJoin = false; - bool tryTuples = false; - bool usingTuples = false; - bool stmtCounted = false; - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - - try + for (;;) { - for (;;) - { - selfJoin = false; - tryTuples = false; - usingTuples = false; + selfJoin = false; + tryTuples = false; + usingTuples = false; - if (jl) + if (jl) + { + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, &destructing] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this runs first + bgdtor.detach(); + } + + bs = fIos.read(); + + if (bs.length() == 0) + { + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + + // connection closed by client + fIos.close(); + break; + } + else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan + { + if (gDebug) + std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " + << bs.length() << std::endl; + + fIos.close(); + break; + } + else if (bs.length() == 4) // possible tuple flag + { + messageqcpp::ByteStream::quadbyte qb; + bs >> qb; + + if (qb == 4) // UM wants new tuple i/f { - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this runs first - bgdtor.detach(); + if (gDebug) + std::cout << "### UM wants tuples" << std::endl; + + tryTuples = true; + // now wait for the CSEP... + bs = fIos.read(); + } + else if (qb == 5) // somebody wants stats + { + bs.restart(); + qb = statementsRunningCount->cur(); + bs << qb; + qb = statementsRunningCount->waiting(); + bs << qb; + fIos.write(bs); + fIos.close(); + break; + } + else if (qb == ANALYZE_TABLE_EXECUTE) + { + analyzeTableExecute(bs, jl, stmtCounted); + continue; + } + else if (qb == ANALYZE_TABLE_REC_STATS) + { + analyzeTableHandleStats(bs); + continue; + } + else + { + if (gDebug) + std::cout << "### Got a not-a-plan value " << qb << std::endl; + + fIos.close(); + break; + } + } + + new_plan: + try + { + csep.unserialize(bs); + } + catch (logging::IDBExcept& ex) + { + // We can get here on illegal function parameter data type, e.g. + // SELECT blob_column|1 FROM t1; + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(ex.errorCode(), std::string(ex.what())); + continue; + } + + querytele::QueryTeleStats qts; + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.query_uuid = csep.uuid(); + qts.msg_type = querytele::QueryTeleStats::QT_START; + qts.start_time = querytele::QueryTeleClient::timeNowms(); + qts.query = csep.data(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + qts.schema_name = csep.schemaName(); + fTeleClient.postQueryTele(qts); + } + + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; + + setRMParms(csep.rmParms()); + // Re-establish lost PP connections. + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + fEc->Setup(); + } + // @bug 1021. try to get schema cache for a come in query. + // skip system catalog queries. + if (!csep.isInternal()) + { + boost::shared_ptr csc = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); + buildSysCache(csep, csc); + } + + // As soon as we have a session id for this thread, update the + // thread count per session; only do this once per thread. + // Mask 0x80000000 is for associate user query and csc query + if (incSQLFrontSessionThreadCnt) + { + // WIP + incThreadCntPerSession(csep.sessionID() | 0x80000000); + incSQLFrontSessionThreadCnt = false; + } + + bool needDbProfEndStatementMsg = false; + logging::Message::Args args; + std::string sqlText = csep.data(); + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + + // Initialize stats for this query, including + // init sessionMemMap entry for this session to 0 memory %. + // We will need this later for traceOn() or if we receive a + // table request with qb=3 (see below). This is also recorded + // as query start time. + initStats(csep.sessionID(), sqlText); + fStats.fQueryType = csep.queryType(); + + // Log start and end statement if tracing is enabled. Keep in + // mind the trace flag won't be set for system catalog queries. + if (csep.traceOn()) + { + args.reset(); + args.add((int)csep.statementID()); + args.add((int)csep.verID().currentScn); + args.add(sqlText); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); + needDbProfEndStatementMsg = true; + } + + // Don't log subsequent self joins after first. + if (selfJoin) + sqlText = ""; + + std::ostringstream oss; + oss << sqlText << "; |" << csep.schemaName() << "|"; + logging::SQLLogger sqlLog(oss.str(), li); + + statementsRunningCount->incr(stmtCounted); + + PrimitiveServerThreadPools primitiveServerThreadPools( + ServicePrimProc::instance()->getPrimitiveServerThreadPool()); + + if (tryTuples) + { + try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList + { + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); + // assign query stats + jl->queryStats(fStats); + + if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + { + usingTuples = true; + + // Tell the FE that we're sending tuples back, not TableBands + writeCodeAndError(0, "NOERROR"); + auto tjlp = dynamic_cast(jl.get()); + assert(tjlp); + messageqcpp::ByteStream tbs; + tbs << tjlp->getOutputRowGroup(); + fIos.write(tbs); + } + else + { + const std::string emsg = jl->errMsg(); + statementsRunningCount->decr(stmtCounted); + writeCodeAndError(jl->status(), emsg); + std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; + continue; + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing makeJoblist " + "response; " + << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing makeJoblist " + "response; "; + throw std::runtime_error(errMsg.str()); } + if (!usingTuples) + { + if (gDebug) + std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; + } + else + { + if (gDebug) + std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; + } + } + else + { + usingTuples = false; + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + } + + jl->doQuery(); + + execplan::CalpontSystemCatalog::OID tableOID; + bool swallowRows = false; + joblist::DeliveredTableMap tm; + uint64_t totalBytesSent = 0; + uint64_t totalRowCount = 0; + + // Project each table as the FE asks for it + for (;;) + { bs = fIos.read(); if (bs.length() == 0) { if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - // connection closed by client - fIos.close(); break; } - else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan - { - if (gDebug) - std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " - << bs.length() << std::endl; - fIos.close(); - break; - } - else if (bs.length() == 4) // possible tuple flag + if (gDebug && bs.length() > 4) + std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() + << std::endl; + + // TODO: Holy crud! Can this be right? + //@bug 1444 Yes, if there is a self-join + if (bs.length() > 4) + { + selfJoin = true; + statementsRunningCount->decr(stmtCounted); + goto new_plan; + } + + assert(bs.length() == 4); + + messageqcpp::ByteStream::quadbyte qb; + + try // @bug2244: try/catch around fIos.write() calls responding to qb command { - messageqcpp::ByteStream::quadbyte qb; bs >> qb; - if (qb == 4) // UM wants new tuple i/f - { - if (gDebug) - std::cout << "### UM wants tuples" << std::endl; + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb << std::endl; - tryTuples = true; - // now wait for the CSEP... - bs = fIos.read(); - } - else if (qb == 5) // somebody wants stats + if (qb == 0) { - bs.restart(); - qb = statementsRunningCount->cur(); - bs << qb; - qb = statementsRunningCount->waiting(); - bs << qb; - fIos.write(bs); - fIos.close(); + // No more tables, query is done break; } - else if (qb == ANALYZE_TABLE_EXECUTE) + else if (qb == 1) { - analyzeTableExecute(bs, jl, stmtCounted); + // super-secret flag indicating that the UM is going to scarf down all the rows in the + // query. + swallowRows = true; + tm = jl->deliveredTables(); continue; } - else if (qb == ANALYZE_TABLE_REC_STATS) + else if (qb == 2) { - analyzeTableHandleStats(bs); - continue; - } - else - { - if (gDebug) - std::cout << "### Got a not-a-plan value " << qb << std::endl; + // UM just wants any table + assert(swallowRows); + auto iter = tm.begin(); - fIos.close(); - break; - } - } - - new_plan: - try - { - csep.unserialize(bs); - } - catch (logging::IDBExcept& ex) - { - // We can get here on illegal function parameter data type, e.g. - // SELECT blob_column|1 FROM t1; - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(ex.errorCode(), std::string(ex.what())); - continue; - } - - querytele::QueryTeleStats qts; - - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { - qts.query_uuid = csep.uuid(); - qts.msg_type = querytele::QueryTeleStats::QT_START; - qts.start_time = querytele::QueryTeleClient::timeNowms(); - qts.query = csep.data(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - qts.schema_name = csep.schemaName(); - fTeleClient.postQueryTele(qts); - } - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; - - setRMParms(csep.rmParms()); - // Re-establish lost PP connections. - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { - fEc->Setup(); - } - // @bug 1021. try to get schema cache for a come in query. - // skip system catalog queries. - if (!csep.isInternal()) - { - boost::shared_ptr csc = - execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); - buildSysCache(csep, csc); - } - - // As soon as we have a session id for this thread, update the - // thread count per session; only do this once per thread. - // Mask 0x80000000 is for associate user query and csc query - if (incSQLFrontSessionThreadCnt) - { - // WIP - incThreadCntPerSession(csep.sessionID() | 0x80000000); - incSQLFrontSessionThreadCnt = false; - } - - bool needDbProfEndStatementMsg = false; - logging::Message::Args args; - std::string sqlText = csep.data(); - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - - // Initialize stats for this query, including - // init sessionMemMap entry for this session to 0 memory %. - // We will need this later for traceOn() or if we receive a - // table request with qb=3 (see below). This is also recorded - // as query start time. - initStats(csep.sessionID(), sqlText); - fStats.fQueryType = csep.queryType(); - - // Log start and end statement if tracing is enabled. Keep in - // mind the trace flag won't be set for system catalog queries. - if (csep.traceOn()) - { - args.reset(); - args.add((int)csep.statementID()); - args.add((int)csep.verID().currentScn); - args.add(sqlText); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); - needDbProfEndStatementMsg = true; - } - - // Don't log subsequent self joins after first. - if (selfJoin) - sqlText = ""; - - std::ostringstream oss; - oss << sqlText << "; |" << csep.schemaName() << "|"; - logging::SQLLogger sqlLog(oss.str(), li); - - statementsRunningCount->incr(stmtCounted); - - PrimitiveServerThreadPools primitiveServerThreadPools( - ServicePrimProc::instance()->getPrimitiveServerThreadPool()); - - if (tryTuples) - { - try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList - { - jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); - // assign query stats - jl->queryStats(fStats); - - if ((jl->status()) == 0 && (jl->putEngineComm(fEc) == 0)) + if (iter == tm.end()) { - usingTuples = true; - - // Tell the FE that we're sending tuples back, not TableBands - writeCodeAndError(0, "NOERROR"); - auto tjlp = dynamic_cast(jl.get()); - assert(tjlp); - messageqcpp::ByteStream tbs; - tbs << tjlp->getOutputRowGroup(); - fIos.write(tbs); - } - else - { - const std::string emsg = jl->errMsg(); - statementsRunningCount->decr(stmtCounted); - writeCodeAndError(jl->status(), emsg); - std::cerr << "ExeMgr: could not build a tuple joblist: " << emsg << std::endl; - continue; - } - } - catch (std::exception& ex) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing makeJoblist " - "response; " - << ex.what(); - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing makeJoblist " - "response; "; - throw std::runtime_error(errMsg.str()); - } - - if (!usingTuples) - { - if (gDebug) - std::cout << "### UM wanted tuples but it didn't work out :-(" << std::endl; - } - else - { - if (gDebug) - std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; - } - } - else - { - usingTuples = false; - jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); - } - else - { - throw std::runtime_error("ExeMgr: could not build a JobList!"); - } - } - - jl->doQuery(); - - execplan::CalpontSystemCatalog::OID tableOID; - bool swallowRows = false; - joblist::DeliveredTableMap tm; - uint64_t totalBytesSent = 0; - uint64_t totalRowCount = 0; - - // Project each table as the FE asks for it - for (;;) - { - bs = fIos.read(); - - if (bs.length() == 0) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - - break; - } - - if (gDebug && bs.length() > 4) - std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() - << std::endl; - - // TODO: Holy crud! Can this be right? - //@bug 1444 Yes, if there is a self-join - if (bs.length() > 4) - { - selfJoin = true; - statementsRunningCount->decr(stmtCounted); - goto new_plan; - } - - assert(bs.length() == 4); - - messageqcpp::ByteStream::quadbyte qb; - - try // @bug2244: try/catch around fIos.write() calls responding to qb command - { - bs >> qb; - - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb - << std::endl; - - if (qb == 0) - { - // No more tables, query is done - break; - } - else if (qb == 1) - { - // super-secret flag indicating that the UM is going to scarf down all the rows in the - // query. - swallowRows = true; - tm = jl->deliveredTables(); - continue; - } - else if (qb == 2) - { - // UM just wants any table - assert(swallowRows); - auto iter = tm.begin(); - - if (iter == tm.end()) - { - if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### For session id " << csep.sessionID() << ", returning end flag" - << std::endl; - - bs.restart(); - bs << (messageqcpp::ByteStream::byte)1; - fIos.write(bs); - continue; - } - - tableOID = iter->first; - } - else if (qb == 3) // special option-UM wants job stats std::string - { - std::string statsString; - - // Log stats std::string to be sent back to front end - statsString = formatQueryStats( - jl, "Query Stats", false, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); + if (gDebug > 1 || (gDebug && !csep.isInternal())) + std::cout << "### For session id " << csep.sessionID() << ", returning end flag" << std::endl; bs.restart(); - bs << statsString; - - if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) - { - bs << jl->extendedInfo(); - bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); - } - else - { - std::string empty; - bs << empty; - bs << empty; - } - - // send stats to connector for inserting to the querystats table - fStats.serialize(bs); + bs << (messageqcpp::ByteStream::byte)1; fIos.write(bs); continue; } - // for table mode handling - else if (qb == 4) + + tableOID = iter->first; + } + else if (qb == 3) // special option-UM wants job stats std::string + { + std::string statsString; + + // Log stats std::string to be sent back to front end + statsString = formatQueryStats( + jl, "Query Stats", false, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), totalRowCount); + + bs.restart(); + bs << statsString; + + if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) { - statementsRunningCount->decr(stmtCounted); - bs = fIos.read(); - goto new_plan; + bs << jl->extendedInfo(); + bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); } - else // (qb > 3) + else { - // Return table bands for the requested tableOID - tableOID = static_cast(qb); + std::string empty; + bs << empty; + bs << empty; + } + + // send stats to connector for inserting to the querystats table + fStats.serialize(bs); + fIos.write(bs); + continue; + } + // for table mode handling + else if (qb == 4) + { + statementsRunningCount->decr(stmtCounted); + bs = fIos.read(); + goto new_plan; + } + else // (qb > 3) + { + // Return table bands for the requested tableOID + tableOID = static_cast(qb); + } + } + catch (std::exception& ex) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing qb response " + "for qb cmd " + << qb << "; " << ex.what(); + throw std::runtime_error(errMsg.str()); + } + catch (...) + { + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing qb response " + "for qb cmd " + << qb; + throw std::runtime_error(errMsg.str()); + } + + if (swallowRows) + tm.erase(tableOID); + + FEMsgHandler msgHandler(jl, &fIos); + + if (tableOID == 100) + msgHandler.start(); + + //...Loop serializing table bands projected for the tableOID + for (;;) + { + uint32_t rowCount; + + rowCount = jl->projectTable(tableOID, bs); + + msgHandler.stop(); + + if (jl->status()) + { + const auto errInfo = logging::IDBErrorInfo::instance(); + + if (jl->errMsg().length() != 0) + bs << jl->errMsg(); + else + bs << errInfo->errorMsg(jl->status()); + } + + try // @bug2244: try/catch around fIos.write() calls projecting rows + { + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. + if (tableOID < 3000 || rowCount == 0) + fIos.write(bs); + } + else + { + fIos.write(bs); } } catch (std::exception& ex) { + msgHandler.stop(); std::ostringstream errMsg; - errMsg << "ExeMgr: error writing qb response " - "for qb cmd " - << qb << "; " << ex.what(); + errMsg << "ExeMgr: error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + + if (tableOID == 100 && msgHandler.aborted()) + { + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ + + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; + } + + // std::cout << "connection drop\n"; throw std::runtime_error(errMsg.str()); } catch (...) { std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing qb response " - "for qb cmd " - << qb; + msgHandler.stop(); + errMsg << "ExeMgr: unknown error projecting rows " + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + jl->abort(); + + while (rowCount) + rowCount = jl->projectTable(tableOID, bs); + throw std::runtime_error(errMsg.str()); } - if (swallowRows) - tm.erase(tableOID); + totalRowCount += rowCount; + totalBytesSent += bs.length(); - FEMsgHandler msgHandler(jl, &fIos); - - if (tableOID == 100) - msgHandler.start(); - - //...Loop serializing table bands projected for the tableOID - for (;;) + if (rowCount == 0) { - uint32_t rowCount; - - rowCount = jl->projectTable(tableOID, bs); - msgHandler.stop(); + // No more bands, table is done + bs.reset(); - if (jl->status()) - { - const auto errInfo = logging::IDBErrorInfo::instance(); + // @bug 2083 decr active statement count here for table mode. + if (!usingTuples) + statementsRunningCount->decr(stmtCounted); - if (jl->errMsg().length() != 0) - bs << jl->errMsg(); - else - bs << errInfo->errorMsg(jl->status()); - } - - try // @bug2244: try/catch around fIos.write() calls projecting rows - { - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) - { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) - fIos.write(bs); - } - else - { - fIos.write(bs); - } - } - catch (std::exception& ex) - { - msgHandler.stop(); - std::ostringstream errMsg; - errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - if (tableOID == 100 && msgHandler.aborted()) - { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ - - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; - } - - // std::cout << "connection drop\n"; - throw std::runtime_error(errMsg.str()); - } - catch (...) - { - std::ostringstream errMsg; - msgHandler.stop(); - errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; - jl->abort(); - - while (rowCount) - rowCount = jl->projectTable(tableOID, bs); - - throw std::runtime_error(errMsg.str()); - } - - totalRowCount += rowCount; - totalBytesSent += bs.length(); - - if (rowCount == 0) - { - msgHandler.stop(); - // No more bands, table is done - bs.reset(); - - // @bug 2083 decr active statement count here for table mode. - if (!usingTuples) - statementsRunningCount->decr(stmtCounted); - - break; - } - else - { - bs.restart(); - } - } // End of loop to project and serialize table bands for a table - } // End of loop to process tables - - // @bug 828 - if (csep.traceOn()) - jl->graph(csep.sessionID()); - - if (needDbProfEndStatementMsg) - { - std::string ss; - std::ostringstream prefix; - prefix << "ses:" << csep.sessionID() << " Query Totals"; - - // Log stats std::string to standard out - ss = formatQueryStats(jl, prefix.str(), true, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), - totalRowCount); - //@Bug 1306. Added timing info for real time tracking. - std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; - - // log query stats to debug log file - args.reset(); - args.add((int)csep.statementID()); - args.add(fStats.fMaxMemPct); - args.add(fStats.fNumFiles); - args.add(fStats.fFileBytes); // log raw byte count instead of MB - args.add(fStats.fPhyIO); - args.add(fStats.fCacheIO); - args.add(fStats.fMsgRcvCnt); - args.add(fStats.fMsgBytesIn); - args.add(fStats.fMsgBytesOut); - args.add(fStats.fCPBlocksSkipped); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); - //@bug 1327 - deleteMaxMemPct(csep.sessionID()); - // Calling reset here, will cause joblist destructor to be - // called, which "joins" the threads. We need to do that - // here to make sure all syslogging from all the threads - // are complete; and that our logDbProfEndStatement will - // appear "last" in the syslog for this SQL statement. - // puts the real destruction in another thread to avoid - // making the whole session wait. It can take several seconds. - int stmtID = csep.statementID(); - std::unique_lock scoped(jlMutex); - destructing++; - std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - logging::Message::Args args; - args.add(stmtID); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); - jl.reset(); // this happens first - bgdtor.detach(); - } - else - // delete sessionMemMap entry for this session's memory % use - deleteMaxMemPct(csep.sessionID()); - - std::string endtime(globServiceExeMgr->timeNow()); - - if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) - { - std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " - << endtime << std::endl; - - // @bug 663 - Implemented caltraceon(16) to replace the - // $FIFO_SINK compiler definition in pColStep. - // This option consumes rows in the project steps. - if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) - { - std::cout << std::endl; - std::cout << "**** No data returned to DM. Rows consumed " - "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." - " ****" - << std::endl; - std::cout << std::endl; + break; } - else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + else { - std::cout << std::endl; - std::cout << "**** No data returned to DM - caltrace(8) is " - "on (SWALLOW_ROWS_EXEMGR). ****" - << std::endl; - std::cout << std::endl; + bs.restart(); } - } + } // End of loop to project and serialize table bands for a table + } // End of loop to process tables - statementsRunningCount->decr(stmtCounted); + // @bug 828 + if (csep.traceOn()) + jl->graph(csep.sessionID()); - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + if (needDbProfEndStatementMsg) + { + std::string ss; + std::ostringstream prefix; + prefix << "ses:" << csep.sessionID() << " Query Totals"; + + // Log stats std::string to standard out + ss = formatQueryStats(jl, prefix.str(), true, + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), + totalRowCount); + //@Bug 1306. Added timing info for real time tracking. + std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; + + // log query stats to debug log file + args.reset(); + args.add((int)csep.statementID()); + args.add(fStats.fMaxMemPct); + args.add(fStats.fNumFiles); + args.add(fStats.fFileBytes); // log raw byte count instead of MB + args.add(fStats.fPhyIO); + args.add(fStats.fCacheIO); + args.add(fStats.fMsgRcvCnt); + args.add(fStats.fMsgBytesIn); + args.add(fStats.fMsgBytesOut); + args.add(fStats.fCPBlocksSkipped); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfQueryStats, args, li); + //@bug 1327 + deleteMaxMemPct(csep.sessionID()); + // Calling reset here, will cause joblist destructor to be + // called, which "joins" the threads. We need to do that + // here to make sure all syslogging from all the threads + // are complete; and that our logDbProfEndStatement will + // appear "last" in the syslog for this SQL statement. + // puts the real destruction in another thread to avoid + // making the whole session wait. It can take several seconds. + int stmtID = csep.statementID(); + std::unique_lock scoped(jlMutex); + destructing++; + std::thread bgdtor( + [jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] + { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + logging::Message::Args args; + args.add(stmtID); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); + jl.reset(); // this happens first + bgdtor.detach(); + } + else + // delete sessionMemMap entry for this session's memory % use + deleteMaxMemPct(csep.sessionID()); + + std::string endtime(globServiceExeMgr->timeNow()); + + if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) + { + std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " + << endtime << std::endl; + + // @bug 663 - Implemented caltraceon(16) to replace the + // $FIFO_SINK compiler definition in pColStep. + // This option consumes rows in the project steps. + if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) { - qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; - qts.max_mem_pct = fStats.fMaxMemPct; - qts.num_files = fStats.fNumFiles; - qts.phy_io = fStats.fPhyIO; - qts.cache_io = fStats.fCacheIO; - qts.msg_rcv_cnt = fStats.fMsgRcvCnt; - qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; - qts.msg_bytes_in = fStats.fMsgBytesIn; - qts.msg_bytes_out = fStats.fMsgBytesOut; - qts.rows = totalRowCount; - qts.end_time = querytele::QueryTeleClient::timeNowms(); - qts.session_id = csep.sessionID(); - qts.query_type = csep.queryType(); - qts.query = csep.data(); - qts.system_name = fOamCachePtr->getSystemName(); - qts.module_name = fOamCachePtr->getModuleName(); - qts.local_query = csep.localQuery(); - fTeleClient.postQueryTele(qts); + std::cout << std::endl; + std::cout << "**** No data returned to DM. Rows consumed " + "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." + " ****" + << std::endl; + std::cout << std::endl; + } + else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) + { + std::cout << std::endl; + std::cout << "**** No data returned to DM - caltrace(8) is " + "on (SWALLOW_ROWS_EXEMGR). ****" + << std::endl; + std::cout << std::endl; } } - // Release CSC object (for sessionID) that was added by makeJobList() - // Mask 0x80000000 is for associate user query and csc query. - // (actual joblist destruction happens at the top of this loop) - decThreadCntPerSession(csep.sessionID() | 0x80000000); - } - catch (std::exception& ex) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); statementsRunningCount->decr(stmtCounted); - std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add(ex.what()); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); - } - catch (...) - { - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - std::cerr << "### Exception caught!" << std::endl; - logging::Message::Args args; - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - args.add("ExeMgr caught unknown exception"); - msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); - fIos.close(); + + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { + qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; + qts.max_mem_pct = fStats.fMaxMemPct; + qts.num_files = fStats.fNumFiles; + qts.phy_io = fStats.fPhyIO; + qts.cache_io = fStats.fCacheIO; + qts.msg_rcv_cnt = fStats.fMsgRcvCnt; + qts.cp_blocks_skipped = fStats.fCPBlocksSkipped; + qts.msg_bytes_in = fStats.fMsgBytesIn; + qts.msg_bytes_out = fStats.fMsgBytesOut; + qts.rows = totalRowCount; + qts.end_time = querytele::QueryTeleClient::timeNowms(); + qts.session_id = csep.sessionID(); + qts.query_type = csep.queryType(); + qts.query = csep.data(); + qts.system_name = fOamCachePtr->getSystemName(); + qts.module_name = fOamCachePtr->getModuleName(); + qts.local_query = csep.localQuery(); + fTeleClient.postQueryTele(qts); + } } - // make sure we don't leave scope while joblists are being destroyed - std::unique_lock scoped(jlMutex); - while (destructing > 0) - jlCleanupDone.wait(scoped); + // Release CSC object (for sessionID) that was added by makeJobList() + // Mask 0x80000000 is for associate user query and csc query. + // (actual joblist destruction happens at the top of this loop) + decThreadCntPerSession(csep.sessionID() | 0x80000000); } -}; // namespace exemgr + catch (std::exception& ex) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add(ex.what()); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + catch (...) + { + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + std::cerr << "### Exception caught!" << std::endl; + logging::Message::Args args; + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + args.add("ExeMgr caught unknown exception"); + msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); + fIos.close(); + } + + // make sure we don't leave scope while joblists are being destroyed + std::unique_lock scoped(jlMutex); + while (destructing > 0) + jlCleanupDone.wait(scoped); +} +}; // namespace exemgr