diff --git a/primitives/primproc/sqlfrontsessionthread.cpp b/primitives/primproc/sqlfrontsessionthread.cpp index 545d10a32..d43cd83e6 100644 --- a/primitives/primproc/sqlfrontsessionthread.cpp +++ b/primitives/primproc/sqlfrontsessionthread.cpp @@ -187,7 +187,8 @@ namespace exemgr fIos.write(emsgBs); } - void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted) + void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, + bool& stmtCounted) { auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); messageqcpp::ByteStream::quadbyte qb; @@ -238,19 +239,23 @@ namespace exemgr jl->doQuery(); FEMsgHandler msgHandler(jl, &fIos); - msgHandler.start(); - auto rowCount = jl->projectTable(100, bs); + + // Seems like this is a legacy parameter, not really needed. + const uint32_t dummyTableOid = 100; + auto* statisticsManager = statistics::StatisticsManager::instance(); + // Process rowGroup by rowGroup. + auto rowCount = jl->projectTable(dummyTableOid, bs); + while (rowCount) + { + auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); + statisticsManager->collectSample(outRG); + rowCount = jl->projectTable(dummyTableOid, bs); + } msgHandler.stop(); - auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); - - if (caep.traceOn()) - std::cout << "Row count " << rowCount << std::endl; - - // Process `RowGroup`, increase an epoch and save statistics to the file. - auto* statisticsManager = statistics::StatisticsManager::instance(); - statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn()); + // Analyze collected samples. 
+ statisticsManager->analyzeSample(caep.traceOn()); statisticsManager->incEpoch(); statisticsManager->saveToFile(); @@ -317,132 +322,130 @@ namespace exemgr void SQLFrontSessionThread::operator()() { - messageqcpp::ByteStream bs, inbs; - execplan::CalpontSelectExecutionPlan csep; - csep.sessionID(0); - joblist::SJLP jl; - bool incSQLFrontSessionThreadCnt = true; - std::mutex jlMutex; - std::condition_variable jlCleanupDone; - int destructing = 0; - int gDebug = globServiceExeMgr->getDebugLevel(); - logging::Logger& msgLog = globServiceExeMgr->getLogger(); + messageqcpp::ByteStream bs, inbs; + execplan::CalpontSelectExecutionPlan csep; + csep.sessionID(0); + joblist::SJLP jl; + bool incSQLFrontSessionThreadCnt = true; + std::mutex jlMutex; + std::condition_variable jlCleanupDone; + int destructing = 0; + int gDebug = globServiceExeMgr->getDebugLevel(); + logging::Logger& msgLog = globServiceExeMgr->getLogger(); - bool selfJoin = false; - bool tryTuples = false; - bool usingTuples = false; - bool stmtCounted = false; - auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); + bool selfJoin = false; + bool tryTuples = false; + bool usingTuples = false; + bool stmtCounted = false; + auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); - try - { + try + { for (;;) { - selfJoin = false; - tryTuples = false; - usingTuples = false; + selfJoin = false; + tryTuples = false; + usingTuples = false; - if (jl) - { + if (jl) + { // puts the real destruction in another thread to avoid // making the whole session wait. It can take several seconds. 
std::unique_lock scoped(jlMutex); destructing++; - std::thread bgdtor( - [jl, &jlMutex, &jlCleanupDone, &destructing] - { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); + std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); jl.reset(); // this runs first bgdtor.detach(); - } + } - bs = fIos.read(); + bs = fIos.read(); - if (bs.length() == 0) - { + if (bs.length() == 0) + { if (gDebug > 1 || (gDebug && !csep.isInternal())) - std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; + std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; // connection closed by client fIos.close(); break; - } - else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan - { + } + else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan + { if (gDebug) - std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " + std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " << bs.length() << std::endl; fIos.close(); break; - } - else if (bs.length() == 4) // possible tuple flag - { + } + else if (bs.length() == 4) // possible tuple flag + { messageqcpp::ByteStream::quadbyte qb; bs >> qb; if (qb == 4) // UM wants new tuple i/f { - if (gDebug) + if (gDebug) std::cout << "### UM wants tuples" << std::endl; - tryTuples = true; - // now wait for the CSEP... - bs = fIos.read(); + tryTuples = true; + // now wait for the CSEP... 
+ bs = fIos.read(); } else if (qb == 5) // somebody wants stats { - bs.restart(); - qb = statementsRunningCount->cur(); - bs << qb; - qb = statementsRunningCount->waiting(); - bs << qb; - fIos.write(bs); - fIos.close(); - break; + bs.restart(); + qb = statementsRunningCount->cur(); + bs << qb; + qb = statementsRunningCount->waiting(); + bs << qb; + fIos.write(bs); + fIos.close(); + break; } else if (qb == ANALYZE_TABLE_EXECUTE) { - analyzeTableExecute(bs, jl, stmtCounted); - continue; + analyzeTableExecute(bs, jl, stmtCounted); + continue; } else if (qb == ANALYZE_TABLE_REC_STATS) { - analyzeTableHandleStats(bs); - continue; + analyzeTableHandleStats(bs); + continue; } else { - if (gDebug) + if (gDebug) std::cout << "### Got a not-a-plan value " << qb << std::endl; - fIos.close(); - break; + fIos.close(); + break; } - } + } new_plan: - try - { + try + { csep.unserialize(bs); - } - catch (logging::IDBExcept& ex) - { + } + catch (logging::IDBExcept& ex) + { // We can get here on illegal function parameter data type, e.g. 
// SELECT blob_column|1 FROM t1; statementsRunningCount->decr(stmtCounted); writeCodeAndError(ex.errorCode(), std::string(ex.what())); continue; - } + } - querytele::QueryTeleStats qts; + querytele::QueryTeleStats qts; - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { qts.query_uuid = csep.uuid(); qts.msg_type = querytele::QueryTeleStats::QT_START; qts.start_time = querytele::QueryTeleClient::timeNowms(); @@ -454,76 +457,76 @@ namespace exemgr qts.local_query = csep.localQuery(); qts.schema_name = csep.schemaName(); fTeleClient.postQueryTele(qts); - } + } - if (gDebug > 1 || (gDebug && !csep.isInternal())) + if (gDebug > 1 || (gDebug && !csep.isInternal())) std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; - setRMParms(csep.rmParms()); - // Re-establish lost PP connections. - if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) - { + setRMParms(csep.rmParms()); + // Re-establish lost PP connections. + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { fEc->Setup(); - } - // @bug 1021. try to get schema cache for a come in query. - // skip system catalog queries. - if (!csep.isInternal()) - { + } + // @bug 1021. try to get schema cache for a come in query. + // skip system catalog queries. + if (!csep.isInternal()) + { boost::shared_ptr csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); buildSysCache(csep, csc); - } + } - // As soon as we have a session id for this thread, update the - // thread count per session; only do this once per thread. - // Mask 0x80000000 is for associate user query and csc query - if (incSQLFrontSessionThreadCnt) - { + // As soon as we have a session id for this thread, update the + // thread count per session; only do this once per thread. 
+ // Mask 0x80000000 is for associate user query and csc query + if (incSQLFrontSessionThreadCnt) + { // WIP incThreadCntPerSession(csep.sessionID() | 0x80000000); incSQLFrontSessionThreadCnt = false; - } + } - bool needDbProfEndStatementMsg = false; - logging::Message::Args args; - std::string sqlText = csep.data(); - logging::LoggingID li(16, csep.sessionID(), csep.txnID()); + bool needDbProfEndStatementMsg = false; + logging::Message::Args args; + std::string sqlText = csep.data(); + logging::LoggingID li(16, csep.sessionID(), csep.txnID()); - // Initialize stats for this query, including - // init sessionMemMap entry for this session to 0 memory %. - // We will need this later for traceOn() or if we receive a - // table request with qb=3 (see below). This is also recorded - // as query start time. - initStats(csep.sessionID(), sqlText); - fStats.fQueryType = csep.queryType(); + // Initialize stats for this query, including + // init sessionMemMap entry for this session to 0 memory %. + // We will need this later for traceOn() or if we receive a + // table request with qb=3 (see below). This is also recorded + // as query start time. + initStats(csep.sessionID(), sqlText); + fStats.fQueryType = csep.queryType(); - // Log start and end statement if tracing is enabled. Keep in - // mind the trace flag won't be set for system catalog queries. - if (csep.traceOn()) - { + // Log start and end statement if tracing is enabled. Keep in + // mind the trace flag won't be set for system catalog queries. + if (csep.traceOn()) + { args.reset(); args.add((int)csep.statementID()); args.add((int)csep.verID().currentScn); args.add(sqlText); msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); needDbProfEndStatementMsg = true; - } + } - // Don't log subsequent self joins after first. - if (selfJoin) + // Don't log subsequent self joins after first. 
+ if (selfJoin) sqlText = ""; - std::ostringstream oss; - oss << sqlText << "; |" << csep.schemaName() << "|"; - logging::SQLLogger sqlLog(oss.str(), li); + std::ostringstream oss; + oss << sqlText << "; |" << csep.schemaName() << "|"; + logging::SQLLogger sqlLog(oss.str(), li); - statementsRunningCount->incr(stmtCounted); + statementsRunningCount->incr(stmtCounted); - PrimitiveServerThreadPools primitiveServerThreadPools( - ServicePrimProc::instance()->getPrimitiveServerThreadPool()); + PrimitiveServerThreadPools primitiveServerThreadPools( + ServicePrimProc::instance()->getPrimitiveServerThreadPool()); - if (tryTuples) - { + if (tryTuples) + { try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList { jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); @@ -577,57 +580,57 @@ namespace exemgr if (gDebug) std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; } - } - else - { - usingTuples = false; - jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); - - if (jl->status() == 0) - { - std::string emsg; - - if (jl->putEngineComm(fEc) != 0) - throw std::runtime_error(jl->errMsg()); } else { - throw std::runtime_error("ExeMgr: could not build a JobList!"); + usingTuples = false; + jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true); + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } } - } - jl->doQuery(); + jl->doQuery(); - execplan::CalpontSystemCatalog::OID tableOID; - bool swallowRows = false; - joblist::DeliveredTableMap tm; - uint64_t totalBytesSent = 0; - uint64_t totalRowCount = 0; + execplan::CalpontSystemCatalog::OID tableOID; + bool swallowRows = false; + joblist::DeliveredTableMap tm; + uint64_t totalBytesSent = 0; + uint64_t 
totalRowCount = 0; - // Project each table as the FE asks for it - for (;;) - { + // Project each table as the FE asks for it + for (;;) + { bs = fIos.read(); if (bs.length() == 0) { - if (gDebug > 1 || (gDebug && !csep.isInternal())) + if (gDebug > 1 || (gDebug && !csep.isInternal())) std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; - break; + break; } if (gDebug && bs.length() > 4) - std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() + std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() << std::endl; // TODO: Holy crud! Can this be right? //@bug 1444 Yes, if there is a self-join if (bs.length() > 4) { - selfJoin = true; - statementsRunningCount->decr(stmtCounted); - goto new_plan; + selfJoin = true; + statementsRunningCount->decr(stmtCounted); + goto new_plan; } assert(bs.length() == 4); @@ -636,47 +639,47 @@ namespace exemgr try // @bug2244: try/catch around fIos.write() calls responding to qb command { - bs >> qb; + bs >> qb; - if (gDebug > 1 || (gDebug && !csep.isInternal())) + if (gDebug > 1 || (gDebug && !csep.isInternal())) std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb - << std::endl; + << std::endl; - if (qb == 0) - { + if (qb == 0) + { // No more tables, query is done break; - } - else if (qb == 1) - { + } + else if (qb == 1) + { // super-secret flag indicating that the UM is going to scarf down all the rows in the // query. 
swallowRows = true; tm = jl->deliveredTables(); continue; - } - else if (qb == 2) - { + } + else if (qb == 2) + { // UM just wants any table assert(swallowRows); auto iter = tm.begin(); if (iter == tm.end()) { - if (gDebug > 1 || (gDebug && !csep.isInternal())) + if (gDebug > 1 || (gDebug && !csep.isInternal())) std::cout << "### For session id " << csep.sessionID() << ", returning end flag" - << std::endl; + << std::endl; - bs.restart(); - bs << (messageqcpp::ByteStream::byte)1; - fIos.write(bs); - continue; + bs.restart(); + bs << (messageqcpp::ByteStream::byte)1; + fIos.write(bs); + continue; } tableOID = iter->first; - } - else if (qb == 3) // special option-UM wants job stats std::string - { + } + else if (qb == 3) // special option-UM wants job stats std::string + { std::string statsString; // Log stats std::string to be sent back to front end @@ -690,171 +693,171 @@ namespace exemgr if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) { - bs << jl->extendedInfo(); - bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); + bs << jl->extendedInfo(); + bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); } else { - std::string empty; - bs << empty; - bs << empty; + std::string empty; + bs << empty; + bs << empty; } // send stats to connector for inserting to the querystats table fStats.serialize(bs); fIos.write(bs); continue; - } - // for table mode handling - else if (qb == 4) - { + } + // for table mode handling + else if (qb == 4) + { statementsRunningCount->decr(stmtCounted); bs = fIos.read(); goto new_plan; - } - else // (qb > 3) - { + } + else // (qb > 3) + { // Return table bands for the requested tableOID tableOID = static_cast(qb); - } + } } catch (std::exception& ex) { - std::ostringstream errMsg; - errMsg << "ExeMgr: error writing qb response " + std::ostringstream errMsg; + errMsg << "ExeMgr: error writing qb response " "for qb cmd " - << qb << "; " << ex.what(); - throw std::runtime_error(errMsg.str()); + 
<< qb << "; " << ex.what(); + throw std::runtime_error(errMsg.str()); } catch (...) { - std::ostringstream errMsg; - errMsg << "ExeMgr: unknown error writing qb response " + std::ostringstream errMsg; + errMsg << "ExeMgr: unknown error writing qb response " "for qb cmd " - << qb; - throw std::runtime_error(errMsg.str()); + << qb; + throw std::runtime_error(errMsg.str()); } if (swallowRows) - tm.erase(tableOID); + tm.erase(tableOID); FEMsgHandler msgHandler(jl, &fIos); if (tableOID == 100) - msgHandler.start(); + msgHandler.start(); //...Loop serializing table bands projected for the tableOID for (;;) { - uint32_t rowCount; + uint32_t rowCount; - rowCount = jl->projectTable(tableOID, bs); + rowCount = jl->projectTable(tableOID, bs); - msgHandler.stop(); + msgHandler.stop(); - if (jl->status()) - { + if (jl->status()) + { const auto errInfo = logging::IDBErrorInfo::instance(); if (jl->errMsg().length() != 0) - bs << jl->errMsg(); + bs << jl->errMsg(); else - bs << errInfo->errorMsg(jl->status()); - } + bs << errInfo->errorMsg(jl->status()); + } - try // @bug2244: try/catch around fIos.write() calls projecting rows - { + try // @bug2244: try/catch around fIos.write() calls projecting rows + { if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) { - // Skip the write to the front end until the last empty band. Used to time queries - // through without any front end waiting. - if (tableOID < 3000 || rowCount == 0) + // Skip the write to the front end until the last empty band. Used to time queries + // through without any front end waiting. 
+ if (tableOID < 3000 || rowCount == 0) fIos.write(bs); } else { - fIos.write(bs); + fIos.write(bs); } - } - catch (std::exception& ex) - { + } + catch (std::exception& ex) + { msgHandler.stop(); std::ostringstream errMsg; errMsg << "ExeMgr: error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " - << ex.what(); + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " + << ex.what(); jl->abort(); while (rowCount) - rowCount = jl->projectTable(tableOID, bs); + rowCount = jl->projectTable(tableOID, bs); if (tableOID == 100 && msgHandler.aborted()) { - /* TODO: modularize the cleanup code, as well as - * the rest of this fcn */ + /* TODO: modularize the cleanup code, as well as + * the rest of this fcn */ - decThreadCntPerSession(csep.sessionID() | 0x80000000); - statementsRunningCount->decr(stmtCounted); - fIos.close(); - return; + decThreadCntPerSession(csep.sessionID() | 0x80000000); + statementsRunningCount->decr(stmtCounted); + fIos.close(); + return; } // std::cout << "connection drop\n"; throw std::runtime_error(errMsg.str()); - } - catch (...) - { + } + catch (...) + { std::ostringstream errMsg; msgHandler.stop(); errMsg << "ExeMgr: unknown error projecting rows " - "for tableOID: " - << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; + "for tableOID: " + << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; jl->abort(); while (rowCount) - rowCount = jl->projectTable(tableOID, bs); + rowCount = jl->projectTable(tableOID, bs); throw std::runtime_error(errMsg.str()); - } + } - totalRowCount += rowCount; - totalBytesSent += bs.length(); + totalRowCount += rowCount; + totalBytesSent += bs.length(); - if (rowCount == 0) - { + if (rowCount == 0) + { msgHandler.stop(); // No more bands, table is done bs.reset(); // @bug 2083 decr active statement count here for table mode. 
if (!usingTuples) - statementsRunningCount->decr(stmtCounted); + statementsRunningCount->decr(stmtCounted); break; - } - else - { + } + else + { bs.restart(); - } + } } // End of loop to project and serialize table bands for a table - } // End of loop to process tables + } // End of loop to process tables - // @bug 828 - if (csep.traceOn()) + // @bug 828 + if (csep.traceOn()) jl->graph(csep.sessionID()); - if (needDbProfEndStatementMsg) - { + if (needDbProfEndStatementMsg) + { std::string ss; std::ostringstream prefix; prefix << "ses:" << csep.sessionID() << " Query Totals"; // Log stats std::string to standard out ss = formatQueryStats(jl, prefix.str(), true, - !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), - (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), - totalRowCount); + !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), + (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), + totalRowCount); //@Bug 1306. Added timing info for real time tracking. 
std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; @@ -883,57 +886,55 @@ namespace exemgr int stmtID = csep.statementID(); std::unique_lock scoped(jlMutex); destructing++; - std::thread bgdtor( - [jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] - { - std::unique_lock scoped(jlMutex); - const_cast(jl).reset(); // this happens second; does real destruction - logging::Message::Args args; - args.add(stmtID); - msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); - if (--destructing == 0) - jlCleanupDone.notify_one(); - }); + std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] { + std::unique_lock scoped(jlMutex); + const_cast(jl).reset(); // this happens second; does real destruction + logging::Message::Args args; + args.add(stmtID); + msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); + if (--destructing == 0) + jlCleanupDone.notify_one(); + }); jl.reset(); // this happens first bgdtor.detach(); - } - else + } + else // delete sessionMemMap entry for this session's memory % use deleteMaxMemPct(csep.sessionID()); - std::string endtime(globServiceExeMgr->timeNow()); + std::string endtime(globServiceExeMgr->timeNow()); - if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) - { + if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) + { std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " - << endtime << std::endl; + << endtime << std::endl; // @bug 663 - Implemented caltraceon(16) to replace the // $FIFO_SINK compiler definition in pColStep. // This option consumes rows in the project steps. if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) { - std::cout << std::endl; - std::cout << "**** No data returned to DM. 
Rows consumed " - "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." - " ****" + std::cout << std::endl; + std::cout << "**** No data returned to DM. Rows consumed " + "in ProjectSteps - caltrace(16) is on (FIFO_SINK)." + " ****" << std::endl; - std::cout << std::endl; + std::cout << std::endl; } else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) { - std::cout << std::endl; - std::cout << "**** No data returned to DM - caltrace(8) is " - "on (SWALLOW_ROWS_EXEMGR). ****" + std::cout << std::endl; + std::cout << "**** No data returned to DM - caltrace(8) is " + "on (SWALLOW_ROWS_EXEMGR). ****" << std::endl; - std::cout << std::endl; + std::cout << std::endl; } - } + } - statementsRunningCount->decr(stmtCounted); + statementsRunningCount->decr(stmtCounted); - if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) - { + if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) + { qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; qts.max_mem_pct = fStats.fMaxMemPct; qts.num_files = fStats.fNumFiles; @@ -952,16 +953,16 @@ namespace exemgr qts.module_name = fOamCachePtr->getModuleName(); qts.local_query = csep.localQuery(); fTeleClient.postQueryTele(qts); - } + } } // Release CSC object (for sessionID) that was added by makeJobList() // Mask 0x80000000 is for associate user query and csc query. // (actual joblist destruction happens at the top of this loop) decThreadCntPerSession(csep.sessionID() | 0x80000000); - } - catch (std::exception& ex) - { + } + catch (std::exception& ex) + { decThreadCntPerSession(csep.sessionID() | 0x80000000); statementsRunningCount->decr(stmtCounted); std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; @@ -970,9 +971,9 @@ namespace exemgr args.add(ex.what()); msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); fIos.close(); - } - catch (...) 
- { + } + catch (...) + { decThreadCntPerSession(csep.sessionID() | 0x80000000); statementsRunningCount->decr(stmtCounted); std::cerr << "### Exception caught!" << std::endl; @@ -981,11 +982,11 @@ namespace exemgr args.add("ExeMgr caught unknown exception"); msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); fIos.close(); - } + } - // make sure we don't leave scope while joblists are being destroyed - std::unique_lock scoped(jlMutex); - while (destructing > 0) + // make sure we don't leave scope while joblists are being destroyed + std::unique_lock scoped(jlMutex); + while (destructing > 0) jlCleanupDone.wait(scoped); -} + } }; // namespace exemgr diff --git a/utils/common/statistics.cpp b/utils/common/statistics.cpp index 229ca9556..f1681dad0 100644 --- a/utils/common/statistics.cpp +++ b/utils/common/statistics.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include "statistics.h" @@ -31,61 +32,135 @@ using namespace logging; namespace statistics { -using ColumnsCache = std::vector>; - StatisticsManager* StatisticsManager::instance() { static StatisticsManager* sm = new StatisticsManager(); return sm; } -void StatisticsManager::analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace) +void StatisticsManager::collectSample(const rowgroup::RowGroup& rowGroup) { std::lock_guard lock(mut); - auto rowCount = rowGroup.getRowCount(); + const auto rowCount = rowGroup.getRowCount(); const auto columnCount = rowGroup.getColumnCount(); if (!rowCount || !columnCount) return; - auto& oids = rowGroup.getOIDs(); + const auto& oids = rowGroup.getOIDs(); + for (const auto oid : oids) + { + // Initialize a column data with 0. + if (!columnGroups.count(oid)) + columnGroups[oid] = std::vector(maxSampleSize, 0); + } + // Initialize a first row from the given `rowGroup`. rowgroup::Row r; rowGroup.initRow(&r); rowGroup.getRow(0, &r); - ColumnsCache columns(columnCount, std::unordered_set()); - // Init key types. 
- for (uint32_t index = 0; index < columnCount; ++index) - keyTypes[oids[index]] = KeyType::PK; + // Generate a uniform distribution. + std::random_device randomDevice; + std::mt19937 gen32(randomDevice()); + std::uniform_int_distribution<> uniformDistribution(0, currentRowIndex + rowCount - 1); - const uint32_t maxRowCount = 4096; - // TODO: We should read just couple of blocks from columns, not all data, but this requires - // more deep refactoring of column commands. - rowCount = std::min(rowCount, maxRowCount); - // This is strange, it's a CS but I'm processing data as row by row, how to fix it? for (uint32_t i = 0; i < rowCount; ++i) { - for (uint32_t j = 0; j < columnCount; ++j) + if (currentSampleSize < maxSampleSize) { - if (r.isNullValue(j) || columns[j].count(r.getIntField(j))) - keyTypes[oids[j]] = KeyType::FK; - else - columns[j].insert(r.getIntField(j)); + for (uint32_t j = 0; j < columnCount; ++j) + { + // FIXME: Handle null values as well. + if (!r.isNullValue(j)) + columnGroups[oids[j]][currentSampleSize] = r.getIntField(j); + } + ++currentSampleSize; + } + else + { + const uint32_t index = uniformDistribution(gen32); + if (index < maxSampleSize) + { + for (uint32_t j = 0; j < columnCount; ++j) + columnGroups[oids[j]][index] = r.getIntField(j); + } } r.nextRow(); + ++currentRowIndex; } - - if (trace) - output(StatisticsType::PK_FK); } -void StatisticsManager::output(StatisticsType statisticsType) +void StatisticsManager::analyzeSample(bool traceOn) { - if (statisticsType == StatisticsType::PK_FK) + if (traceOn) + std::cout << "Sample size: " << currentSampleSize << std::endl; + + // PK_FK statistics. 
+ for (const auto& [oid, sample] : columnGroups) + keyTypes[oid] = std::make_pair(KeyType::PK, currentRowIndex); + + for (const auto& [oid, sample] : columnGroups) { - std::cout << "Columns count: " << keyTypes.size() << std::endl; - for (const auto& p : keyTypes) - std::cout << p.first << " " << (int)p.second << std::endl; + std::unordered_set columnsCache; + std::unordered_map columnMCV; + for (uint32_t i = 0; i < currentSampleSize; ++i) + { + const auto value = sample[i]; + // PK_FK statistics. + if (columnsCache.count(value) && keyTypes[oid].first == KeyType::PK) + keyTypes[oid].first = KeyType::FK; + else + columnsCache.insert(value); + + // MCV statistics. + if (columnMCV.count(value)) + columnMCV[value]++; + else + columnMCV.insert({value, 1}); + } + + // MCV statistics. + std::vector> mcvList(columnMCV.begin(), columnMCV.end()); + std::sort(mcvList.begin(), mcvList.end(), + [](const std::pair& a, const std::pair& b) { + return a.second > b.second; + }); + + // 200 buckets as Microsoft does. + const auto mcvSize = std::min(columnMCV.size(), static_cast(200)); + mcv[oid] = std::unordered_map(mcvList.begin(), mcvList.begin() + mcvSize); + } + + if (traceOn) + output(); + + // Clear sample. 
+ columnGroups.clear(); + currentSampleSize = 0; + currentRowIndex = 0; +} + +void StatisticsManager::output() +{ + std::cout << "Columns count: " << keyTypes.size() << std::endl; + + std::cout << "Statistics type [PK_FK]: " << std::endl; + for (const auto& p : keyTypes) + { + std::cout << "OID: " << p.first << " "; + if (static_cast(p.second.first) == 0) + std::cout << "PK "; + else + std::cout << "FK "; + std::cout << "row count: " << p.second.second << std::endl; + } + + std::cout << "Statistics type [MCV]: " << std::endl; + for (const auto& [oid, columnMCV] : mcv) + { + std::cout << "OID: " << oid << std::endl; + for (const auto& [value, count] : columnMCV) + std::cout << value << ": " << count << std::endl; } } @@ -94,8 +169,16 @@ std::unique_ptr StatisticsManager::convertStatsToDataStream(uint64_t& da { // Number of pairs. uint64_t count = keyTypes.size(); - // count, [[uid, keyType], ... ] - dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType)); + // count, [[uid, keyType, rows count], ... ] + dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType) + sizeof(uint32_t)); + + // Count the size of the MCV. + for (const auto& [oid, mcvColumn] : mcv) + { + // [oid, list size, list [value, count]] + dataStreamSize += + (sizeof(uint32_t) + sizeof(uint32_t) + ((sizeof(uint64_t) + sizeof(uint32_t)) * mcvColumn.size())); + } // Allocate memory for data stream. std::unique_ptr dataStreamSmartPtr(new char[dataStreamSize]); @@ -105,21 +188,95 @@ std::unique_ptr StatisticsManager::convertStatsToDataStream(uint64_t& da std::memcpy(dataStream, reinterpret_cast(&count), sizeof(uint64_t)); offset += sizeof(uint64_t); - // For each pair [oid, key type]. + // For each pair [oid, key type, rows count]. 
for (const auto& p : keyTypes) { uint32_t oid = p.first; std::memcpy(&dataStream[offset], reinterpret_cast(&oid), sizeof(uint32_t)); offset += sizeof(uint32_t); - - KeyType keyType = p.second; + KeyType keyType = p.second.first; std::memcpy(&dataStream[offset], reinterpret_cast(&keyType), sizeof(KeyType)); offset += sizeof(KeyType); + uint32_t rowCount = p.second.second; + std::memcpy(&dataStream[offset], reinterpret_cast(&rowCount), sizeof(uint32_t)); + offset += sizeof(uint32_t); } + // For each [oid, list size, list [value, count]]. + for (const auto& p : mcv) + { + // [oid] + uint32_t oid = p.first; + std::memcpy(&dataStream[offset], reinterpret_cast(&oid), sizeof(uint32_t)); + offset += sizeof(uint32_t); + + // [list size] + const auto& mcvColumn = p.second; + uint32_t size = mcvColumn.size(); + std::memcpy(&dataStream[offset], reinterpret_cast(&size), sizeof(uint32_t)); + offset += sizeof(uint32_t); + + // [list [value, count]] + for (const auto& mcvPair : mcvColumn) + { + uint64_t value = mcvPair.first; + std::memcpy(&dataStream[offset], reinterpret_cast(&value), sizeof(uint64_t)); + offset += sizeof(uint64_t); + uint32_t count = mcvPair.second; + std::memcpy(&dataStream[offset], reinterpret_cast(&count), sizeof(uint32_t)); + offset += sizeof(uint32_t); + } + } return dataStreamSmartPtr; } +void StatisticsManager::convertStatsFromDataStream(std::unique_ptr dataStreamSmartPtr) +{ + auto* dataStream = dataStreamSmartPtr.get(); + uint64_t count = 0; + std::memcpy(reinterpret_cast(&count), dataStream, sizeof(uint64_t)); + uint64_t offset = sizeof(uint64_t); + + // For each pair. 
+ for (uint64_t i = 0; i < count; ++i) + { + uint32_t oid, rowCount; + KeyType keyType; + std::memcpy(reinterpret_cast(&oid), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + std::memcpy(reinterpret_cast(&keyType), &dataStream[offset], sizeof(KeyType)); + offset += sizeof(KeyType); + std::memcpy(reinterpret_cast(&rowCount), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + // Insert pair. + keyTypes[oid] = std::make_pair(keyType, rowCount); + } + + for (uint64_t i = 0; i < count; ++i) + { + uint32_t oid; + std::memcpy(reinterpret_cast(&oid), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + + uint32_t mcvSize; + std::memcpy(reinterpret_cast(&mcvSize), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + + std::unordered_map columnMCV; + for (uint32_t j = 0; j < mcvSize; ++j) + { + uint64_t value; + std::memcpy(reinterpret_cast(&value), &dataStream[offset], sizeof(uint64_t)); + offset += sizeof(uint64_t); + uint32_t count; + std::memcpy(reinterpret_cast(&count), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + columnMCV[value] = count; + } + mcv[oid] = std::move(columnMCV); + } +} + void StatisticsManager::saveToFile() { std::lock_guard lock(mut); @@ -228,22 +385,7 @@ void StatisticsManager::loadFromFile() if (dataHash != computedDataHash) throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. "); - uint64_t count = 0; - std::memcpy(reinterpret_cast(&count), dataStream, sizeof(uint64_t)); - uint64_t offset = sizeof(uint64_t); - - // For each pair. - for (uint64_t i = 0; i < count; ++i) - { - uint32_t oid; - KeyType keyType; - std::memcpy(reinterpret_cast(&oid), &dataStream[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); - std::memcpy(reinterpret_cast(&keyType), &dataStream[offset], sizeof(KeyType)); - offset += sizeof(KeyType); - // Insert pair. 
- keyTypes[oid] = keyType; - } + convertStatsFromDataStream(std::move(dataStreamSmartPtr)); } uint64_t StatisticsManager::computeHashFromStats() @@ -261,10 +403,25 @@ void StatisticsManager::serialize(messageqcpp::ByteStream& bs) bs << epoch; bs << count; + // PK_FK for (const auto& keyType : keyTypes) { bs << keyType.first; - bs << (uint32_t)keyType.second; + bs << (uint32_t)keyType.second.first; + bs << keyType.second.second; + } + + // MCV + for (const auto& p : mcv) + { + bs << p.first; + const auto& mcvColumn = p.second; + bs << static_cast(mcvColumn.size()); + for (const auto& mcvPair : mcvColumn) + { + bs << mcvPair.first; + bs << mcvPair.second; + } } } @@ -275,12 +432,34 @@ void StatisticsManager::unserialize(messageqcpp::ByteStream& bs) bs >> epoch; bs >> count; + // PK_FK for (uint32_t i = 0; i < count; ++i) { - uint32_t oid, keyType; + uint32_t oid, keyType, rowCount; bs >> oid; bs >> keyType; - keyTypes[oid] = static_cast(keyType); + bs >> rowCount; + keyTypes[oid] = std::make_pair(static_cast(keyType), rowCount); + } + + // MCV + for (uint32_t i = 0; i < count; ++i) + { + uint32_t oid, mcvSize; + bs >> oid; + bs >> mcvSize; + std::unordered_map mcvColumn; + + for (uint32_t j = 0; j < mcvSize; ++j) + { + uint64_t value; + uint32_t count; + bs >> value; + bs >> count; + mcvColumn[value] = count; + } + + mcv[oid] = std::move(mcvColumn); } } @@ -291,7 +470,7 @@ bool StatisticsManager::hasKey(uint32_t oid) KeyType StatisticsManager::getKeyType(uint32_t oid) { - return keyTypes[oid]; + return keyTypes[oid].first; } StatisticsDistributor* StatisticsDistributor::instance() diff --git a/utils/common/statistics.h b/utils/common/statistics.h index 7ec3e319c..a349dcea1 100644 --- a/utils/common/statistics.h +++ b/utils/common/statistics.h @@ -49,8 +49,10 @@ enum class KeyType : uint32_t // Rerpresents types of statistics CS supports. enum class StatisticsType : uint32_t { - // A special statistics type, made to solve circular inner join problem. 
- PK_FK + // A special statistics type, specifies whether a column is a primary key or a foreign key. + PK_FK, + // Most common values. + MCV }; // Represetns a header for the statistics file. @@ -63,6 +65,11 @@ struct StatisticsFileHeader uint8_t offset[1024]; }; +using ColumnsCache = std::unordered_map>; +using ColumnGroup = std::unordered_map>; +using KeyTypes = std::unordered_map>; +using MCVList = std::unordered_map>; + // This class is responsible for processing and storing statistics. // On each `analyze table` iteration it increases an epoch and stores // the updated statistics into the special file. @@ -71,10 +78,12 @@ class StatisticsManager public: // Returns the instance of this class, static initialization happens only once. static StatisticsManager* instance(); - // Analyzes the given `rowGroup` by processing it row by row and searching for foreign key. - void analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace); + // Collect samples from the given `rowGroup`. + void collectSample(const rowgroup::RowGroup& rowGroup); + // Analyzes collected samples. + void analyzeSample(bool traceOn); // Ouputs stats to out stream. - void output(StatisticsType statisticsType = StatisticsType::PK_FK); + void output(); // Saves stats to the file. void saveToFile(); // Loads stats from the file. @@ -95,17 +104,29 @@ class StatisticsManager KeyType getKeyType(uint32_t oid); private: - std::map keyTypes; - StatisticsManager() : epoch(0), version(1) + StatisticsManager() : currentSampleSize(0), currentRowIndex(0), epoch(0), version(1) { // Initialize plugins. IDBPolicy::configIDBPolicy(); } std::unique_ptr convertStatsToDataStream(uint64_t& dataStreamSize); + void convertStatsFromDataStream(std::unique_ptr dataStreamSmartPtr); - std::mutex mut; + // Internal data represents a sample [OID, vector of values]. + ColumnGroup columnGroups; + // Internal data for the PK/FK statistics [OID, [key type, row count]]. 
+ KeyTypes keyTypes; + // Internal data for MCV list [OID, list[value, count]] + MCVList mcv; + + // TODO: Think about sample size. + const uint32_t maxSampleSize = 64000; + uint32_t currentSampleSize; + uint32_t currentRowIndex; uint32_t epoch; uint32_t version; + + std::mutex mut; std::string statsFile = "/var/lib/columnstore/local/statistics"; };