1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

MCOL-5191 Add MCV statistics.

This patch adds:
1. Initial version of random sampling.
2. Initial version of MCV statistics.
This commit is contained in:
Denis Khalikov
2022-09-08 18:42:19 +03:00
parent 9d774c1d95
commit e299a8409d
3 changed files with 552 additions and 351 deletions

View File

@@ -187,7 +187,8 @@ namespace exemgr
fIos.write(emsgBs); fIos.write(emsgBs);
} }
void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted) void SQLFrontSessionThread::analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl,
bool& stmtCounted)
{ {
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
messageqcpp::ByteStream::quadbyte qb; messageqcpp::ByteStream::quadbyte qb;
@@ -238,19 +239,23 @@ namespace exemgr
jl->doQuery(); jl->doQuery();
FEMsgHandler msgHandler(jl, &fIos); FEMsgHandler msgHandler(jl, &fIos);
msgHandler.start(); msgHandler.start();
auto rowCount = jl->projectTable(100, bs);
// Seems like this is a legacy parameter, not really needed.
const uint32_t dummyTableOid = 100;
auto* statisticsManager = statistics::StatisticsManager::instance();
// Process rowGroup by rowGroup.
auto rowCount = jl->projectTable(dummyTableOid, bs);
while (rowCount)
{
auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();
statisticsManager->collectSample(outRG);
rowCount = jl->projectTable(dummyTableOid, bs);
}
msgHandler.stop(); msgHandler.stop();
auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup(); // Analyze collected samples.
statisticsManager->analyzeSample(caep.traceOn());
if (caep.traceOn())
std::cout << "Row count " << rowCount << std::endl;
// Process `RowGroup`, increase an epoch and save statistics to the file.
auto* statisticsManager = statistics::StatisticsManager::instance();
statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn());
statisticsManager->incEpoch(); statisticsManager->incEpoch();
statisticsManager->saveToFile(); statisticsManager->saveToFile();
@@ -317,132 +322,130 @@ namespace exemgr
void SQLFrontSessionThread::operator()() void SQLFrontSessionThread::operator()()
{ {
messageqcpp::ByteStream bs, inbs; messageqcpp::ByteStream bs, inbs;
execplan::CalpontSelectExecutionPlan csep; execplan::CalpontSelectExecutionPlan csep;
csep.sessionID(0); csep.sessionID(0);
joblist::SJLP jl; joblist::SJLP jl;
bool incSQLFrontSessionThreadCnt = true; bool incSQLFrontSessionThreadCnt = true;
std::mutex jlMutex; std::mutex jlMutex;
std::condition_variable jlCleanupDone; std::condition_variable jlCleanupDone;
int destructing = 0; int destructing = 0;
int gDebug = globServiceExeMgr->getDebugLevel(); int gDebug = globServiceExeMgr->getDebugLevel();
logging::Logger& msgLog = globServiceExeMgr->getLogger(); logging::Logger& msgLog = globServiceExeMgr->getLogger();
bool selfJoin = false; bool selfJoin = false;
bool tryTuples = false; bool tryTuples = false;
bool usingTuples = false; bool usingTuples = false;
bool stmtCounted = false; bool stmtCounted = false;
auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount(); auto* statementsRunningCount = globServiceExeMgr->getStatementsRunningCount();
try try
{ {
for (;;) for (;;)
{ {
selfJoin = false; selfJoin = false;
tryTuples = false; tryTuples = false;
usingTuples = false; usingTuples = false;
if (jl) if (jl)
{ {
// puts the real destruction in another thread to avoid // puts the real destruction in another thread to avoid
// making the whole session wait. It can take several seconds. // making the whole session wait. It can take several seconds.
std::unique_lock<std::mutex> scoped(jlMutex); std::unique_lock<std::mutex> scoped(jlMutex);
destructing++; destructing++;
std::thread bgdtor( std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] {
[jl, &jlMutex, &jlCleanupDone, &destructing] std::unique_lock<std::mutex> scoped(jlMutex);
{ const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
std::unique_lock<std::mutex> scoped(jlMutex); if (--destructing == 0)
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction jlCleanupDone.notify_one();
if (--destructing == 0) });
jlCleanupDone.notify_one();
});
jl.reset(); // this runs first jl.reset(); // this runs first
bgdtor.detach(); bgdtor.detach();
} }
bs = fIos.read(); bs = fIos.read();
if (bs.length() == 0) if (bs.length() == 0)
{ {
if (gDebug > 1 || (gDebug && !csep.isInternal())) if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl; std::cout << "### Got a close(1) for session id " << csep.sessionID() << std::endl;
// connection closed by client // connection closed by client
fIos.close(); fIos.close();
break; break;
} }
else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan else if (bs.length() < 4) // Not a CalpontSelectExecutionPlan
{ {
if (gDebug) if (gDebug)
std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length " std::cout << "### Got a not-a-plan for session id " << csep.sessionID() << " with length "
<< bs.length() << std::endl; << bs.length() << std::endl;
fIos.close(); fIos.close();
break; break;
} }
else if (bs.length() == 4) // possible tuple flag else if (bs.length() == 4) // possible tuple flag
{ {
messageqcpp::ByteStream::quadbyte qb; messageqcpp::ByteStream::quadbyte qb;
bs >> qb; bs >> qb;
if (qb == 4) // UM wants new tuple i/f if (qb == 4) // UM wants new tuple i/f
{ {
if (gDebug) if (gDebug)
std::cout << "### UM wants tuples" << std::endl; std::cout << "### UM wants tuples" << std::endl;
tryTuples = true; tryTuples = true;
// now wait for the CSEP... // now wait for the CSEP...
bs = fIos.read(); bs = fIos.read();
} }
else if (qb == 5) // somebody wants stats else if (qb == 5) // somebody wants stats
{ {
bs.restart(); bs.restart();
qb = statementsRunningCount->cur(); qb = statementsRunningCount->cur();
bs << qb; bs << qb;
qb = statementsRunningCount->waiting(); qb = statementsRunningCount->waiting();
bs << qb; bs << qb;
fIos.write(bs); fIos.write(bs);
fIos.close(); fIos.close();
break; break;
} }
else if (qb == ANALYZE_TABLE_EXECUTE) else if (qb == ANALYZE_TABLE_EXECUTE)
{ {
analyzeTableExecute(bs, jl, stmtCounted); analyzeTableExecute(bs, jl, stmtCounted);
continue; continue;
} }
else if (qb == ANALYZE_TABLE_REC_STATS) else if (qb == ANALYZE_TABLE_REC_STATS)
{ {
analyzeTableHandleStats(bs); analyzeTableHandleStats(bs);
continue; continue;
} }
else else
{ {
if (gDebug) if (gDebug)
std::cout << "### Got a not-a-plan value " << qb << std::endl; std::cout << "### Got a not-a-plan value " << qb << std::endl;
fIos.close(); fIos.close();
break; break;
} }
} }
new_plan: new_plan:
try try
{ {
csep.unserialize(bs); csep.unserialize(bs);
} }
catch (logging::IDBExcept& ex) catch (logging::IDBExcept& ex)
{ {
// We can get here on illegal function parameter data type, e.g. // We can get here on illegal function parameter data type, e.g.
// SELECT blob_column|1 FROM t1; // SELECT blob_column|1 FROM t1;
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
writeCodeAndError(ex.errorCode(), std::string(ex.what())); writeCodeAndError(ex.errorCode(), std::string(ex.what()));
continue; continue;
} }
querytele::QueryTeleStats qts; querytele::QueryTeleStats qts;
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{ {
qts.query_uuid = csep.uuid(); qts.query_uuid = csep.uuid();
qts.msg_type = querytele::QueryTeleStats::QT_START; qts.msg_type = querytele::QueryTeleStats::QT_START;
qts.start_time = querytele::QueryTeleClient::timeNowms(); qts.start_time = querytele::QueryTeleClient::timeNowms();
@@ -454,76 +457,76 @@ namespace exemgr
qts.local_query = csep.localQuery(); qts.local_query = csep.localQuery();
qts.schema_name = csep.schemaName(); qts.schema_name = csep.schemaName();
fTeleClient.postQueryTele(qts); fTeleClient.postQueryTele(qts);
} }
if (gDebug > 1 || (gDebug && !csep.isInternal())) if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
setRMParms(csep.rmParms()); setRMParms(csep.rmParms());
// Re-establish lost PP connections. // Re-establish lost PP connections.
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{ {
fEc->Setup(); fEc->Setup();
} }
// @bug 1021. try to get schema cache for a come in query. // @bug 1021. try to get schema cache for a come in query.
// skip system catalog queries. // skip system catalog queries.
if (!csep.isInternal()) if (!csep.isInternal())
{ {
boost::shared_ptr<execplan::CalpontSystemCatalog> csc = boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID()); execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(csep.sessionID());
buildSysCache(csep, csc); buildSysCache(csep, csc);
} }
// As soon as we have a session id for this thread, update the // As soon as we have a session id for this thread, update the
// thread count per session; only do this once per thread. // thread count per session; only do this once per thread.
// Mask 0x80000000 is for associate user query and csc query // Mask 0x80000000 is for associate user query and csc query
if (incSQLFrontSessionThreadCnt) if (incSQLFrontSessionThreadCnt)
{ {
// WIP // WIP
incThreadCntPerSession(csep.sessionID() | 0x80000000); incThreadCntPerSession(csep.sessionID() | 0x80000000);
incSQLFrontSessionThreadCnt = false; incSQLFrontSessionThreadCnt = false;
} }
bool needDbProfEndStatementMsg = false; bool needDbProfEndStatementMsg = false;
logging::Message::Args args; logging::Message::Args args;
std::string sqlText = csep.data(); std::string sqlText = csep.data();
logging::LoggingID li(16, csep.sessionID(), csep.txnID()); logging::LoggingID li(16, csep.sessionID(), csep.txnID());
// Initialize stats for this query, including // Initialize stats for this query, including
// init sessionMemMap entry for this session to 0 memory %. // init sessionMemMap entry for this session to 0 memory %.
// We will need this later for traceOn() or if we receive a // We will need this later for traceOn() or if we receive a
// table request with qb=3 (see below). This is also recorded // table request with qb=3 (see below). This is also recorded
// as query start time. // as query start time.
initStats(csep.sessionID(), sqlText); initStats(csep.sessionID(), sqlText);
fStats.fQueryType = csep.queryType(); fStats.fQueryType = csep.queryType();
// Log start and end statement if tracing is enabled. Keep in // Log start and end statement if tracing is enabled. Keep in
// mind the trace flag won't be set for system catalog queries. // mind the trace flag won't be set for system catalog queries.
if (csep.traceOn()) if (csep.traceOn())
{ {
args.reset(); args.reset();
args.add((int)csep.statementID()); args.add((int)csep.statementID());
args.add((int)csep.verID().currentScn); args.add((int)csep.verID().currentScn);
args.add(sqlText); args.add(sqlText);
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li); msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfStartStatement, args, li);
needDbProfEndStatementMsg = true; needDbProfEndStatementMsg = true;
} }
// Don't log subsequent self joins after first. // Don't log subsequent self joins after first.
if (selfJoin) if (selfJoin)
sqlText = ""; sqlText = "";
std::ostringstream oss; std::ostringstream oss;
oss << sqlText << "; |" << csep.schemaName() << "|"; oss << sqlText << "; |" << csep.schemaName() << "|";
logging::SQLLogger sqlLog(oss.str(), li); logging::SQLLogger sqlLog(oss.str(), li);
statementsRunningCount->incr(stmtCounted); statementsRunningCount->incr(stmtCounted);
PrimitiveServerThreadPools primitiveServerThreadPools( PrimitiveServerThreadPools primitiveServerThreadPools(
ServicePrimProc::instance()->getPrimitiveServerThreadPool()); ServicePrimProc::instance()->getPrimitiveServerThreadPool());
if (tryTuples) if (tryTuples)
{ {
try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList try // @bug2244: try/catch around fIos.write() calls responding to makeTupleList
{ {
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true); jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, true, true);
@@ -577,57 +580,57 @@ namespace exemgr
if (gDebug) if (gDebug)
std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl; std::cout << "### UM wanted tuples and we'll do our best;-)" << std::endl;
} }
}
else
{
usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true);
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
} }
else else
{ {
throw std::runtime_error("ExeMgr: could not build a JobList!"); usingTuples = false;
jl = joblist::JobListFactory::makeJobList(&csep, fRm, primitiveServerThreadPools, false, true);
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
} }
}
jl->doQuery(); jl->doQuery();
execplan::CalpontSystemCatalog::OID tableOID; execplan::CalpontSystemCatalog::OID tableOID;
bool swallowRows = false; bool swallowRows = false;
joblist::DeliveredTableMap tm; joblist::DeliveredTableMap tm;
uint64_t totalBytesSent = 0; uint64_t totalBytesSent = 0;
uint64_t totalRowCount = 0; uint64_t totalRowCount = 0;
// Project each table as the FE asks for it // Project each table as the FE asks for it
for (;;) for (;;)
{ {
bs = fIos.read(); bs = fIos.read();
if (bs.length() == 0) if (bs.length() == 0)
{ {
if (gDebug > 1 || (gDebug && !csep.isInternal())) if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl; std::cout << "### Got a close(2) for session id " << csep.sessionID() << std::endl;
break; break;
} }
if (gDebug && bs.length() > 4) if (gDebug && bs.length() > 4)
std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length() std::cout << "### For session id " << csep.sessionID() << ", got too many bytes = " << bs.length()
<< std::endl; << std::endl;
// TODO: Holy crud! Can this be right? // TODO: Holy crud! Can this be right?
//@bug 1444 Yes, if there is a self-join //@bug 1444 Yes, if there is a self-join
if (bs.length() > 4) if (bs.length() > 4)
{ {
selfJoin = true; selfJoin = true;
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
goto new_plan; goto new_plan;
} }
assert(bs.length() == 4); assert(bs.length() == 4);
@@ -636,47 +639,47 @@ namespace exemgr
try // @bug2244: try/catch around fIos.write() calls responding to qb command try // @bug2244: try/catch around fIos.write() calls responding to qb command
{ {
bs >> qb; bs >> qb;
if (gDebug > 1 || (gDebug && !csep.isInternal())) if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb std::cout << "### For session id " << csep.sessionID() << ", got a command = " << qb
<< std::endl; << std::endl;
if (qb == 0) if (qb == 0)
{ {
// No more tables, query is done // No more tables, query is done
break; break;
} }
else if (qb == 1) else if (qb == 1)
{ {
// super-secret flag indicating that the UM is going to scarf down all the rows in the // super-secret flag indicating that the UM is going to scarf down all the rows in the
// query. // query.
swallowRows = true; swallowRows = true;
tm = jl->deliveredTables(); tm = jl->deliveredTables();
continue; continue;
} }
else if (qb == 2) else if (qb == 2)
{ {
// UM just wants any table // UM just wants any table
assert(swallowRows); assert(swallowRows);
auto iter = tm.begin(); auto iter = tm.begin();
if (iter == tm.end()) if (iter == tm.end())
{ {
if (gDebug > 1 || (gDebug && !csep.isInternal())) if (gDebug > 1 || (gDebug && !csep.isInternal()))
std::cout << "### For session id " << csep.sessionID() << ", returning end flag" std::cout << "### For session id " << csep.sessionID() << ", returning end flag"
<< std::endl; << std::endl;
bs.restart(); bs.restart();
bs << (messageqcpp::ByteStream::byte)1; bs << (messageqcpp::ByteStream::byte)1;
fIos.write(bs); fIos.write(bs);
continue; continue;
} }
tableOID = iter->first; tableOID = iter->first;
} }
else if (qb == 3) // special option-UM wants job stats std::string else if (qb == 3) // special option-UM wants job stats std::string
{ {
std::string statsString; std::string statsString;
// Log stats std::string to be sent back to front end // Log stats std::string to be sent back to front end
@@ -690,171 +693,171 @@ namespace exemgr
if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0) if ((csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG) != 0)
{ {
bs << jl->extendedInfo(); bs << jl->extendedInfo();
bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo()); bs << globServiceExeMgr->prettyPrintMiniInfo(jl->miniInfo());
} }
else else
{ {
std::string empty; std::string empty;
bs << empty; bs << empty;
bs << empty; bs << empty;
} }
// send stats to connector for inserting to the querystats table // send stats to connector for inserting to the querystats table
fStats.serialize(bs); fStats.serialize(bs);
fIos.write(bs); fIos.write(bs);
continue; continue;
} }
// for table mode handling // for table mode handling
else if (qb == 4) else if (qb == 4)
{ {
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
bs = fIos.read(); bs = fIos.read();
goto new_plan; goto new_plan;
} }
else // (qb > 3) else // (qb > 3)
{ {
// Return table bands for the requested tableOID // Return table bands for the requested tableOID
tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb); tableOID = static_cast<execplan::CalpontSystemCatalog::OID>(qb);
} }
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
std::ostringstream errMsg; std::ostringstream errMsg;
errMsg << "ExeMgr: error writing qb response " errMsg << "ExeMgr: error writing qb response "
"for qb cmd " "for qb cmd "
<< qb << "; " << ex.what(); << qb << "; " << ex.what();
throw std::runtime_error(errMsg.str()); throw std::runtime_error(errMsg.str());
} }
catch (...) catch (...)
{ {
std::ostringstream errMsg; std::ostringstream errMsg;
errMsg << "ExeMgr: unknown error writing qb response " errMsg << "ExeMgr: unknown error writing qb response "
"for qb cmd " "for qb cmd "
<< qb; << qb;
throw std::runtime_error(errMsg.str()); throw std::runtime_error(errMsg.str());
} }
if (swallowRows) if (swallowRows)
tm.erase(tableOID); tm.erase(tableOID);
FEMsgHandler msgHandler(jl, &fIos); FEMsgHandler msgHandler(jl, &fIos);
if (tableOID == 100) if (tableOID == 100)
msgHandler.start(); msgHandler.start();
//...Loop serializing table bands projected for the tableOID //...Loop serializing table bands projected for the tableOID
for (;;) for (;;)
{ {
uint32_t rowCount; uint32_t rowCount;
rowCount = jl->projectTable(tableOID, bs); rowCount = jl->projectTable(tableOID, bs);
msgHandler.stop(); msgHandler.stop();
if (jl->status()) if (jl->status())
{ {
const auto errInfo = logging::IDBErrorInfo::instance(); const auto errInfo = logging::IDBErrorInfo::instance();
if (jl->errMsg().length() != 0) if (jl->errMsg().length() != 0)
bs << jl->errMsg(); bs << jl->errMsg();
else else
bs << errInfo->errorMsg(jl->status()); bs << errInfo->errorMsg(jl->status());
} }
try // @bug2244: try/catch around fIos.write() calls projecting rows try // @bug2244: try/catch around fIos.write() calls projecting rows
{ {
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{ {
// Skip the write to the front end until the last empty band. Used to time queries // Skip the write to the front end until the last empty band. Used to time queries
// through without any front end waiting. // through without any front end waiting.
if (tableOID < 3000 || rowCount == 0) if (tableOID < 3000 || rowCount == 0)
fIos.write(bs); fIos.write(bs);
} }
else else
{ {
fIos.write(bs); fIos.write(bs);
} }
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
msgHandler.stop(); msgHandler.stop();
std::ostringstream errMsg; std::ostringstream errMsg;
errMsg << "ExeMgr: error projecting rows " errMsg << "ExeMgr: error projecting rows "
"for tableOID: " "for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; " << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount << "; "
<< ex.what(); << ex.what();
jl->abort(); jl->abort();
while (rowCount) while (rowCount)
rowCount = jl->projectTable(tableOID, bs); rowCount = jl->projectTable(tableOID, bs);
if (tableOID == 100 && msgHandler.aborted()) if (tableOID == 100 && msgHandler.aborted())
{ {
/* TODO: modularize the cleanup code, as well as /* TODO: modularize the cleanup code, as well as
* the rest of this fcn */ * the rest of this fcn */
decThreadCntPerSession(csep.sessionID() | 0x80000000); decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
fIos.close(); fIos.close();
return; return;
} }
// std::cout << "connection drop\n"; // std::cout << "connection drop\n";
throw std::runtime_error(errMsg.str()); throw std::runtime_error(errMsg.str());
} }
catch (...) catch (...)
{ {
std::ostringstream errMsg; std::ostringstream errMsg;
msgHandler.stop(); msgHandler.stop();
errMsg << "ExeMgr: unknown error projecting rows " errMsg << "ExeMgr: unknown error projecting rows "
"for tableOID: " "for tableOID: "
<< tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount; << tableOID << "; rowCnt: " << rowCount << "; prevTotRowCnt: " << totalRowCount;
jl->abort(); jl->abort();
while (rowCount) while (rowCount)
rowCount = jl->projectTable(tableOID, bs); rowCount = jl->projectTable(tableOID, bs);
throw std::runtime_error(errMsg.str()); throw std::runtime_error(errMsg.str());
} }
totalRowCount += rowCount; totalRowCount += rowCount;
totalBytesSent += bs.length(); totalBytesSent += bs.length();
if (rowCount == 0) if (rowCount == 0)
{ {
msgHandler.stop(); msgHandler.stop();
// No more bands, table is done // No more bands, table is done
bs.reset(); bs.reset();
// @bug 2083 decr active statement count here for table mode. // @bug 2083 decr active statement count here for table mode.
if (!usingTuples) if (!usingTuples)
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
break; break;
} }
else else
{ {
bs.restart(); bs.restart();
} }
} // End of loop to project and serialize table bands for a table } // End of loop to project and serialize table bands for a table
} // End of loop to process tables } // End of loop to process tables
// @bug 828 // @bug 828
if (csep.traceOn()) if (csep.traceOn())
jl->graph(csep.sessionID()); jl->graph(csep.sessionID());
if (needDbProfEndStatementMsg) if (needDbProfEndStatementMsg)
{ {
std::string ss; std::string ss;
std::ostringstream prefix; std::ostringstream prefix;
prefix << "ses:" << csep.sessionID() << " Query Totals"; prefix << "ses:" << csep.sessionID() << " Query Totals";
// Log stats std::string to standard out // Log stats std::string to standard out
ss = formatQueryStats(jl, prefix.str(), true, ss = formatQueryStats(jl, prefix.str(), true,
!(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF), !(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF),
(csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG), (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_LOG),
totalRowCount); totalRowCount);
//@Bug 1306. Added timing info for real time tracking. //@Bug 1306. Added timing info for real time tracking.
std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl; std::cout << ss << " at " << globServiceExeMgr->timeNow() << std::endl;
@@ -883,57 +886,55 @@ namespace exemgr
int stmtID = csep.statementID(); int stmtID = csep.statementID();
std::unique_lock<std::mutex> scoped(jlMutex); std::unique_lock<std::mutex> scoped(jlMutex);
destructing++; destructing++;
std::thread bgdtor( std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] {
[jl, &jlMutex, &jlCleanupDone, stmtID, &li, &destructing, &msgLog] std::unique_lock<std::mutex> scoped(jlMutex);
{ const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction
std::unique_lock<std::mutex> scoped(jlMutex); logging::Message::Args args;
const_cast<joblist::SJLP&>(jl).reset(); // this happens second; does real destruction args.add(stmtID);
logging::Message::Args args; msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li);
args.add(stmtID); if (--destructing == 0)
msgLog.logMessage(logging::LOG_TYPE_DEBUG, ServiceExeMgr::logDbProfEndStatement, args, li); jlCleanupDone.notify_one();
if (--destructing == 0) });
jlCleanupDone.notify_one();
});
jl.reset(); // this happens first jl.reset(); // this happens first
bgdtor.detach(); bgdtor.detach();
} }
else else
// delete sessionMemMap entry for this session's memory % use // delete sessionMemMap entry for this session's memory % use
deleteMaxMemPct(csep.sessionID()); deleteMaxMemPct(csep.sessionID());
std::string endtime(globServiceExeMgr->timeNow()); std::string endtime(globServiceExeMgr->timeNow());
if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000)) if ((csep.traceFlags() & globServiceExeMgr->flagsWantOutput) && (csep.sessionID() < 0x80000000))
{ {
std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at " std::cout << "For session " << csep.sessionID() << ": " << totalBytesSent << " bytes sent back at "
<< endtime << std::endl; << endtime << std::endl;
// @bug 663 - Implemented caltraceon(16) to replace the // @bug 663 - Implemented caltraceon(16) to replace the
// $FIFO_SINK compiler definition in pColStep. // $FIFO_SINK compiler definition in pColStep.
// This option consumes rows in the project steps. // This option consumes rows in the project steps.
if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4) if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS4)
{ {
std::cout << std::endl; std::cout << std::endl;
std::cout << "**** No data returned to DM. Rows consumed " std::cout << "**** No data returned to DM. Rows consumed "
"in ProjectSteps - caltrace(16) is on (FIFO_SINK)." "in ProjectSteps - caltrace(16) is on (FIFO_SINK)."
" ****" " ****"
<< std::endl; << std::endl;
std::cout << std::endl; std::cout << std::endl;
} }
else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3) else if (csep.traceFlags() & execplan::CalpontSelectExecutionPlan::TRACE_NO_ROWS3)
{ {
std::cout << std::endl; std::cout << std::endl;
std::cout << "**** No data returned to DM - caltrace(8) is " std::cout << "**** No data returned to DM - caltrace(8) is "
"on (SWALLOW_ROWS_EXEMGR). ****" "on (SWALLOW_ROWS_EXEMGR). ****"
<< std::endl; << std::endl;
std::cout << std::endl; std::cout << std::endl;
} }
} }
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT")) if (!csep.isInternal() && (csep.queryType() == "SELECT" || csep.queryType() == "INSERT_SELECT"))
{ {
qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY; qts.msg_type = querytele::QueryTeleStats::QT_SUMMARY;
qts.max_mem_pct = fStats.fMaxMemPct; qts.max_mem_pct = fStats.fMaxMemPct;
qts.num_files = fStats.fNumFiles; qts.num_files = fStats.fNumFiles;
@@ -952,16 +953,16 @@ namespace exemgr
qts.module_name = fOamCachePtr->getModuleName(); qts.module_name = fOamCachePtr->getModuleName();
qts.local_query = csep.localQuery(); qts.local_query = csep.localQuery();
fTeleClient.postQueryTele(qts); fTeleClient.postQueryTele(qts);
} }
} }
// Release CSC object (for sessionID) that was added by makeJobList() // Release CSC object (for sessionID) that was added by makeJobList()
// Mask 0x80000000 is for associate user query and csc query. // Mask 0x80000000 is for associate user query and csc query.
// (actual joblist destruction happens at the top of this loop) // (actual joblist destruction happens at the top of this loop)
decThreadCntPerSession(csep.sessionID() | 0x80000000); decThreadCntPerSession(csep.sessionID() | 0x80000000);
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
decThreadCntPerSession(csep.sessionID() | 0x80000000); decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl; std::cerr << "### ExeMgr ses:" << csep.sessionID() << " caught: " << ex.what() << std::endl;
@@ -970,9 +971,9 @@ namespace exemgr
args.add(ex.what()); args.add(ex.what());
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close(); fIos.close();
} }
catch (...) catch (...)
{ {
decThreadCntPerSession(csep.sessionID() | 0x80000000); decThreadCntPerSession(csep.sessionID() | 0x80000000);
statementsRunningCount->decr(stmtCounted); statementsRunningCount->decr(stmtCounted);
std::cerr << "### Exception caught!" << std::endl; std::cerr << "### Exception caught!" << std::endl;
@@ -981,11 +982,11 @@ namespace exemgr
args.add("ExeMgr caught unknown exception"); args.add("ExeMgr caught unknown exception");
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li); msgLog.logMessage(logging::LOG_TYPE_CRITICAL, ServiceExeMgr::logExeMgrExcpt, args, li);
fIos.close(); fIos.close();
} }
// make sure we don't leave scope while joblists are being destroyed // make sure we don't leave scope while joblists are being destroyed
std::unique_lock<std::mutex> scoped(jlMutex); std::unique_lock<std::mutex> scoped(jlMutex);
while (destructing > 0) while (destructing > 0)
jlCleanupDone.wait(scoped); jlCleanupDone.wait(scoped);
} }
}; // namespace exemgr }; // namespace exemgr

