1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

MCOL-4713 Analyze table implementation.

This commit is contained in:
Denis Khalikov
2021-06-11 02:07:04 +03:00
parent ceb73cddcd
commit c20015a7b2
18 changed files with 1837 additions and 208 deletions

View File

@@ -48,6 +48,7 @@
#include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h"
#include "distributedenginecomm.h"
#include "resourcemanager.h"
@@ -77,7 +78,7 @@
#include "dbrm.h"
#include "mariadb_my_sys.h"
#include "statistics.h"
class Opt
{
@@ -604,7 +605,136 @@ private:
fIos.write(emsgBs);
}
public:
void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted)
{
messageqcpp::ByteStream::quadbyte qb;
execplan::MCSAnalyzeTableExecutionPlan caep;
bs = fIos.read();
caep.unserialize(bs);
statementsRunningCount->incr(stmtCounted);
jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true);
// Joblist is empty.
if (jl->status() == logging::statisticsJobListEmpty)
{
if (caep.traceOn())
std::cout << "JobList is empty " << std::endl;
jl.reset();
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
return;
}
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
std::cout << "fEc setup " << std::endl;
fEc->Setup();
}
if (jl->status() == 0)
{
std::string emsg;
if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}
// Execute a joblist.
jl->doQuery();
FEMsgHandler msgHandler(jl, &fIos);
msgHandler.start();
auto rowCount = jl->projectTable(100, bs);
msgHandler.stop();
auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();
if (caep.traceOn())
std::cout << "Row count " << rowCount << std::endl;
// Process `RowGroup`, increase an epoch and save statistics to the file.
auto* statisticsManager = statistics::StatisticsManager::instance();
statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn());
statisticsManager->incEpoch();
statisticsManager->saveToFile();
// Distribute statistics across all ExeMgr clients if possible.
statistics::StatisticsDistributor::instance()->distributeStatistics();
// Send the signal back to front-end.
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
}
void analyzeTableHandleStats(messageqcpp::ByteStream& bs)
{
messageqcpp::ByteStream::quadbyte qb;
#ifdef DEBUG_STATISTICS
std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
uint64_t dataHashRec;
bs >> dataHashRec;
uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
// The stats are the same.
if (dataHash == dataHashRec)
{
#ifdef DEBUG_STATISTICS
std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
return;
}
bs.restart();
qb = ANALYZE_TABLE_NEED_STATS;
bs << qb;
fIos.write(bs);
bs.restart();
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
statistics::StatisticsManager::instance()->unserialize(bs);
statistics::StatisticsManager::instance()->saveToFile();
#ifdef DEBUG_STATISTICS
std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
#endif
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
}
public:
void operator()()
{
@@ -691,6 +821,16 @@ public:
fIos.close();
break;
}
else if (qb == ANALYZE_TABLE_EXECUTE)
{
analyzeTableExecute(bs, jl, stmtCounted);
continue;
}
else if (qb == ANALYZE_TABLE_REC_STATS)
{
analyzeTableHandleStats(bs);
continue;
}
else
{
if (gDebug)
@@ -1706,6 +1846,16 @@ int ServiceExeMgr::Child()
exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
}
// Load statistics.
try
{
statistics::StatisticsManager::instance()->loadFromFile();
}
catch (...)
{
std::cerr << "Cannot load statistics from file " << std::endl;
}
for (;;)
{
messageqcpp::IOSocket ios;