View File

@@ -17,6 +17,7 @@
#include <iostream> #include <iostream>
#include <atomic> #include <atomic>
#include <random>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include "statistics.h" #include "statistics.h"
@@ -31,61 +32,135 @@ using namespace logging;
namespace statistics namespace statistics
{ {
using ColumnsCache = std::vector<std::unordered_set<uint32_t>>;
StatisticsManager* StatisticsManager::instance() StatisticsManager* StatisticsManager::instance()
{ {
static StatisticsManager* sm = new StatisticsManager(); static StatisticsManager* sm = new StatisticsManager();
return sm; return sm;
} }
// Diff text: on doubled lines the left half belongs to the removed `analyzeColumnKeyTypes`
// and the right half to the added `collectSample`.
// `collectSample` copies the rows of `rowGroup` into the per-OID sample vectors held in
// `columnGroups`: rows fill the vectors in order until `maxSampleSize` rows are stored,
// after that a row overwrites a randomly drawn slot (or is dropped). Runs under `mut`.
void StatisticsManager::analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace) void StatisticsManager::collectSample(const rowgroup::RowGroup& rowGroup)
{ {
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
auto rowCount = rowGroup.getRowCount(); const auto rowCount = rowGroup.getRowCount();
const auto columnCount = rowGroup.getColumnCount(); const auto columnCount = rowGroup.getColumnCount();
// Nothing to sample from an empty row group.
if (!rowCount || !columnCount) if (!rowCount || !columnCount)
return; return;
auto& oids = rowGroup.getOIDs(); const auto& oids = rowGroup.getOIDs();
for (const auto oid : oids)
{
// Initialize a column data with 0.
if (!columnGroups.count(oid))
columnGroups[oid] = std::vector<uint64_t>(maxSampleSize, 0);
}
// Initialize a first row from the given `rowGroup`.
rowgroup::Row r; rowgroup::Row r;
rowGroup.initRow(&r); rowGroup.initRow(&r);
rowGroup.getRow(0, &r); rowGroup.getRow(0, &r);
ColumnsCache columns(columnCount, std::unordered_set<uint32_t>()); // Generate a uniform distribution.
// Init key types. std::random_device randomDevice;
for (uint32_t index = 0; index < columnCount; ++index) std::mt19937 gen32(randomDevice());
// NOTE(review): the distribution range is fixed here, once per row group, as
// [0, currentRowIndex + rowCount - 1]; it is not recomputed while `currentRowIndex`
// advances inside the row loop below - confirm this is the intended sampling scheme.
keyTypes[oids[index]] = KeyType::PK; std::uniform_int_distribution<> uniformDistribution(0, currentRowIndex + rowCount - 1);
const uint32_t maxRowCount = 4096;
// TODO: We should read just couple of blocks from columns, not all data, but this requires
// more deep refactoring of column commands.
rowCount = std::min(rowCount, maxRowCount);
// This is strange, it's a CS but I'm processing data as row by row, how to fix it?
for (uint32_t i = 0; i < rowCount; ++i) for (uint32_t i = 0; i < rowCount; ++i)
{ {
for (uint32_t j = 0; j < columnCount; ++j) if (currentSampleSize < maxSampleSize)
{ {
if (r.isNullValue(j) || columns[j].count(r.getIntField(j))) for (uint32_t j = 0; j < columnCount; ++j)
keyTypes[oids[j]] = KeyType::FK; {
else // FIXME: Handle null values as well.
columns[j].insert(r.getIntField(j)); if (!r.isNullValue(j))
columnGroups[oids[j]][currentSampleSize] = r.getIntField(j);
}
++currentSampleSize;
}
else
{
// Sample is full: draw a slot; indexes at or beyond `maxSampleSize` mean the row is skipped.
const uint32_t index = uniformDistribution(gen32);
if (index < maxSampleSize)
{
// NOTE(review): unlike the fill branch above, this overwrite path stores
// r.getIntField(j) without an isNullValue(j) check.
for (uint32_t j = 0; j < columnCount; ++j)
columnGroups[oids[j]][index] = r.getIntField(j);
}
} }
r.nextRow(); r.nextRow();
// Counts every row seen since the last reset, sampled or not.
++currentRowIndex;
} }
if (trace)
output(StatisticsType::PK_FK);
} }
// Diff text: on doubled lines the left half belongs to the removed
// `output(StatisticsType)` and the right half to the added `analyzeSample`.
// `analyzeSample` derives the PK/FK flag and the MCV (most common values) list for every
// OID in `columnGroups`, prints them when `traceOn` is set, then resets the sample state.
// NOTE(review): no lock on `mut` is taken in this function although it reads and clears
// the same members that `collectSample` updates under `mut` - confirm callers serialize access.
void StatisticsManager::output(StatisticsType statisticsType) void StatisticsManager::analyzeSample(bool traceOn)
{ {
if (statisticsType == StatisticsType::PK_FK) if (traceOn)
std::cout << "Sample size: " << currentSampleSize << std::endl;
// PK_FK statistics.
// Every sampled column starts as PK and records `currentRowIndex` as its row count.
for (const auto& [oid, sample] : columnGroups)
keyTypes[oid] = std::make_pair(KeyType::PK, currentRowIndex);
for (const auto& [oid, sample] : columnGroups)
{ {
// NOTE(review): `columnsCache` stores uint32_t keys while `sample[i]` is a uint64_t
// (the sample vectors are std::vector<uint64_t>), so wider values are narrowed on
// lookup/insert - confirm this is intended.
std::cout << "Columns count: " << keyTypes.size() << std::endl; std::unordered_set<uint32_t> columnsCache;
for (const auto& p : keyTypes) std::unordered_map<uint64_t, uint32_t> columnMCV;
std::cout << p.first << " " << (int)p.second << std::endl; for (uint32_t i = 0; i < currentSampleSize; ++i)
{
const auto value = sample[i];
// PK_FK statistics.
// A value already present in the cache demotes the column from PK to FK.
if (columnsCache.count(value) && keyTypes[oid].first == KeyType::PK)
keyTypes[oid].first = KeyType::FK;
else
columnsCache.insert(value);
// MCV statistics.
// Count occurrences of each sampled value.
if (columnMCV.count(value))
columnMCV[value]++;
else
columnMCV.insert({value, 1});
}
// MCV statistics.
// Order the (value, count) pairs by descending count and keep the leading ones.
std::vector<pair<uint64_t, uint32_t>> mcvList(columnMCV.begin(), columnMCV.end());
std::sort(mcvList.begin(), mcvList.end(),
[](const std::pair<uint64_t, uint32_t>& a, const std::pair<uint64_t, uint32_t>& b) {
return a.second > b.second;
});
// 200 buckets as Microsoft does.
const auto mcvSize = std::min(columnMCV.size(), static_cast<uint64_t>(200));
mcv[oid] = std::unordered_map<uint64_t, uint32_t>(mcvList.begin(), mcvList.begin() + mcvSize);
}
if (traceOn)
output();
// Clear sample.
columnGroups.clear();
currentSampleSize = 0;
currentRowIndex = 0;
}
void StatisticsManager::output()
{
std::cout << "Columns count: " << keyTypes.size() << std::endl;
std::cout << "Statistics type [PK_FK]: " << std::endl;
for (const auto& p : keyTypes)
{
std::cout << "OID: " << p.first << " ";
if (static_cast<uint32_t>(p.second.first) == 0)
std::cout << "PK ";
else
std::cout << "FK ";
std::cout << "row count: " << p.second.second << std::endl;
}
std::cout << "Statistics type [MCV]: " << std::endl;
for (const auto& [oid, columnMCV] : mcv)
{
std::cout << "OID: " << oid << std::endl;
for (const auto& [value, count] : columnMCV)
std::cout << value << ": " << count << std::endl;
} }
} }
@@ -94,8 +169,16 @@ std::unique_ptr<char[]> StatisticsManager::convertStatsToDataStream(uint64_t& da
{ {
// Number of pairs. // Number of pairs.
uint64_t count = keyTypes.size(); uint64_t count = keyTypes.size();
// count, [[uid, keyType], ... ] // count, [[uid, keyType, rows count], ... ]
dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType)); dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType) + sizeof(uint32_t));
// Count the size of the MCV.
for (const auto& [oid, mcvColumn] : mcv)
{
// [oid, list size, list [value, count]]
dataStreamSize +=
(sizeof(uint32_t) + sizeof(uint32_t) + ((sizeof(uint64_t) + sizeof(uint32_t)) * mcvColumn.size()));
}
// Allocate memory for data stream. // Allocate memory for data stream.
std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]); std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
@@ -105,21 +188,95 @@ std::unique_ptr<char[]> StatisticsManager::convertStatsToDataStream(uint64_t& da
std::memcpy(dataStream, reinterpret_cast<char*>(&count), sizeof(uint64_t)); std::memcpy(dataStream, reinterpret_cast<char*>(&count), sizeof(uint64_t));
offset += sizeof(uint64_t); offset += sizeof(uint64_t);
// For each pair [oid, key type]. // For each pair [oid, key type, rows count].
for (const auto& p : keyTypes) for (const auto& p : keyTypes)
{ {
uint32_t oid = p.first; uint32_t oid = p.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t)); std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
offset += sizeof(uint32_t); offset += sizeof(uint32_t);
KeyType keyType = p.second.first;
KeyType keyType = p.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&keyType), sizeof(KeyType)); std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&keyType), sizeof(KeyType));
offset += sizeof(KeyType); offset += sizeof(KeyType);
uint32_t rowCount = p.second.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&rowCount), sizeof(uint32_t));
offset += sizeof(uint32_t);
} }
// For each [oid, list size, list [value, count]].
for (const auto& p : mcv)
{
// [oid]
uint32_t oid = p.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
offset += sizeof(uint32_t);
// [list size]
const auto& mcvColumn = p.second;
uint32_t size = mcvColumn.size();
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&size), sizeof(uint32_t));
offset += sizeof(uint32_t);
// [list [value, count]]
for (const auto& mcvPair : mcvColumn)
{
uint64_t value = mcvPair.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&value), sizeof(uint64_t));
offset += sizeof(uint64_t);
uint32_t count = mcvPair.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&count), sizeof(uint32_t));
offset += sizeof(uint32_t);
}
}
return dataStreamSmartPtr; return dataStreamSmartPtr;
} }
// Rebuilds `keyTypes` and `mcv` from a raw statistics buffer.
// Layout read here:
//   uint64 count,
//   count * [uint32 oid, KeyType, uint32 row count],
//   count * [uint32 oid, uint32 list size, list size * [uint64 value, uint32 count]].
// The MCV section is read with the same entry count as the PK/FK section.
// No bounds checking is done: the buffer length is not known to this function.
void StatisticsManager::convertStatsFromDataStream(std::unique_ptr<char[]> dataStreamSmartPtr)
{
  const char* cursor = dataStreamSmartPtr.get();

  // Copies sizeof(field) bytes from the stream into `field` and moves the cursor past them.
  const auto readField = [&cursor](auto& field)
  {
    std::memcpy(&field, cursor, sizeof(field));
    cursor += sizeof(field);
  };

  uint64_t entryCount = 0;
  readField(entryCount);

  // PK/FK section: one [oid, key type, row count] record per entry.
  for (uint64_t entry = 0; entry < entryCount; ++entry)
  {
    uint32_t oid;
    KeyType keyType;
    uint32_t rowCount;
    readField(oid);
    readField(keyType);
    readField(rowCount);
    keyTypes[oid] = std::make_pair(keyType, rowCount);
  }

  // MCV section: one [oid, list size, list of (value, occurrences)] record per entry.
  for (uint64_t entry = 0; entry < entryCount; ++entry)
  {
    uint32_t oid;
    uint32_t listSize;
    readField(oid);
    readField(listSize);

    std::unordered_map<uint64_t, uint32_t> columnMCV;
    for (uint32_t item = 0; item < listSize; ++item)
    {
      uint64_t value;
      uint32_t occurrences;
      readField(value);
      readField(occurrences);
      columnMCV[value] = occurrences;
    }
    mcv[oid] = std::move(columnMCV);
  }
}
void StatisticsManager::saveToFile() void StatisticsManager::saveToFile()
{ {
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
@@ -228,22 +385,7 @@ void StatisticsManager::loadFromFile()
if (dataHash != computedDataHash) if (dataHash != computedDataHash)
throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. "); throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. ");
uint64_t count = 0; convertStatsFromDataStream(std::move(dataStreamSmartPtr));
std::memcpy(reinterpret_cast<char*>(&count), dataStream, sizeof(uint64_t));
uint64_t offset = sizeof(uint64_t);
// For each pair.
for (uint64_t i = 0; i < count; ++i)
{
uint32_t oid;
KeyType keyType;
std::memcpy(reinterpret_cast<char*>(&oid), &dataStream[offset], sizeof(uint32_t));
offset += sizeof(uint32_t);
std::memcpy(reinterpret_cast<char*>(&keyType), &dataStream[offset], sizeof(KeyType));
offset += sizeof(KeyType);
// Insert pair.
keyTypes[oid] = keyType;
}
} }
uint64_t StatisticsManager::computeHashFromStats() uint64_t StatisticsManager::computeHashFromStats()
@@ -261,10 +403,25 @@ void StatisticsManager::serialize(messageqcpp::ByteStream& bs)
bs << epoch; bs << epoch;
bs << count; bs << count;
// PK_FK
for (const auto& keyType : keyTypes) for (const auto& keyType : keyTypes)
{ {
bs << keyType.first; bs << keyType.first;
bs << (uint32_t)keyType.second; bs << (uint32_t)keyType.second.first;
bs << keyType.second.second;
}
// MCV
for (const auto& p : mcv)
{
bs << p.first;
const auto& mcvColumn = p.second;
bs << static_cast<uint32_t>(mcvColumn.size());
for (const auto& mcvPair : mcvColumn)
{
bs << mcvPair.first;
bs << mcvPair.second;
}
} }
} }
@@ -275,12 +432,34 @@ void StatisticsManager::unserialize(messageqcpp::ByteStream& bs)
bs >> epoch; bs >> epoch;
bs >> count; bs >> count;
// PK_FK
for (uint32_t i = 0; i < count; ++i) for (uint32_t i = 0; i < count; ++i)
{ {
uint32_t oid, keyType; uint32_t oid, keyType, rowCount;
bs >> oid; bs >> oid;
bs >> keyType; bs >> keyType;
keyTypes[oid] = static_cast<KeyType>(keyType); bs >> rowCount;
keyTypes[oid] = std::make_pair(static_cast<KeyType>(keyType), rowCount);
}
// MCV
for (uint32_t i = 0; i < count; ++i)
{
uint32_t oid, mcvSize;
bs >> oid;
bs >> mcvSize;
std::unordered_map<uint64_t, uint32_t> mcvColumn;
for (uint32_t j = 0; j < mcvSize; ++j)
{
uint64_t value;
uint32_t count;
bs >> value;
bs >> count;
mcvColumn[value] = count;
}
mcv[oid] = std::move(mcvColumn);
} }
} }
@@ -291,7 +470,7 @@ bool StatisticsManager::hasKey(uint32_t oid)
KeyType StatisticsManager::getKeyType(uint32_t oid) KeyType StatisticsManager::getKeyType(uint32_t oid)
{ {
return keyTypes[oid]; return keyTypes[oid].first;
} }
StatisticsDistributor* StatisticsDistributor::instance() StatisticsDistributor* StatisticsDistributor::instance()

View File

@@ -49,8 +49,10 @@ enum class KeyType : uint32_t
// Represents types of statistics CS supports. // Represents types of statistics CS supports.
enum class StatisticsType : uint32_t enum class StatisticsType : uint32_t
{ {
// A special statistics type, made to solve circular inner join problem. // A special statistics type, specifies whether a column is a primary key or a foreign key.
PK_FK PK_FK,
// Most common values.
MCV
}; };
// Represetns a header for the statistics file. // Represetns a header for the statistics file.
@@ -63,6 +65,11 @@ struct StatisticsFileHeader
uint8_t offset[1024]; uint8_t offset[1024];
}; };
using ColumnsCache = std::unordered_map<uint32_t, std::unordered_set<uint64_t>>;
using ColumnGroup = std::unordered_map<uint32_t, std::vector<uint64_t>>;
using KeyTypes = std::unordered_map<uint32_t, std::pair<KeyType, uint32_t>>;
using MCVList = std::unordered_map<uint32_t, std::unordered_map<uint64_t, uint32_t>>;
// This class is responsible for processing and storing statistics. // This class is responsible for processing and storing statistics.
// On each `analyze table` iteration it increases an epoch and stores // On each `analyze table` iteration it increases an epoch and stores
// the updated statistics into the special file. // the updated statistics into the special file.
@@ -71,10 +78,12 @@ class StatisticsManager
public: public:
// Returns the instance of this class, static initialization happens only once. // Returns the instance of this class, static initialization happens only once.
static StatisticsManager* instance(); static StatisticsManager* instance();
// Analyzes the given `rowGroup` by processing it row by row and searching for foreign key. // Collect samples from the given `rowGroup`.
void analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace); void collectSample(const rowgroup::RowGroup& rowGroup);
// Analyzes collected samples.
void analyzeSample(bool traceOn);
// Outputs stats to out stream. // Outputs stats to out stream.
void output(StatisticsType statisticsType = StatisticsType::PK_FK); void output();
// Saves stats to the file. // Saves stats to the file.
void saveToFile(); void saveToFile();
// Loads stats from the file. // Loads stats from the file.
@@ -95,17 +104,29 @@ class StatisticsManager
KeyType getKeyType(uint32_t oid); KeyType getKeyType(uint32_t oid);
private: private:
std::map<uint32_t, KeyType> keyTypes; StatisticsManager() : currentSampleSize(0), currentRowIndex(0), epoch(0), version(1)
StatisticsManager() : epoch(0), version(1)
{ {
// Initialize plugins. // Initialize plugins.
IDBPolicy::configIDBPolicy(); IDBPolicy::configIDBPolicy();
} }
std::unique_ptr<char[]> convertStatsToDataStream(uint64_t& dataStreamSize); std::unique_ptr<char[]> convertStatsToDataStream(uint64_t& dataStreamSize);
void convertStatsFromDataStream(std::unique_ptr<char[]> dataStreamSmartPtr);
std::mutex mut; // Internal data represents a sample [OID, vector of values].
ColumnGroup columnGroups;
// Internal data for the PK/FK statistics [OID, [key type, row count]].
KeyTypes keyTypes;
// Internal data for MCV list [OID, list[value, count]]
MCVList mcv;
// TODO: Think about sample size.
const uint32_t maxSampleSize = 64000;
uint32_t currentSampleSize;
uint32_t currentRowIndex;
uint32_t epoch; uint32_t epoch;
uint32_t version; uint32_t version;
std::mutex mut;
std::string statsFile = "/var/lib/columnstore/local/statistics"; std::string statsFile = "/var/lib/columnstore/local/statistics";
}; };