1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #1989 from denis0x0D/MCOL-4713

MCOL-4713 Analyze table implementation.
This commit is contained in:
Roman Nozdrin
2021-07-02 16:17:07 +03:00
committed by GitHub
18 changed files with 1837 additions and 208 deletions

View File

@ -12,6 +12,7 @@ set(execplan_LIB_SRCS
calpontexecutionplan.cpp calpontexecutionplan.cpp
calpontexecutionplanfactory.cpp calpontexecutionplanfactory.cpp
calpontselectexecutionplan.cpp calpontselectexecutionplan.cpp
mcsanalyzetableexecutionplan.cpp
clientrotator.cpp clientrotator.cpp
constantcolumn.cpp constantcolumn.cpp
constantfilter.cpp constantfilter.cpp

View File

@ -51,6 +51,29 @@ class CalpontExecutionPlan
*/ */
public: public:
/**
* Flags that can be passed to caltraceon().
*/
enum TRACE_FLAGS
{
TRACE_NONE = 0x0000, /*!< No tracing */
TRACE_LOG = 0x0001, /*!< Full return of rows, extra debug about query in log */
TRACE_NO_ROWS1 = 0x0002, /*!< Same as above, but rows not given to OCI layer */
TRACE_NO_ROWS2 = 0x0004, /*!< Same as above, but rows not converted from stream */
TRACE_NO_ROWS3 = 0x0008, /*!< Same as above, but rows not sent to DM from UM */
TRACE_NO_ROWS4 = 0x0010, /*!< Same as above, but rows not sent to DeliveryStep */
TRACE_LBIDS = 0x0020, /*!< Enable LBID tracing in PrimProc */
TRACE_PLAN_ONLY = 0x0040, /*!< Only generate a serialized CSEP */
PM_PROFILE = 0x0080, /*!< Enable PM profiling in PrimProc */
IGNORE_CP = 0x0100, /*!< Ignore casual partitioning metadata */
WRITE_TO_FILE = 0x0200, /*!< writes table rows out to a file from the Oracle connector */
NOWRITE_TO_FILE = 0x0400, /*!< does not write table rows out to a file from the Oracle connector */
TRACE_DISKIO_UM = 0x0800, /*!< Enable UM disk I/O logging */
TRACE_RESRCMGR = 0x1000, /*!< Trace Resource Manager Usage */
TRACE_TUPLE_AUTOSWITCH = 0x4000, /*!< Enable MySQL tuple-to-table auto switch */
TRACE_TUPLE_OFF = 0x8000, /*!< Enable MySQL table interface */
};
/** /**
* Constructors * Constructors
*/ */

View File

@ -135,29 +135,6 @@ public:
UNKNOWN_SUBS UNKNOWN_SUBS
}; };
/**
* Flags that can be passed to caltraceon().
*/
enum TRACE_FLAGS
{
TRACE_NONE = 0x0000, /*!< No tracing */
TRACE_LOG = 0x0001, /*!< Full return of rows, extra debug about query in log */
TRACE_NO_ROWS1 = 0x0002, /*!< Same as above, but rows not given to OCI layer */
TRACE_NO_ROWS2 = 0x0004, /*!< Same as above, but rows not converted from stream */
TRACE_NO_ROWS3 = 0x0008, /*!< Same as above, but rows not sent to DM from UM */
TRACE_NO_ROWS4 = 0x0010, /*!< Same as above, but rows not sent to DeliveryStep */
TRACE_LBIDS = 0x0020, /*!< Enable LBID tracing in PrimProc */
TRACE_PLAN_ONLY = 0x0040, /*!< Only generate a serialized CSEP */
PM_PROFILE = 0x0080, /*!< Enable PM profiling in PrimProc */
IGNORE_CP = 0x0100, /*!< Ignore casual partitioning metadata */
WRITE_TO_FILE = 0x0200, /*!< writes table rows out to a file from the Oracle connector */
NOWRITE_TO_FILE = 0x0400, /*!< does not write table rows out to a file from the Oracle connector */
TRACE_DISKIO_UM = 0x0800, /*!< Enable UM disk I/O logging */
TRACE_RESRCMGR = 0x1000, /*!< Trace Resource Manager Usage */
TRACE_TUPLE_AUTOSWITCH = 0x4000, /*!< Enable MySQL tuple-to-table auto switch */
TRACE_TUPLE_OFF = 0x8000, /*!< Enable MySQL table interface */
};
enum IDB_LOCAL_QUERY enum IDB_LOCAL_QUERY
{ {
GLOBAL_QUERY = 0, /*!< Standard processing */ GLOBAL_QUERY = 0, /*!< Standard processing */

View File

@ -0,0 +1,156 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <iostream>
#include <algorithm>
using namespace std;
#include <boost/uuid/uuid_io.hpp>
#include "bytestream.h"
using namespace messageqcpp;
#include "mcsanalyzetableexecutionplan.h"
#include "objectreader.h"
#include "filter.h"
#include "returnedcolumn.h"
#include "simplecolumn.h"
#include "querystats.h"
#include "querytele.h"
using namespace querytele;
namespace execplan
{
// Render a human-readable dump of this ANALYZE TABLE plan for tracing/logs:
// target schema/table, the columns whose statistics will be gathered, the
// column map, and the session/transaction identifiers.
std::string MCSAnalyzeTableExecutionPlan::toString() const
{
  std::ostringstream output;

  output << ">ANALYZE TABLE " << std::endl;
  // Fixed typo in the label: was "Shema:".
  output << "Schema: " << fSchemaName << std::endl;
  output << "Table: " << fTableName << std::endl;

  output << ">>Returned Columns" << std::endl;
  for (const auto& rColumn : fReturnedCols)
    output << *rColumn << std::endl;

  output << "--- Column Map ---" << std::endl;
  for (auto iter = columnMap().begin(); iter != columnMap().end(); ++iter)
    output << (*iter).first << " : " << (*iter).second << endl;

  output << "SessionID: " << fSessionID << std::endl;
  output << "TxnID: " << fTxnID << std::endl;
  output << "VerID: " << fVerID << std::endl;

  return output.str();
}
// Write this plan onto the ByteStream.  The field order here is the wire
// format: unserialize() must read the very same fields in the very same order.
void MCSAnalyzeTableExecutionPlan::serialize(messageqcpp::ByteStream& bs) const
{
  // Stream-type marker first, so the reader can dispatch on plan type.
  bs << static_cast<ObjectReader::id_t>(ObjectReader::MCSANALYZETBLEXECUTIONPLAN);

  // Returned columns: count, then each column in order.
  bs << static_cast<uint32_t>(fReturnedCols.size());
  for (const auto& col : fReturnedCols)
    col->serialize(bs);

  // Column map: count, then (name, column) pairs.
  bs << static_cast<uint32_t>(fColumnMap.size());
  for (const auto& entry : fColumnMap)
  {
    bs << entry.first;
    entry.second->serialize(bs);
  }

  // Resource-manager parameters: count, then each (session, id, value) triple.
  bs << static_cast<uint32_t>(frmParms.size());
  for (const auto& parm : frmParms)
  {
    bs << parm.sessionId;
    bs << parm.id;
    bs << parm.value;
  }

  // Scalar plan attributes.
  bs << fData;
  bs << static_cast<uint32_t>(fSessionID);
  bs << static_cast<uint32_t>(fTxnID);
  bs << fVerID;
  bs << fStatementID;
  bs << static_cast<uint64_t>(fStringScanThreshold);
  bs << fPriority;
  bs << fSchemaName;
  bs << fTableName;
  bs << fLocalQuery;
  bs << fTimeZone;
  bs << fTraceFlags;
}
// Read this plan back from the ByteStream.  Field order mirrors serialize()
// exactly; the wire format is unchanged by this revision.
void MCSAnalyzeTableExecutionPlan::unserialize(messageqcpp::ByteStream& bs)
{
  ObjectReader::checkType(bs, ObjectReader::MCSANALYZETBLEXECUTIONPLAN);

  fReturnedCols.clear();
  fColumnMap.clear();

  uint32_t size;

  // Returned columns.
  bs >> size;
  for (uint32_t i = 0; i < size; ++i)
  {
    // createTreeNode() is expected to yield a ReturnedColumn here; a null
    // dynamic_cast result would indicate a corrupt stream.
    auto* returnedColumn = dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(bs));
    SRCP srcp(returnedColumn);
    fReturnedCols.push_back(srcp);
  }

  // Column map: (name, column) pairs.
  bs >> size;
  for (uint32_t i = 0; i < size; ++i)
  {
    std::string colName;
    bs >> colName;
    auto* returnedColumn = dynamic_cast<ReturnedColumn*>(ObjectReader::createTreeNode(bs));
    SRCP srcp(returnedColumn);
    fColumnMap.insert(ColumnMap::value_type(colName, srcp));
  }

  // Resource-manager parameters.
  bs >> size;
  messageqcpp::ByteStream::doublebyte id;
  messageqcpp::ByteStream::quadbyte sessionId;
  messageqcpp::ByteStream::octbyte memory;

  for (uint32_t i = 0; i < size; i++)
  {
    bs >> sessionId;
    bs >> id;
    bs >> memory;
    frmParms.push_back(RMParam(sessionId, id, memory));
  }

  // Scalar attributes.  The previous implementation extracted these through
  // reinterpret_cast'ed references; those casts were no-ops for the unsigned
  // members and a strict-aliasing violation for the signed fTxnID, so plain
  // reads plus one explicit conversion are used instead.
  bs >> fData;
  bs >> fSessionID;
  uint32_t txnID;
  bs >> txnID;  // serialized as uint32_t, stored as signed int
  fTxnID = static_cast<int>(txnID);
  bs >> fVerID;
  bs >> fStatementID;
  bs >> fStringScanThreshold;
  bs >> fPriority;
  bs >> fSchemaName;
  bs >> fTableName;
  bs >> fLocalQuery;
  bs >> fTimeZone;
  bs >> fTraceFlags;
}
} // namespace execplan

View File

@ -0,0 +1,170 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef MCSANALYZETABLEXEXCUTIONPLAN_H
#define MCSANALYZETABLEXEXCUTIONPLAN_H
#include <vector>
#include <map>
#include <iosfwd>
#include <boost/uuid/uuid.hpp>
#include "exp_templates.h"
#include "calpontexecutionplan.h"
#include "calpontselectexecutionplan.h"
#include "returnedcolumn.h"
#include "filter.h"
#include "expressionparser.h"
#include "calpontsystemcatalog.h"
#include "brmtypes.h"
#include "objectreader.h"
#include <boost/algorithm/string/case_conv.hpp>
/**
* Namespace
*/
namespace execplan
{
/** @brief Execution plan for the ANALYZE TABLE statement.
 *
 * Holds the columns whose statistics are to be collected together with the
 * session/transaction context required to execute the job on ExeMgr.
 * The member set must stay in sync with serialize()/unserialize().
 */
class MCSAnalyzeTableExecutionPlan : public CalpontExecutionPlan
{
 public:
  typedef std::vector<SRCP> ReturnedColumnList;
  typedef std::multimap<std::string, SRCP> ColumnMap;
  typedef std::vector<RMParam> RMParmVec;

  /** All scalar members carry in-class initializers so a default-constructed
   *  plan never exposes indeterminate values through toString()/serialize()
   *  (previously only fTraceFlags was initialized by the constructor). */
  MCSAnalyzeTableExecutionPlan() = default;

  MCSAnalyzeTableExecutionPlan(const ReturnedColumnList& returnedCols, const ColumnMap& columnMap)
   : fReturnedCols(returnedCols), fColumnMap(columnMap)
  {
  }

  virtual ~MCSAnalyzeTableExecutionPlan() = default;

  /** @brief Columns whose statistics will be computed. */
  const ReturnedColumnList& returnedCols() const
  {
    return fReturnedCols;
  }
  ReturnedColumnList& returnedCols()
  {
    return fReturnedCols;
  }
  void returnedCols(const ReturnedColumnList& returnedCols)
  {
    fReturnedCols = returnedCols;
  }

  /** @brief Column-name to column multimap. */
  const ColumnMap& columnMap() const
  {
    return fColumnMap;
  }
  ColumnMap& columnMap()
  {
    return fColumnMap;
  }
  void columnMap(const ColumnMap& columnMap)
  {
    fColumnMap = columnMap;
  }

  /** @brief Opaque statement data carried with the plan. */
  const std::string data() const
  {
    return fData;
  }
  void data(const std::string data)
  {
    fData = data;
  }

  uint32_t sessionID() const
  {
    return fSessionID;
  }
  void sessionID(const uint32_t sessionID)
  {
    fSessionID = sessionID;
  }

  int txnID() const
  {
    return fTxnID;
  }
  void txnID(const int txnID)
  {
    fTxnID = txnID;
  }

  const BRM::QueryContext verID() const
  {
    return fVerID;
  }
  void verID(const BRM::QueryContext verID)
  {
    fVerID = verID;
  }

  inline std::string& schemaName()
  {
    return fSchemaName;
  }
  /** Stores the schema name, lower-casing it when the server runs with
   *  lower_case_table_names enabled. */
  inline void schemaName(const std::string& schemaName, int lower_case_table_names)
  {
    fSchemaName = schemaName;
    if (lower_case_table_names)
      boost::algorithm::to_lower(fSchemaName);
  }

  inline std::string& tableName()
  {
    return fTableName;
  }
  /** Stores the table name, lower-casing it when the server runs with
   *  lower_case_table_names enabled. */
  inline void tableName(const std::string& tableName, int lower_case_table_names)
  {
    fTableName = tableName;
    if (lower_case_table_names)
      boost::algorithm::to_lower(fTableName);
  }

  uint32_t statementID() const
  {
    return fStatementID;
  }
  void statementID(const uint32_t statementID)
  {
    fStatementID = statementID;
  }

  void uuid(const boost::uuids::uuid& uuid)
  {
    fUuid = uuid;
  }
  const boost::uuids::uuid& uuid() const
  {
    return fUuid;
  }

  void timeZone(const std::string& timezone)
  {
    fTimeZone = timezone;
  }
  const std::string timeZone() const
  {
    return fTimeZone;
  }

  void priority(uint32_t p)
  {
    fPriority = p;
  }
  uint32_t priority() const
  {
    return fPriority;
  }

  /** @brief Resource-manager parameters forwarded to ExeMgr. */
  const RMParmVec& rmParms()
  {
    return frmParms;
  }
  void rmParms(const RMParmVec& parms)
  {
    frmParms.clear();
    frmParms.assign(parms.begin(), parms.end());
  }

  uint32_t localQuery() const
  {
    return fLocalQuery;
  }
  void localQuery(const uint32_t localQuery)
  {
    fLocalQuery = localQuery;
  }

  inline bool traceOn() const
  {
    return (traceFlags() & TRACE_LOG);
  }
  inline uint32_t traceFlags() const
  {
    return fTraceFlags;
  }
  inline void traceFlags(uint32_t traceFlags)
  {
    fTraceFlags = traceFlags;
  }

  virtual std::string toString() const;

  /** @brief True for internal (system-catalog) sessions: high bit of the session id. */
  virtual bool isInternal() const
  {
    return ((fSessionID & 0x80000000) != 0);
  }

  virtual void serialize(messageqcpp::ByteStream& bs) const;
  virtual void unserialize(messageqcpp::ByteStream& bs);

  // TODO: Why do we need this?  Equality is intentionally not implemented.
  virtual bool operator==(const CalpontExecutionPlan* t) const
  {
    return false;
  }
  virtual bool operator!=(const CalpontExecutionPlan* t) const
  {
    return false;
  }

 private:
  ReturnedColumnList fReturnedCols;
  ColumnMap fColumnMap;
  uint32_t fSessionID = 0;
  int fTxnID = 0;
  BRM::QueryContext fVerID;
  std::string fSchemaName;
  std::string fTableName;
  uint32_t fTraceFlags = TRACE_NONE;
  boost::uuids::uuid fUuid{};
  std::string fTimeZone;
  uint32_t fStatementID = 0;
  uint64_t fStringScanThreshold = 0;
  std::string fData;
  RMParmVec frmParms;
  uint32_t fPriority = 0;
  uint32_t fLocalQuery = 0;
};
} // namespace execplan
#endif

View File

@ -123,6 +123,9 @@ public:
/** UDAF SDK */ /** UDAF SDK */
MCSV1_CONTEXT, MCSV1_CONTEXT,
UDAFCOLUMN, UDAFCOLUMN,
/** ANALYZE TABLE */
MCSANALYZETBLEXECUTIONPLAN,
}; };
typedef uint8_t id_t; //expand as necessary typedef uint8_t id_t; //expand as necessary

View File

@ -39,6 +39,7 @@ using namespace boost;
#include "calpontexecutionplan.h" #include "calpontexecutionplan.h"
#include "calpontselectexecutionplan.h" #include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
#include "dbrm.h" #include "dbrm.h"
#include "filter.h" #include "filter.h"
@ -79,6 +80,7 @@ using namespace logging;
#include "tupleconstantstep.h" #include "tupleconstantstep.h"
#include "tuplehavingstep.h" #include "tuplehavingstep.h"
#include "windowfunctionstep.h" #include "windowfunctionstep.h"
#include "tupleannexstep.h"
#include "jlf_common.h" #include "jlf_common.h"
#include "jlf_graphics.h" #include "jlf_graphics.h"
@ -1766,8 +1768,159 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
// ds->setTraceFlags(jobInfo.traceFlags); // ds->setTraceFlags(jobInfo.traceFlags);
} }
// Build the job steps for an ANALYZE TABLE plan: one pColStep per returned
// SimpleColumn, with the first promoted to a pColScanStep feeding a TupleBPS
// (batch primitive step), and a TupleAnnexStep appended as the delivery step
// under CNX_VTABLE_ID.  Plans with no usable columns produce no steps.
void makeAnalyzeTableJobSteps(MCSAnalyzeTableExecutionPlan* caep, JobInfo& jobInfo,
                              JobStepVector& querySteps, DeliveredTableMap& deliverySteps)
{
    JobStepVector projectSteps;
    const auto& retCols = caep->returnedCols();

    if (retCols.size() == 0)
        return;

    // This is strange, but to generate a valid `out rowgroup` in AnnexStep we have to initialize
    // `nonConstDelCols`, otherwise we will get an empty result in ExeMgr. Why not just use the
    // `in row group`?
    jobInfo.nonConstDelCols = retCols;

    // Iterate over `returned columns` and create a `pColStep` for each.
    for (uint32_t i = 0; i < retCols.size(); i++)
    {
        // NOTE(review): the dynamic_cast result is dereferenced without a null
        // check — confirm retCols can only ever hold SimpleColumns here.
        const SimpleColumn* sc = dynamic_cast<const SimpleColumn*>(retCols[i].get());
        CalpontSystemCatalog::OID oid = sc->oid();
        CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
        CalpontSystemCatalog::ColType colType;

        // Columns without a schema name are skipped.
        if (!sc->schemaName().empty())
        {
            SJSTEP sjstep;
            colType = sc->colType();
            auto* pcs = new pColStep(oid, tblOid, colType, jobInfo);
            pcs->alias(extractTableAlias(sc));
            pcs->view(sc->viewName());
            pcs->name(sc->columnName());
            pcs->cardinality(sc->cardinality());
            // Register the column in the tuple-info map; ti.key identifies it
            // in the row group assembled below.
            auto ti = setTupleInfo(colType, oid, jobInfo, tblOid, sc, pcs->alias());
            pcs->tupleId(ti.key);
            sjstep.reset(pcs);
            projectSteps.push_back(sjstep);
        }
    }

    if (projectSteps.size() == 0)
        return;

    // Transform the first `pColStep` to `pColScanStep`.
    SJSTEP firstStep = projectSteps.front();
    pColStep* colStep = static_cast<pColStep*>(firstStep.get());
    // Create a `pColScanStep` using the first `pColStep`.
    pColScanStep* scanStep = new pColScanStep(*colStep);
    scanStep->outputAssociation(JobStepAssociation());
    // Update first step.
    firstStep.reset(scanStep);

    // Create tuple BPS step.
    TupleBPS* tbps = new TupleBPS(*scanStep, jobInfo);
    tbps->setFirstStepType(SCAN);
    // One `filter` step is the scan step.
    tbps->setBPP(scanStep);
    tbps->setStepCount();

    // Row-group layout vectors.  NOTE(review): `pos` starts at 2 — presumably
    // a fixed header/rid prefix; confirm against the RowGroup layout.
    vector<uint32_t> pos;
    vector<uint32_t> oids;
    vector<uint32_t> keys;
    vector<uint32_t> scale;
    vector<uint32_t> precision;
    vector<CalpontSystemCatalog::ColDataType> types;
    vector<uint32_t> csNums;
    pos.push_back(2);

    bool passThruCreated = false;

    for (JobStepVector::iterator it = projectSteps.begin(); it != projectSteps.end(); it++)
    {
        JobStep* js = it->get();
        auto* colStep = static_cast<pColStep*>(js);

        // The first projected column is delivered via a PassThruStep (its
        // values come straight from the scan); the rest project normally.
        // TODO: Hoist the condition branch from the cycle; it will look ugly, but probably faster.
        if (UNLIKELY(!passThruCreated))
        {
            PassThruStep* pts = new PassThruStep(*colStep);
            passThruCreated = true;
            pts->alias(colStep->alias());
            pts->view(colStep->view());
            pts->name(colStep->name());
            pts->tupleId(colStep->tupleId());
            tbps->setProjectBPP(pts, NULL);
        }
        else
        {
            tbps->setProjectBPP(it->get(), NULL);
        }

        // Accumulate the row-group layout from each column's tuple info.
        TupleInfo ti(getTupleInfo(colStep->tupleId(), jobInfo));
        pos.push_back(pos.back() + ti.width);
        oids.push_back(ti.oid);
        keys.push_back(ti.key);
        types.push_back(ti.dtype);
        csNums.push_back(ti.csNum);
        scale.push_back(ti.scale);
        precision.push_back(ti.precision);
    }

    // NOTE(review): the trailing 20 is presumably the string-table threshold —
    // confirm against the RowGroup constructor signature.
    RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, 20);

    SJSTEP sjsp;
    sjsp.reset(tbps);
    // Add tuple BPS step to query steps.
    querySteps.push_back(sjsp);

    // Delivery step.
    SJSTEP annexStep;
    auto* tas = new TupleAnnexStep(jobInfo);
    annexStep.reset(tas);

    // Create input `RowGroupDL`.
    RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize);
    dlIn->OID(CNX_VTABLE_ID);
    AnyDataListSPtr spdlIn(new AnyDataList());
    spdlIn->rowGroupDL(dlIn);
    JobStepAssociation jsaIn;
    jsaIn.outAdd(spdlIn);

    // Create output `RowGroupDL`.
    RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize);
    dlOut->OID(CNX_VTABLE_ID);
    AnyDataListSPtr spdlOut(new AnyDataList());
    spdlOut->rowGroupDL(dlOut);
    JobStepAssociation jsaOut;
    jsaOut.outAdd(spdlOut);

    // Set input and output: TupleBPS feeds the annex step, which delivers.
    tbps->setOutputRowGroup(rg);
    tbps->outputAssociation(jsaIn);
    annexStep->inputAssociation(jsaIn);
    annexStep->outputAssociation(jsaOut);

    // Initialize the delivery step with the assembled row group.
    tas->initialize(rg, jobInfo);

    // Add `annexStep` to delivery steps and to query steps.
    deliverySteps[CNX_VTABLE_ID] = annexStep;
    querySteps.push_back(annexStep);

    if (jobInfo.trace)
    {
        std::cout << "TupleBPS created: " << std::endl;
        std::cout << tbps->toString() << std::endl;
        std::cout << "Result row group: " << std::endl;
        std::cout << rg.toString() << std::endl;
    }
}
} // namespace
namespace joblist namespace joblist
{ {
@ -1833,179 +1986,17 @@ void makeUnionJobSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
numberSteps(querySteps, stepNo, jobInfo.traceFlags); numberSteps(querySteps, stepNo, jobInfo.traceFlags);
deliverySteps[execplan::CNX_VTABLE_ID] = unionStep; deliverySteps[execplan::CNX_VTABLE_ID] = unionStep;
} }
} // namespace joblist
}
namespace namespace
{ {
SJLP makeJobList_( void handleException(std::exception_ptr e, JobList* jl, JobInfo& jobInfo, unsigned& errCode,
CalpontExecutionPlan* cplan, string& emsg)
ResourceManager* rm,
bool isExeMgr,
unsigned& errCode, string& emsg)
{ {
CalpontSelectExecutionPlan* csep = dynamic_cast<CalpontSelectExecutionPlan*>(cplan);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID());
static config::Config* sysConfig = config::Config::makeConfig();
int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str());
// We have to go ahead and create JobList now so we can store the joblist's
// projectTableOID pointer in JobInfo for use during jobstep creation.
SErrorInfo errorInfo(new ErrorInfo());
boost::shared_ptr<TupleKeyInfo> keyInfo(new TupleKeyInfo);
boost::shared_ptr<int> subCount(new int);
*subCount = 0;
JobList* jl = new TupleJobList(isExeMgr);
jl->setPMsConfigured(pmsConfigured);
jl->priority(csep->priority());
jl->errorInfo(errorInfo);
rm->setTraceFlags(csep->traceFlags());
//Stuff a util struct with some stuff we always need
JobInfo jobInfo(rm);
jobInfo.sessionId = csep->sessionID();
jobInfo.txnId = csep->txnID();
jobInfo.verId = csep->verID();
jobInfo.statementId = csep->statementID();
jobInfo.queryType = csep->queryType();
jobInfo.csc = csc;
//TODO: clean up the vestiges of the bool trace
jobInfo.trace = csep->traceOn();
jobInfo.traceFlags = csep->traceFlags();
jobInfo.isExeMgr = isExeMgr;
// jobInfo.tryTuples = tryTuples; // always tuples after release 3.0
jobInfo.stringScanThreshold = csep->stringScanThreshold();
jobInfo.errorInfo = errorInfo;
jobInfo.keyInfo = keyInfo;
jobInfo.subCount = subCount;
jobInfo.projectingTableOID = jl->projectingTableOIDPtr();
jobInfo.jobListPtr = jl;
jobInfo.stringTableThreshold = csep->stringTableThreshold();
jobInfo.localQuery = csep->localQuery();
jobInfo.uuid = csep->uuid();
jobInfo.timeZone = csep->timeZone();
/* disk-based join vars */
jobInfo.smallSideLimit = csep->djsSmallSideLimit();
jobInfo.largeSideLimit = csep->djsLargeSideLimit();
jobInfo.partitionSize = csep->djsPartitionSize();
jobInfo.umMemLimit.reset(new int64_t);
*(jobInfo.umMemLimit) = csep->umMemLimit();
jobInfo.isDML = csep->isDML();
jobInfo.smallSideUsage.reset(new int64_t);
*jobInfo.smallSideUsage = 0;
// set fifoSize to 1 for CalpontSystemCatalog query
if (csep->sessionID() & 0x80000000)
jobInfo.fifoSize = 1;
else if (csep->traceOn())
cout << (*csep) << endl;
try try
{ {
JobStepVector querySteps; std::rethrow_exception(e);
JobStepVector projectSteps;
DeliveredTableMap deliverySteps;
if (csep->unionVec().size() == 0)
makeJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps);
else
makeUnionJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps);
uint16_t stepNo = numberSteps(querySteps, 0, jobInfo.traceFlags);
stepNo = numberSteps(projectSteps, stepNo, jobInfo.traceFlags);
struct timeval stTime;
if (jobInfo.trace)
{
ostringstream oss;
oss << endl;
oss << endl << "job parms: " << endl;
oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems <<
", flushInterval = " << jobInfo.flushInterval <<
", fifoSize = " << jobInfo.fifoSize <<
", ScanLimit/Threshold = " << jobInfo.scanLbidReqLimit << "/" <<
jobInfo.scanLbidReqThreshold << endl;
oss << "UUID: " << jobInfo.uuid << endl;
oss << endl << "job filter steps: " << endl;
ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
copy(querySteps.begin(), querySteps.end(), oIter);
oss << endl << "job project steps: " << endl;
copy(projectSteps.begin(), projectSteps.end(), oIter);
oss << endl << "job delivery steps: " << endl;
DeliveredTableMap::iterator dsi = deliverySteps.begin();
while (dsi != deliverySteps.end())
{
oss << dynamic_cast<const JobStep*>(dsi->second.get()) << endl;
++dsi;
}
oss << endl;
gettimeofday(&stTime, 0);
struct tm tmbuf;
#ifdef _MSC_VER
errno_t p = 0;
time_t t = stTime.tv_sec;
p = localtime_s(&tmbuf, &t);
if (p != 0)
memset(&tmbuf, 0, sizeof(tmbuf));
#else
localtime_r(&stTime.tv_sec, &tmbuf);
#endif
ostringstream tms;
tms << setfill('0')
<< setw(4) << (tmbuf.tm_year + 1900)
<< setw(2) << (tmbuf.tm_mon + 1)
<< setw(2) << (tmbuf.tm_mday)
<< setw(2) << (tmbuf.tm_hour)
<< setw(2) << (tmbuf.tm_min)
<< setw(2) << (tmbuf.tm_sec)
<< setw(6) << (stTime.tv_usec);
string tmstr(tms.str());
string jsrname("jobstep." + tmstr + ".dot");
ofstream dotFile(jsrname.c_str());
jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps);
char timestamp[80];
#ifdef _MSC_VER
t = stTime.tv_sec;
p = ctime_s(timestamp, 80, &t);
if (p != 0)
strcpy(timestamp, "UNKNOWN");
#else
ctime_r((const time_t*)&stTime.tv_sec, timestamp);
#endif
oss << "runtime updates: start at " << timestamp;
cout << oss.str();
Message::Args args;
args.add(oss.str());
jobInfo.logger->logMessage(LOG_TYPE_DEBUG, LogSQLTrace, args,
LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0));
cout << flush;
}
else
{
gettimeofday(&stTime, 0);
}
// Finish initializing the JobList object
jl->addQuery(querySteps);
jl->addProject(projectSteps);
jl->addDelivery(deliverySteps);
csep->setDynamicParseTreeVec(jobInfo.dynamicParseTreeVec);
dynamic_cast<TupleJobList*>(jl)->setDeliveryFlag(true);
} }
catch (IDBExcept& iex) catch (IDBExcept& iex)
{ {
@ -2013,7 +2004,6 @@ SJLP makeJobList_(
errCode = iex.errorCode(); errCode = iex.errorCode();
exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG); exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG);
emsg = iex.what(); emsg = iex.what();
goto bailout;
} }
catch (const std::exception& ex) catch (const std::exception& ex)
{ {
@ -2021,7 +2011,6 @@ SJLP makeJobList_(
errCode = makeJobListErr; errCode = makeJobListErr;
exceptionHandler(jl, jobInfo, ex.what()); exceptionHandler(jl, jobInfo, ex.what());
emsg = ex.what(); emsg = ex.what();
goto bailout;
} }
catch (...) catch (...)
{ {
@ -2029,24 +2018,302 @@ SJLP makeJobList_(
errCode = makeJobListErr; errCode = makeJobListErr;
exceptionHandler(jl, jobInfo, "an exception"); exceptionHandler(jl, jobInfo, "an exception");
emsg = "An unknown internal joblist error"; emsg = "An unknown internal joblist error";
goto bailout;
} }
goto done;
bailout:
delete jl; delete jl;
jl = 0; jl = nullptr;
if (emsg.empty())
emsg = "An unknown internal joblist error";
done:
SJLP jlp(jl);
return jlp;
} }
// Internal factory behind makeJobList(): turns a CalpontExecutionPlan into a
// runnable JobList.  Two plan kinds are handled:
//   * CalpontSelectExecutionPlan   -> classic SELECT job-step generation;
//   * MCSAnalyzeTableExecutionPlan -> ANALYZE TABLE job-step generation.
// On failure the returned SJLP holds nullptr and errCode/emsg are filled in.
SJLP makeJobList_(
    CalpontExecutionPlan* cplan,
    ResourceManager* rm,
    bool isExeMgr,
    unsigned& errCode, string& emsg)
{
    // TODO: This part requires a proper refactoring: common methods should move from
    // `CalpontSelectExecutionPlan` into the base class — `CalpontExecutionPlan` currently has
    // no meaningful virtual functions or common fields.  A large refactoring right before a
    // release is too risky to test properly, so it is deferred to the next iteration of
    // statistics development.
    CalpontSelectExecutionPlan* csep = dynamic_cast<CalpontSelectExecutionPlan*>(cplan);

    if (csep != nullptr)
    {
        boost::shared_ptr<CalpontSystemCatalog> csc =
            CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID());
        static config::Config* sysConfig = config::Config::makeConfig();
        int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str());

        // We have to go ahead and create JobList now so we can store the joblist's
        // projectTableOID pointer in JobInfo for use during jobstep creation.
        SErrorInfo errorInfo(new ErrorInfo());
        boost::shared_ptr<TupleKeyInfo> keyInfo(new TupleKeyInfo);
        boost::shared_ptr<int> subCount(new int);
        *subCount = 0;
        JobList* jl = new TupleJobList(isExeMgr);
        jl->setPMsConfigured(pmsConfigured);
        jl->priority(csep->priority());
        jl->errorInfo(errorInfo);
        rm->setTraceFlags(csep->traceFlags());

        // Stuff a util struct with some stuff we always need
        JobInfo jobInfo(rm);
        jobInfo.sessionId = csep->sessionID();
        jobInfo.txnId = csep->txnID();
        jobInfo.verId = csep->verID();
        jobInfo.statementId = csep->statementID();
        jobInfo.queryType = csep->queryType();
        jobInfo.csc = csc;
        // TODO: clean up the vestiges of the bool trace
        jobInfo.trace = csep->traceOn();
        jobInfo.traceFlags = csep->traceFlags();
        jobInfo.isExeMgr = isExeMgr;
        // jobInfo.tryTuples = tryTuples; // always tuples after release 3.0
        jobInfo.stringScanThreshold = csep->stringScanThreshold();
        jobInfo.errorInfo = errorInfo;
        jobInfo.keyInfo = keyInfo;
        jobInfo.subCount = subCount;
        jobInfo.projectingTableOID = jl->projectingTableOIDPtr();
        jobInfo.jobListPtr = jl;
        jobInfo.stringTableThreshold = csep->stringTableThreshold();
        jobInfo.localQuery = csep->localQuery();
        jobInfo.uuid = csep->uuid();
        jobInfo.timeZone = csep->timeZone();
        /* disk-based join vars */
        jobInfo.smallSideLimit = csep->djsSmallSideLimit();
        jobInfo.largeSideLimit = csep->djsLargeSideLimit();
        jobInfo.partitionSize = csep->djsPartitionSize();
        jobInfo.umMemLimit.reset(new int64_t);
        *(jobInfo.umMemLimit) = csep->umMemLimit();
        jobInfo.isDML = csep->isDML();
        jobInfo.smallSideUsage.reset(new int64_t);
        *jobInfo.smallSideUsage = 0;

        // set fifoSize to 1 for CalpontSystemCatalog query
        if (csep->sessionID() & 0x80000000)
            jobInfo.fifoSize = 1;
        else if (csep->traceOn())
            cout << (*csep) << endl;

        try
        {
            JobStepVector querySteps;
            JobStepVector projectSteps;
            DeliveredTableMap deliverySteps;

            // UNION plans get their own step builder.
            if (csep->unionVec().size() == 0)
                makeJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps);
            else
                makeUnionJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps);

            uint16_t stepNo = numberSteps(querySteps, 0, jobInfo.traceFlags);
            stepNo = numberSteps(projectSteps, stepNo, jobInfo.traceFlags);

            struct timeval stTime;

            // Tracing: dump job parameters and all steps, and emit a GraphViz
            // .dot file of the step graph named by a timestamp.
            if (jobInfo.trace)
            {
                ostringstream oss;
                oss << endl;
                oss << endl << "job parms: " << endl;
                oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems
                    << ", flushInterval = " << jobInfo.flushInterval
                    << ", fifoSize = " << jobInfo.fifoSize
                    << ", ScanLimit/Threshold = " << jobInfo.scanLbidReqLimit << "/"
                    << jobInfo.scanLbidReqThreshold << endl;
                oss << "UUID: " << jobInfo.uuid << endl;
                oss << endl << "job filter steps: " << endl;
                ostream_iterator<JobStepVector::value_type> oIter(oss, "\n");
                copy(querySteps.begin(), querySteps.end(), oIter);
                oss << endl << "job project steps: " << endl;
                copy(projectSteps.begin(), projectSteps.end(), oIter);
                oss << endl << "job delivery steps: " << endl;
                DeliveredTableMap::iterator dsi = deliverySteps.begin();

                while (dsi != deliverySteps.end())
                {
                    oss << dynamic_cast<const JobStep*>(dsi->second.get()) << endl;
                    ++dsi;
                }

                oss << endl;
                gettimeofday(&stTime, 0);
                struct tm tmbuf;
#ifdef _MSC_VER
                errno_t p = 0;
                time_t t = stTime.tv_sec;
                p = localtime_s(&tmbuf, &t);

                if (p != 0)
                    memset(&tmbuf, 0, sizeof(tmbuf));
#else
                localtime_r(&stTime.tv_sec, &tmbuf);
#endif
                ostringstream tms;
                tms << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2)
                    << (tmbuf.tm_mon + 1) << setw(2) << (tmbuf.tm_mday) << setw(2)
                    << (tmbuf.tm_hour) << setw(2) << (tmbuf.tm_min) << setw(2) << (tmbuf.tm_sec)
                    << setw(6) << (stTime.tv_usec);
                string tmstr(tms.str());
                string jsrname("jobstep." + tmstr + ".dot");
                ofstream dotFile(jsrname.c_str());
                jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps);
                char timestamp[80];
#ifdef _MSC_VER
                t = stTime.tv_sec;
                p = ctime_s(timestamp, 80, &t);

                if (p != 0)
                    strcpy(timestamp, "UNKNOWN");
#else
                ctime_r((const time_t*) &stTime.tv_sec, timestamp);
#endif
                oss << "runtime updates: start at " << timestamp;
                cout << oss.str();
                Message::Args args;
                args.add(oss.str());
                jobInfo.logger->logMessage(LOG_TYPE_DEBUG, LogSQLTrace, args,
                                           LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0));
                cout << flush;
            }
            else
            {
                gettimeofday(&stTime, 0);
            }

            // Finish initializing the JobList object
            jl->addQuery(querySteps);
            jl->addProject(projectSteps);
            jl->addDelivery(deliverySteps);
            csep->setDynamicParseTreeVec(jobInfo.dynamicParseTreeVec);
            dynamic_cast<TupleJobList*>(jl)->setDeliveryFlag(true);
        }
        catch (IDBExcept& iex)
        {
            jobInfo.errorInfo->errCode = iex.errorCode();
            errCode = iex.errorCode();
            exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG);
            emsg = iex.what();
            goto bailout;
        }
        catch (const std::exception& ex)
        {
            jobInfo.errorInfo->errCode = makeJobListErr;
            errCode = makeJobListErr;
            exceptionHandler(jl, jobInfo, ex.what());
            emsg = ex.what();
            goto bailout;
        }
        catch (...)
        {
            jobInfo.errorInfo->errCode = makeJobListErr;
            errCode = makeJobListErr;
            exceptionHandler(jl, jobInfo, "an exception");
            emsg = "An unknown internal joblist error";
            goto bailout;
        }

        goto done;
        // On failure: destroy the partially built joblist and return an empty SJLP.
bailout:
        delete jl;
        jl = 0;

        if (emsg.empty())
            emsg = "An unknown internal joblist error";

done:
        SJLP jlp(jl);
        return jlp;
    }
    else
    {
        // ANALYZE TABLE path.
        auto* caep = dynamic_cast<MCSAnalyzeTableExecutionPlan*>(cplan);
        JobList* jl = nullptr;

        if (caep == nullptr)
        {
            SJLP jlp(jl);
            // NOTE(review): typo "Ivalid" in the message below predates this review.
            std::cerr << "Ivalid execution plan" << std::endl;
            return jlp;
        }

        jl = new TupleJobList(isExeMgr);
        boost::shared_ptr<CalpontSystemCatalog> csc =
            CalpontSystemCatalog::makeCalpontSystemCatalog(caep->sessionID());
        static config::Config* sysConfig = config::Config::makeConfig();
        uint32_t pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str());
        SErrorInfo errorInfo(new ErrorInfo());
        boost::shared_ptr<TupleKeyInfo> keyInfo(new TupleKeyInfo);
        boost::shared_ptr<int> subCount(new int);
        *subCount = 0;
        jl->setPMsConfigured(pmsConfigured);
        jl->priority(caep->priority());
        jl->errorInfo(errorInfo);

        JobInfo jobInfo(rm);
        jobInfo.sessionId = caep->sessionID();
        jobInfo.txnId = caep->txnID();
        jobInfo.verId = caep->verID();
        jobInfo.statementId = caep->statementID();
        jobInfo.csc = csc;
        jobInfo.trace = caep->traceOn();
        jobInfo.isExeMgr = isExeMgr;
        // TODO: Implement it when we have a dict column.
        jobInfo.stringScanThreshold = 20;
        jobInfo.errorInfo = errorInfo;
        jobInfo.keyInfo = keyInfo;
        jobInfo.subCount = subCount;
        jobInfo.projectingTableOID = jl->projectingTableOIDPtr();
        jobInfo.jobListPtr = jl;
        jobInfo.localQuery = caep->localQuery();
        jobInfo.timeZone = caep->timeZone();

        try
        {
            JobStepVector querySteps;
            DeliveredTableMap deliverySteps;

            // Parse the execution plan and create job steps from it.
            makeAnalyzeTableJobSteps(caep, jobInfo, querySteps, deliverySteps);

            if (!querySteps.size())
            {
                delete jl;
                // Indicates that query steps is empty.
                errCode = logging::statisticsJobListEmpty;
                jl = nullptr;
                goto out;
            }

            numberSteps(querySteps, 0, jobInfo.traceFlags);
            jl->addQuery(querySteps);
            jl->addDelivery(deliverySteps);
            dynamic_cast<TupleJobList*>(jl)->setDeliveryFlag(true);
        }
        catch (...)
        {
            // Centralized error translation; fills errCode/emsg and frees jl.
            handleException(std::current_exception(), jl, jobInfo, errCode, emsg);
        }

out:
        SJLP jlp(jl);
        return jlp;
    }
}
} // namespace
namespace joblist namespace joblist
{ {

View File

@ -200,6 +200,24 @@ const char** ha_mcs::bas_ext() const
return ha_mcs_exts; return ha_mcs_exts;
} }
/** @brief
  Handler entry point for ANALYZE TABLE.

  Delegates the real work to ha_mcs_impl_analyze(). Any exception is
  converted into ER_INTERNAL_ERROR so it never propagates into the server.
  The original code only caught std::runtime_error; other exception types
  (std::bad_alloc, logic errors, ...) would have escaped the handler.

  @param thd        current session
  @param check_opt  ANALYZE options (unused by ColumnStore)
  @return 0 on success, ER_INTERNAL_ERROR on failure
*/
int ha_mcs::analyze(THD* thd, HA_CHECK_OPT* check_opt)
{
    DBUG_ENTER("ha_mcs::analyze");
    int rc = 0;

    try
    {
        rc = ha_mcs_impl_analyze(thd, table);
    }
    catch (const std::exception& e)
    {
        thd->raise_error_printf(ER_INTERNAL_ERROR, e.what());
        rc = ER_INTERNAL_ERROR;
    }
    catch (...)
    {
        // Unknown exception type: still report a clean internal error.
        thd->raise_error_printf(ER_INTERNAL_ERROR, "Unknown error in ha_mcs::analyze");
        rc = ER_INTERNAL_ERROR;
    }

    DBUG_RETURN(rc);
}
/** /**
@brief @brief
Used for opening tables. The name will be the name of the file. Used for opening tables. The name will be the name of the file.

View File

@ -126,6 +126,11 @@ public:
return (double) (stats.records + stats.deleted) / 20.0 + 10; return (double) (stats.records + stats.deleted) / 20.0 + 10;
} }
/** @brief
Analyze table command.
*/
int analyze(THD* thd, HA_CHECK_OPT* check_opt);
/* /*
Everything below are methods that we implement in ha_example.cc. Everything below are methods that we implement in ha_example.cc.

View File

@ -105,6 +105,7 @@ using namespace BRM;
using namespace querystats; using namespace querystats;
#include "calpontselectexecutionplan.h" #include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "calpontsystemcatalog.h" #include "calpontsystemcatalog.h"
#include "simplecolumn_int.h" #include "simplecolumn_int.h"
#include "simplecolumn_decimal.h" #include "simplecolumn_decimal.h"
@ -142,6 +143,7 @@ using namespace funcexp;
#include "ha_mcs_sysvars.h" #include "ha_mcs_sysvars.h"
#include "ha_mcs_datatype.h" #include "ha_mcs_datatype.h"
#include "statistics.h"
namespace cal_impl_if namespace cal_impl_if
{ {
@ -1897,8 +1899,248 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
return rc; return rc;
} }
// True when statistics can be collected for this column type: only signed
// and unsigned integer columns are currently analyzable.
inline bool isSupportedToAnalyze(const execplan::CalpontSystemCatalog::ColType& colType)
{
    if (colType.isSignedInteger())
        return true;

    return colType.isUnsignedInteger();
}
// Initializes `cal_connection_info` using given `thd` and `sessionID`.
//
// Resets the per-statement query statistics, records the connecting
// user/host (used later for priority lookup), tears down any half-open
// ExeMgr handle, creates a fresh one and connects it.
//
// Returns false after raising ERR_LOST_CONN_EXEMGR on the THD if the
// connection to ExeMgr cannot be established; true otherwise.
bool initializeCalConnectionInfo(cal_connection_info* ci, THD* thd,
boost::shared_ptr<execplan::CalpontSystemCatalog> csc,
uint32_t sessionID, bool localQuery)
{
// Start a fresh statistics record for this statement.
ci->stats.reset();
ci->stats.setStartTime();

// Record the user; fall back to an empty string when unauthenticated.
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}

// Prefer the resolved host name, then the raw host-or-ip string.
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
else if (thd->main_security_ctx.host_or_ip)
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
else
ci->stats.fHost = "unknown";

// Priority lookup is best-effort: a failure only produces a warning.
try
{
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
ci->warningMsg = msg;
}

// A previous statement left a live handle behind: drop it before
// re-initializing, otherwise sm_init would leak the old connection.
if (ci->queryState != 0)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}

sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
idbassert(ci->cal_conn_hndl != 0);
ci->cal_conn_hndl->csc = csc;
idbassert(ci->cal_conn_hndl->exeMgr != 0);

try
{
ci->cal_conn_hndl->connect();
}
catch (...)
{
// Lost ExeMgr: surface the error and invalidate the cached catalog so
// the next statement rebuilds it from scratch.
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
return false;
}

return true;
}
// Ships an analyze-table execution plan to ExeMgr over `hndl`.
//
// Protocol: first a quadbyte command word `qb` (ANALYZE_TABLE_EXECUTE),
// then the serialized plan, then a single status message is read back.
// Returns false on a dropped connection or any exception; the caller is
// expected to treat false as a fatal error for this statement.
bool sendExecutionPlanToExeMgr(sm::cpsm_conhdl_t* hndl, ByteStream::quadbyte qb,
std::shared_ptr<execplan::MCSAnalyzeTableExecutionPlan> caep,
cal_connection_info* ci, THD* thd)
{
ByteStream msg;
try
{
// Command word first.
msg << qb;
hndl->exeMgr->write(msg);
msg.restart();
// Attach the session's resource-manager overrides before serializing.
caep->rmParms(ci->rmParms);
// Send the execution plan.
caep->serialize(msg);
hndl->exeMgr->write(msg);
// Get the status from ExeMgr.
msg.restart();
msg = hndl->exeMgr->read();
// Any return code is ok for now.
// A zero-length read means the peer closed the connection.
if (msg.length() == 0)
{
auto emsg = "Lost connection to ExeMgr. Please contact your administrator";
setError(thd, ER_INTERNAL_ERROR, emsg);
return false;
}
}
catch (...)
{
// NOTE(review): the exception is swallowed without setting an error on
// the THD; the caller only sees `false` — confirm this is intended.
return false;
}

return true;
}
} //anon namespace } //anon namespace
// Implements ANALYZE TABLE for a ColumnStore table.
//
// Builds an MCSAnalyzeTableExecutionPlan containing a SimpleColumn for every
// analyzable (integer-typed) column of `table`, stamps it with the session's
// transaction/version/trace context, and ships it to ExeMgr, which gathers
// and persists the statistics.
//
// Returns 0 on success (or when the table is skipped: information_schema or
// non-ColumnStore engine), ER_INTERNAL_ERROR when the ExeMgr connection or
// plan delivery fails.
int ha_mcs_impl_analyze(THD* thd, TABLE* table)
{
    uint32_t sessionID = execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id);
    boost::shared_ptr<execplan::CalpontSystemCatalog> csc =
        execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
    csc->identity(execplan::CalpontSystemCatalog::FE);

    auto table_name =
        execplan::make_table(table->s->db.str, table->s->table_name.str, lower_case_table_names);

    // Skip for now.
    if (table->s->db.length && strcmp(table->s->db.str, "information_schema") == 0)
        return 0;

    bool columnStore = (table ? isMCSTable(table) : true);

    // Skip non columnstore tables.
    if (!columnStore)
        return 0;

    execplan::CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(table_name, true);
    execplan::MCSAnalyzeTableExecutionPlan::ReturnedColumnList returnedColumnList;
    execplan::MCSAnalyzeTableExecutionPlan::ColumnMap columnMap;

    // Iterate over table oid list and create a `SimpleColumn` for every column
    // with supported type.
    for (uint32_t i = 0, e = oidlist.size(); i < e; ++i)
    {
        execplan::SRCP returnedColumn;
        const auto objNum = oidlist[i].objnum;
        auto tableColName = csc->colName(objNum);
        auto colType = csc->colType(objNum);

        if (!isSupportedToAnalyze(colType))
            continue;

        execplan::SimpleColumn* simpleColumn = new execplan::SimpleColumn();
        simpleColumn->columnName(tableColName.column);
        simpleColumn->tableName(tableColName.table, lower_case_table_names);
        simpleColumn->schemaName(tableColName.schema, lower_case_table_names);
        simpleColumn->oid(objNum);
        simpleColumn->alias(tableColName.column);
        simpleColumn->resultType(colType);
        simpleColumn->timeZone(thd->variables.time_zone->get_name()->ptr());

        returnedColumn.reset(simpleColumn);
        returnedColumnList.push_back(returnedColumn);
        columnMap.insert(execplan::MCSAnalyzeTableExecutionPlan::ColumnMap::value_type(
            simpleColumn->columnName(), returnedColumn));
    }

    // Create execution plan and initialize it with `returned columns` and
    // `column map`.
    std::shared_ptr<execplan::MCSAnalyzeTableExecutionPlan> caep(
        new execplan::MCSAnalyzeTableExecutionPlan(returnedColumnList, columnMap));

    caep->schemaName(table->s->db.str, lower_case_table_names);
    caep->tableName(table->s->table_name.str, lower_case_table_names);
    caep->timeZone(thd->variables.time_zone->get_name()->ptr());

    SessionManager sm;
    BRM::TxnID txnID;
    txnID = sm.getTxnID(sessionID);

    // No active transaction: use a synthetic, valid id so the plan carries
    // a consistent context.
    if (!txnID.valid)
    {
        txnID.id = 0;
        txnID.valid = true;
    }

    QueryContext verID;
    verID = sm.verID();

    caep->txnID(txnID.id);
    caep->verID(verID);
    caep->sessionID(sessionID);

    string query;
    query.assign(idb_mysql_query_str(thd));
    caep->data(query);

    if (!get_fe_conn_info_ptr())
    {
        // BUG FIX: the original wrapped `new cal_connection_info(), thd` inside
        // a single reinterpret_cast, so the comma operator discarded (leaked)
        // the allocation and stored `thd` itself as the connection-info
        // pointer. The allocation must be the first argument, `thd` the second.
        set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info()), thd);
    }

    cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
    idbassert(ci != 0);

    // Priority lookup is best-effort: failure only produces a warning.
    try
    {
        caep->priority(ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
    }
    catch (std::exception& e)
    {
        string msg = string("Columnstore User Priority - ") + e.what();
        push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
    }

    // The statement was killed before we started: close any in-flight
    // front-end connection and bail out quietly.
    if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
    {
        force_close_fep_conn(thd, ci);
        return 0;
    }

    caep->traceFlags(ci->traceFlags);

    cal_table_info ti;
    sm::cpsm_conhdl_t* hndl;
    bool localQuery = (get_local_query(thd) > 0 ? true : false);
    caep->localQuery(localQuery);

    // Try to initialize connection.
    if (!initializeCalConnectionInfo(ci, thd, csc, sessionID, localQuery))
        goto error;

    hndl = ci->cal_conn_hndl;

    if (caep->traceOn())
        std::cout << caep->toString() << std::endl;

    {
        ByteStream::quadbyte qb = ANALYZE_TABLE_EXECUTE;
        // Serialize and send the `analyze table` execution plan.
        if (!sendExecutionPlanToExeMgr(hndl, qb, caep, ci, thd))
            goto error;
    }

    ci->rmParms.clear();
    ci->tableMap[table] = ti;
    return 0;

error:
    // Drop the half-open ExeMgr handle so the next statement reconnects.
    if (ci->cal_conn_hndl)
    {
        sm::sm_cleanup(ci->cal_conn_hndl);
        ci->cal_conn_hndl = 0;
    }

    return ER_INTERNAL_ERROR;
}
int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked) int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked)
{ {
IDEBUG ( cout << "ha_mcs_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl ); IDEBUG ( cout << "ha_mcs_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl );

View File

@ -27,6 +27,7 @@ struct mcs_handler_info;
extern int ha_mcs_impl_discover_existence(const char* schema, const char* name); extern int ha_mcs_impl_discover_existence(const char* schema, const char* name);
extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info); extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info);
extern int ha_mcs_impl_delete_table(const char* name); extern int ha_mcs_impl_delete_table(const char* name);
extern int ha_mcs_impl_analyze(THD* thd, TABLE* table);
extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked);
extern int ha_mcs_impl_close(void); extern int ha_mcs_impl_close(void);
extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table);

View File

@ -48,6 +48,7 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include "calpontselectexecutionplan.h" #include "calpontselectexecutionplan.h"
#include "mcsanalyzetableexecutionplan.h"
#include "activestatementcounter.h" #include "activestatementcounter.h"
#include "distributedenginecomm.h" #include "distributedenginecomm.h"
#include "resourcemanager.h" #include "resourcemanager.h"
@ -77,7 +78,7 @@
#include "dbrm.h" #include "dbrm.h"
#include "mariadb_my_sys.h" #include "mariadb_my_sys.h"
#include "statistics.h"
class Opt class Opt
{ {
@ -604,7 +605,136 @@ private:
fIos.write(emsgBs); fIos.write(emsgBs);
} }
// Handles the ANALYZE_TABLE_EXECUTE command from the plugin side.
//
// Reads an MCSAnalyzeTableExecutionPlan off the connection, builds and runs
// a job list for it, folds the projected RowGroup into StatisticsManager,
// persists and distributes the statistics, and finally acknowledges the
// front-end with ANALYZE_TABLE_SUCCESS. `stmtCounted` tracks whether this
// statement is registered in statementsRunningCount so it is decremented
// exactly once.
public: void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted)
{
messageqcpp::ByteStream::quadbyte qb;
execplan::MCSAnalyzeTableExecutionPlan caep;

// The plan arrives in the next message on this socket.
bs = fIos.read();
caep.unserialize(bs);

statementsRunningCount->incr(stmtCounted);

jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true);

// Joblist is empty.
// No analyzable columns: nothing to do, report success immediately.
if (jl->status() == logging::statisticsJobListEmpty)
{
if (caep.traceOn())
std::cout << "JobList is empty " << std::endl;

jl.reset();
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
return;
}

// PM connection set changed since startup: rebuild the comm fabric.
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
std::cout << "fEc setup " << std::endl;
fEc->Setup();
}

if (jl->status() == 0)
{
std::string emsg;

if (jl->putEngineComm(fEc) != 0)
throw std::runtime_error(jl->errMsg());
}
else
{
throw std::runtime_error("ExeMgr: could not build a JobList!");
}

// Execute a joblist.
jl->doQuery();

// Watch the front-end socket for aborts while projecting rows.
FEMsgHandler msgHandler(jl, &fIos);
msgHandler.start();
auto rowCount = jl->projectTable(100, bs);
msgHandler.stop();

auto outRG = (static_cast<joblist::TupleJobList*>(jl.get()))->getOutputRowGroup();

if (caep.traceOn())
std::cout << "Row count " << rowCount << std::endl;

// Process `RowGroup`, increase an epoch and save statistics to the file.
auto* statisticsManager = statistics::StatisticsManager::instance();
statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn());
statisticsManager->incEpoch();
statisticsManager->saveToFile();

// Distribute statistics across all ExeMgr clients if possible.
statistics::StatisticsDistributor::instance()->distributeStatistics();

// Send the signal back to front-end.
bs.restart();
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
statementsRunningCount->decr(stmtCounted);
}
// Client-side half of statistics distribution (ANALYZE_TABLE_REC_STATS).
//
// Receives the server's statistics hash; if it matches the local one,
// replies ANALYZE_TABLE_SUCCESS and stops. Otherwise replies
// ANALYZE_TABLE_NEED_STATS, receives the serialized statistics, persists
// them, and acknowledges with ANALYZE_TABLE_SUCCESS.
void analyzeTableHandleStats(messageqcpp::ByteStream& bs)
{
messageqcpp::ByteStream::quadbyte qb;

#ifdef DEBUG_STATISTICS
std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
uint64_t dataHashRec;
bs >> dataHashRec;

uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats();
// The stats are the same.
if (dataHash == dataHashRec)
{
#ifdef DEBUG_STATISTICS
std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) "
<< std::endl;
#endif
// NOTE(review): no bs.restart() here — this relies on the incoming
// message containing exactly the 8-byte hash (fully drained above);
// otherwise leftover bytes would precede the reply flag. Confirm.
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
return;
}

// Local stats are stale: ask the server for the full payload.
bs.restart();
qb = ANALYZE_TABLE_NEED_STATS;
bs << qb;
fIos.write(bs);
bs.restart();

bs = fIos.read();
#ifdef DEBUG_STATISTICS
std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl;
#endif
// Merge and persist the received statistics.
statistics::StatisticsManager::instance()->unserialize(bs);
statistics::StatisticsManager::instance()->saveToFile();

#ifdef DEBUG_STATISTICS
std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl;
#endif
qb = ANALYZE_TABLE_SUCCESS;
bs << qb;
fIos.write(bs);
bs.reset();
}
public:
void operator()() void operator()()
{ {
@ -691,6 +821,16 @@ public:
fIos.close(); fIos.close();
break; break;
} }
else if (qb == ANALYZE_TABLE_EXECUTE)
{
analyzeTableExecute(bs, jl, stmtCounted);
continue;
}
else if (qb == ANALYZE_TABLE_REC_STATS)
{
analyzeTableHandleStats(bs);
continue;
}
else else
{ {
if (gDebug) if (gDebug)
@ -1706,6 +1846,16 @@ int ServiceExeMgr::Child()
exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool)); exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool));
} }
// Load statistics.
try
{
statistics::StatisticsManager::instance()->loadFromFile();
}
catch (...)
{
std::cerr << "Cannot load statistics from file " << std::endl;
}
for (;;) for (;;)
{ {
messageqcpp::IOSocket ios; messageqcpp::IOSocket ios;

View File

@ -0,0 +1,27 @@
DROP DATABASE IF EXISTS analyze_table_db;
CREATE DATABASE analyze_table_db;
USE analyze_table_db;
create table t1 (a int, b int, c int) engine=columnstore;
insert into t1 values (1, 2, 3), (2, 2, 2), (2, 3, 4);
analyze table t1;
Table Op Msg_type Msg_text
analyze_table_db.t1 analyze status OK
create table t2 (a int, b double) engine=columnstore;
insert into t2 values (2, 3), (3, 4);
analyze table t2;
Table Op Msg_type Msg_text
analyze_table_db.t2 analyze status OK
create table t3 (a varchar(25)) engine=columnstore;
insert into t3 values ("a"), ("b");
analyze table t3;
Table Op Msg_type Msg_text
analyze_table_db.t3 analyze status OK
analyze table t1, t2, t3;
Table Op Msg_type Msg_text
analyze_table_db.t1 analyze status OK
analyze_table_db.t2 analyze status OK
analyze_table_db.t3 analyze status OK
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
DROP DATABASE analyze_table_db;

View File

@ -0,0 +1,28 @@
#-- source ../include/have_columnstore.inc
--disable_warnings
DROP DATABASE IF EXISTS analyze_table_db;
--enable_warnings
CREATE DATABASE analyze_table_db;
USE analyze_table_db;
create table t1 (a int, b int, c int) engine=columnstore;
insert into t1 values (1, 2, 3), (2, 2, 2), (2, 3, 4);
analyze table t1;
create table t2 (a int, b double) engine=columnstore;
insert into t2 values (2, 3), (3, 4);
analyze table t2;
create table t3 (a varchar(25)) engine=columnstore;
insert into t3 values ("a"), ("b");
analyze table t3;
analyze table t1, t2, t3;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE t3;
DROP DATABASE analyze_table_db;

View File

@ -10,10 +10,13 @@ set(common_LIB_SRCS
MonitorProcMem.cpp MonitorProcMem.cpp
nullvaluemanip.cpp nullvaluemanip.cpp
threadnaming.cpp threadnaming.cpp
utils_utf8.cpp) utils_utf8.cpp
statistics.cpp)
add_library(common SHARED ${common_LIB_SRCS}) add_library(common SHARED ${common_LIB_SRCS})
target_link_libraries(common boost_filesystem)
add_dependencies(common loggingcpp) add_dependencies(common loggingcpp)
install(TARGETS common DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) install(TARGETS common DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine)

435
utils/common/statistics.cpp Normal file
View File

@ -0,0 +1,435 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <iostream>
#include <atomic>
#include <boost/filesystem.hpp>
#include "statistics.h"
#include "IDBPolicy.h"
#include "brmtypes.h"
#include "hasher.h"
#include "messagequeue.h"
#include "configcpp.h"
using namespace idbdatafile;
using namespace logging;
namespace statistics
{
using ColumnsCache = std::vector<std::unordered_set<uint32_t>>;
// Singleton accessor: the manager is created once on first use and is
// intentionally never destroyed, so it stays valid during shutdown.
StatisticsManager* StatisticsManager::instance()
{
    static StatisticsManager* const manager = new StatisticsManager();
    return manager;
}
// Scans (a sample of) `rowGroup` and classifies every projected column by
// OID: a column stays KeyType::PK while all sampled values are distinct and
// non-NULL, and is demoted to KeyType::FK on the first duplicate or NULL.
// Results are stored in `keyTypes`; when `trace` is set the result is dumped
// to stdout. Thread-safe via `mut`.
void StatisticsManager::analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace)
{
    std::lock_guard<std::mutex> lock(mut);

    auto rowCount = rowGroup.getRowCount();
    const auto columnCount = rowGroup.getColumnCount();
    if (!rowCount || !columnCount)
        return;

    auto& oids = rowGroup.getOIDs();
    rowgroup::Row r;
    rowGroup.initRow(&r);
    rowGroup.getRow(0, &r);

    // Per-column sets of values seen so far. BUG FIX: the original used
    // uint32_t elements, truncating getIntField()'s 64-bit result, so two
    // distinct wide values could collide and wrongly demote a column to FK.
    std::vector<std::unordered_set<uint64_t>> columns(columnCount);

    // Every column starts as a PK candidate.
    for (uint32_t index = 0; index < columnCount; ++index)
        keyTypes[oids[index]] = KeyType::PK;

    const uint32_t maxRowCount = 4096;
    // TODO: We should read just couple of blocks from columns, not all data, but this requires
    // more deep refactoring of column commands.
    rowCount = std::min(rowCount, maxRowCount);

    // This is strange, it's a CS but I'm processing data as row by row, how to fix it?
    for (uint32_t i = 0; i < rowCount; ++i)
    {
        for (uint32_t j = 0; j < columnCount; ++j)
        {
            const uint64_t value = r.getIntField(j);
            if (r.isNullValue(j) || columns[j].count(value))
                keyTypes[oids[j]] = KeyType::FK;
            else
                columns[j].insert(value);
        }
        r.nextRow();
    }

    if (trace)
        output(StatisticsType::PK_FK);
}
// Dumps the collected statistics of the requested kind to stdout (trace aid).
void StatisticsManager::output(StatisticsType statisticsType)
{
    if (statisticsType != StatisticsType::PK_FK)
        return;

    std::cout << "Columns count: " << keyTypes.size() << std::endl;
    for (const auto& [oid, keyType] : keyTypes)
        std::cout << oid << " " << static_cast<int>(keyType) << std::endl;
}
// Someday it will be a virtual method, based on statistics type we processing.
//
// Flattens `keyTypes` into a contiguous byte buffer with the on-disk layout:
//   [uint64 count][uint32 oid, uint32 keyType] * count
// The buffer size is returned through `dataStreamSize`. The layout must stay
// in sync with the parsing code in loadFromFile().
// NOTE(review): callers hold `mut` or are single-threaded at call time —
// this method itself does not lock; confirm for new call sites.
std::unique_ptr<char[]> StatisticsManager::convertStatsToDataStream(uint64_t& dataStreamSize)
{
// Number of pairs.
uint64_t count = keyTypes.size();
// count, [[uid, keyType], ... ]
dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType));
// Allocate memory for data stream.
std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
auto* dataStream = dataStreamSmartPtr.get();

// Initialize the data stream.
// Leading element count.
uint64_t offset = 0;
std::memcpy(dataStream, reinterpret_cast<char*>(&count), sizeof(uint64_t));
offset += sizeof(uint64_t);

// For each pair [oid, key type].
for (const auto& p : keyTypes)
{
uint32_t oid = p.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
offset += sizeof(uint32_t);
KeyType keyType = p.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&keyType), sizeof(KeyType));
offset += sizeof(KeyType);
}

return dataStreamSmartPtr;
}
// Persists the in-memory statistics to `statsFile`:
// a StatisticsFileHeader (version, epoch, payload hash/size) followed by the
// flattened payload from convertStatsToDataStream(). On a short write the
// partially-written file is removed and an exception is thrown.
// Thread-safe via `mut`.
void StatisticsManager::saveToFile()
{
std::lock_guard<std::mutex> lock(mut);

const char* fileName = statsFile.c_str();
std::unique_ptr<IDBDataFile> out(
IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "wb", 1));

if (!out)
{
BRM::log_errno("StatisticsManager::saveToFile(): open");
throw ios_base::failure("StatisticsManager::saveToFile(): open failed.");
}

// Compute hash.
uint64_t dataStreamSize = 0;
std::unique_ptr<char[]> dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize);
utils::Hasher128 hasher;

// Prepare a statistics file header.
const uint32_t headerSize = sizeof(StatisticsFileHeader);
StatisticsFileHeader fileHeader;
std::memset(&fileHeader, 0, headerSize);
fileHeader.version = version;
fileHeader.epoch = epoch;
fileHeader.dataSize = dataStreamSize;
// Compute hash from the data.
fileHeader.dataHash = hasher(dataStreamSmartPtr.get(), dataStreamSize);

// Write statistics file header.
uint64_t size = out->write(reinterpret_cast<char*>(&fileHeader), headerSize);
if (size != headerSize)
{
// Short write: discard the unusable file before reporting the failure.
auto rc = IDBPolicy::remove(fileName);
if (rc == -1)
std::cerr << "Cannot remove file " << fileName << std::endl;
throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
}

// Write data.
size = out->write(dataStreamSmartPtr.get(), dataStreamSize);
if (size != dataStreamSize)
{
auto rc = IDBPolicy::remove(fileName);
if (rc == -1)
std::cerr << "Cannot remove file " << fileName << std::endl;
throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
}
}
// Loads statistics previously written by saveToFile(): validates the header
// and the payload hash, then rebuilds the in-memory `keyTypes` map.
// Returns silently when no statistics file exists yet; throws
// ios_base::failure on I/O errors or a hash mismatch. Thread-safe via `mut`.
void StatisticsManager::loadFromFile()
{
    std::lock_guard<std::mutex> lock(mut);

    // Check that stats file does exist.
    if (!boost::filesystem::exists(statsFile))
        return;

    const char* fileName = statsFile.c_str();
    std::unique_ptr<IDBDataFile> in(
        IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "rb", 1));

    if (!in)
    {
        BRM::log_errno("StatisticsManager::loadFromFile(): open");
        throw ios_base::failure(
            "StatisticsManager::loadFromFile(): open failed. Check the error log.");
    }

    // Read the file header.
    StatisticsFileHeader fileHeader;
    const uint32_t headerSize = sizeof(StatisticsFileHeader);
    int64_t size = in->read(reinterpret_cast<char*>(&fileHeader), headerSize);
    if (size != headerSize)
        throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");

    // Initialize fields from the file header.
    version = fileHeader.version;
    epoch = fileHeader.epoch;
    const auto dataHash = fileHeader.dataHash;
    const auto dataStreamSize = fileHeader.dataSize;

    // Allocate the memory for the file data.
    std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
    auto* dataStream = dataStreamSmartPtr.get();

    // Read the payload, looping until every byte arrives.
    // BUG FIX: the original folded the first read() result into the offset
    // arithmetic without checking it, so a failed (negative) read corrupted
    // the offset, and a premature EOF (0 bytes) spun forever. Both are now
    // reported as errors.
    uint64_t dataOffset = 0;
    auto sizeToRead = dataStreamSize;
    while (sizeToRead > 0)
    {
        size = in->read(dataStream + dataOffset, sizeToRead);
        if (size <= 0)
            throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");
        sizeToRead -= size;
        dataOffset += size;
    }

    // Verify payload integrity against the hash stored in the header.
    utils::Hasher128 hasher;
    auto computedDataHash = hasher(dataStream, dataStreamSize);
    if (dataHash != computedDataHash)
        throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. ");

    // Parse the payload: [uint64 count][uint32 oid, uint32 keyType] * count.
    uint64_t count = 0;
    std::memcpy(reinterpret_cast<char*>(&count), dataStream, sizeof(uint64_t));
    uint64_t offset = sizeof(uint64_t);

    // For each pair.
    for (uint64_t i = 0; i < count; ++i)
    {
        uint32_t oid;
        KeyType keyType;
        std::memcpy(reinterpret_cast<char*>(&oid), &dataStream[offset], sizeof(uint32_t));
        offset += sizeof(uint32_t);
        std::memcpy(reinterpret_cast<char*>(&keyType), &dataStream[offset], sizeof(KeyType));
        offset += sizeof(KeyType);
        // Insert pair.
        keyTypes[oid] = keyType;
    }
}
// Hashes the current serialized statistics; peers compare this value to
// decide whether they already hold an identical copy.
uint64_t StatisticsManager::computeHashFromStats()
{
    uint64_t streamSize = 0;
    auto stream = convertStatsToDataStream(streamSize);
    utils::Hasher128 hasher;
    return hasher(stream.get(), streamSize);
}
// Writes version, epoch, the pair count, and every [oid, key-type] pair
// into `bs`. The format is the inverse of unserialize().
void StatisticsManager::serialize(messageqcpp::ByteStream& bs)
{
    const uint64_t pairCount = keyTypes.size();
    bs << version;
    bs << epoch;
    bs << pairCount;

    for (const auto& [oid, keyType] : keyTypes)
    {
        bs << oid;
        bs << static_cast<uint32_t>(keyType);
    }
}
// Reads statistics previously produced by serialize() from `bs` and merges
// the [oid, key-type] pairs into the in-memory map, overwriting existing
// entries for the same OID.
void StatisticsManager::unserialize(messageqcpp::ByteStream& bs)
{
    uint64_t count;
    bs >> version;
    bs >> epoch;
    bs >> count;

    // BUG FIX: iterate with a 64-bit index — the original 32-bit loop counter
    // could never reach a `count` above UINT32_MAX, looping forever.
    for (uint64_t i = 0; i < count; ++i)
    {
        uint32_t oid, keyType;
        bs >> oid;
        bs >> keyType;
        keyTypes[oid] = static_cast<KeyType>(keyType);
    }
}
// Singleton accessor: the distributor is created once on first use and is
// intentionally never destroyed, so it stays valid during shutdown.
StatisticsDistributor* StatisticsDistributor::instance()
{
    static StatisticsDistributor* const distributor = new StatisticsDistributor();
    return distributor;
}
// Pushes the local statistics to every peer ExeMgr found in the config.
//
// Per client: send ANALYZE_TABLE_REC_STATS plus the stats hash; if the peer
// answers ANALYZE_TABLE_SUCCESS its copy is current and we move on,
// otherwise ship the full serialized statistics and wait for the ack.
// A failure for one client is logged and does not stop the loop.
void StatisticsDistributor::distributeStatistics()
{
countClients();
{
std::lock_guard<std::mutex> lock(mut);
// No clients.
if (clientsCount == 0)
return;

#ifdef DEBUG_STATISTICS
std::cout << "Distribute statistics from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
#endif
messageqcpp::ByteStream msg, statsHash, statsBs;
// Current hash.
statsHash << statistics::StatisticsManager::instance()->computeHashFromStats();
// Statistics.
statistics::StatisticsManager::instance()->serialize(statsBs);

for (uint32_t i = 0; i < clientsCount; ++i)
{
try
{
messageqcpp::ByteStream::quadbyte qb = ANALYZE_TABLE_REC_STATS;
msg << qb;

// Peers are named ExeMgr2, ExeMgr3, ... (ExeMgr1 is this server).
auto exeMgrID = "ExeMgr" + std::to_string(i + 2);
// Create a client.
std::unique_ptr<messageqcpp::MessageQueueClient> exemgrClient(
new messageqcpp::MessageQueueClient(exeMgrID));

#ifdef DEBUG_STATISTICS
std::cout << "Try to connect to " << exeMgrID << std::endl;
#endif
// Try to connect to the client.
// Unreachable peer: skip it, best-effort distribution.
if (!exemgrClient->connect())
{
msg.restart();
#ifdef DEBUG_STATISTICS
std::cout << "Unable to connect to " << exeMgrID << std::endl;
#endif
continue;
}

#ifdef DEBUG_STATISTICS
std::cout
<< "Write flag ANALYZE_TABLE_REC_STATS from ExeMgr(Server) to ExeMgr(Clients) "
<< std::endl;
#endif
// Write a flag to client ExeMgr.
exemgrClient->write(msg);

#ifdef DEBUG_STATISTICS
std::cout << "Write statistics hash from ExeMgr(Server) to ExeMgr(Clients) "
<< std::endl;
#endif
// Write a hash of the stats.
exemgrClient->write(statsHash);

// Read the state from Client.
msg.restart();
msg = exemgrClient->read();
msg >> qb;

// Do not need a stats.
// Peer already holds an identical copy.
if (qb == ANALYZE_TABLE_SUCCESS)
{
msg.restart();
continue;
}

#ifdef DEBUG_STATISTICS
std::cout << "Write statistics bytestream from ExeMgr(Server) to ExeMgr(Clients) "
<< std::endl;
#endif
// Write a statistics to client ExeMgr.
exemgrClient->write(statsBs);

// Read the flag back from the client ExeMgr.
msg.restart();
msg = exemgrClient->read();
if (msg.length() == 0)
throw runtime_error("Lost conection to ExeMgr.");

#ifdef DEBUG_STATISTICS
std::cout << "Read flag on ExeMgr(Server) from ExeMgr(Client) " << std::endl;
#endif
msg.restart();
}
catch (std::exception& e)
{
// Keep going: one failed peer must not abort distribution.
msg.restart();
std::cerr << "distributeStatistics() failed with error: " << e.what() << std::endl;
}
catch (...)
{
msg.restart();
std::cerr << "distributeStatistics() failed with unknown error." << std::endl;
}
}
}
}
// Counts peer ExeMgr instances by probing consecutive ExeMgrN sections in
// the config file, starting at ExeMgr2 (ExeMgr1 is this server). The probe
// stops at the first section without an IPAddr entry. The result is stored
// in `clientsCount`.
// NOTE(review): `clientsCount` is written here without holding `mut` while
// distributeStatistics() reads it under the lock — confirm callers are
// serialized.
void StatisticsDistributor::countClients()
{
#ifdef DEBUG_STATISTICS
    std::cout << "count clients to distribute statistics " << std::endl;
#endif
    auto* config = config::Config::makeConfig();
    // Starting from the ExeMgr2, since the Server starts on the ExeMgr1.
    // A plain local counter is enough: the original used std::atomic, which
    // buys nothing for a variable confined to a single thread.
    uint32_t exeMgrNumber = 2;
    try
    {
        while (true)
        {
            auto exeMgrID = "ExeMgr" + std::to_string(exeMgrNumber);
            auto exeMgrIP = config->getConfig(exeMgrID, "IPAddr");
            // First missing section ends the probe.
            if (exeMgrIP == "")
                break;

#ifdef DEBUG_STATISTICS
            std::cout << "Client: " << exeMgrID << std::endl;
#endif
            ++exeMgrNumber;
        }
    }
    catch (std::exception& e)
    {
        std::cerr << "countClients() failed with error: " << e.what() << std::endl;
    }
    catch (...)
    {
        std::cerr << "countClients() failed with unknown error: ";
    }

    clientsCount = exeMgrNumber - 2;
#ifdef DEBUG_STATISTICS
    std::cout << "Number of clients: " << clientsCount << std::endl;
#endif
}
} // namespace statistics

122
utils/common/statistics.h Normal file
View File

@ -0,0 +1,122 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#ifndef STATISTICS_H
#define STATISTICS_H
#include "rowgroup.h"
#include "logger.h"
#include "hasher.h"
#include "IDBPolicy.h"
#include <map>
#include <unordered_set>
#include <mutex>
// Represents a commands for `ExeMgr`.
#define ANALYZE_TABLE_EXECUTE 6
#define ANALYZE_TABLE_REC_STATS 7
#define ANALYZE_TABLE_NEED_STATS 8
#define ANALYZE_TABLE_SUCCESS 9
// #define DEBUG_STATISTICS
using namespace idbdatafile;
namespace statistics
{
// Represents a column key type:
// PK - primary key.
// FK - foreign key.
// Classification is produced by StatisticsManager::analyzeColumnKeyTypes().
enum class KeyType : uint32_t
{
PK, // All sampled values were distinct and non-NULL.
FK  // A duplicate or NULL value was observed.
};
// Represents types of statistics CS supports.
enum class StatisticsType : uint32_t
{
// A special statistics type, made to solve circular inner join problem.
PK_FK
};
// Represents a header for the statistics file written by
// StatisticsManager::saveToFile() and read back by loadFromFile().
struct StatisticsFileHeader
{
uint64_t version;   // File format version.
uint64_t epoch;     // Incremented on every `analyze table` run.
uint64_t dataHash;  // Hasher128 checksum of the payload that follows.
uint64_t dataSize;  // Payload size in bytes.
// Reserved space; zero-filled on write. TODO(review): confirm intended use.
uint8_t offset[1024];
};
// This class is responsible for processing and storing statistics.
// On each `analyze table` iteration it increases an epoch and stores
// the updated statistics into the special file.
class StatisticsManager
{
public:
// Returns the instance of this class, static initialization happens only once.
static StatisticsManager* instance();
// Analyzes the given `rowGroup` by processing it row by row and searching for foreign key.
// When `trace` is set, dumps the resulting classification to stdout.
void analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace);
// Outputs stats to out stream (stdout).
void output(StatisticsType statisticsType = StatisticsType::PK_FK);
// Saves stats to the file (header + hashed payload); throws on I/O failure.
void saveToFile();
// Loads stats from the file; returns silently if the file does not exist,
// throws on I/O failure or hash mismatch.
void loadFromFile();
// Bumps the statistics generation counter.
void incEpoch() { ++epoch; }
// Serialize stats to the given `bs`.
void serialize(messageqcpp::ByteStream& bs);
// Unserialize stats from the given `bs`.
void unserialize(messageqcpp::ByteStream& bs);
// Computes hash from the current statistics data.
uint64_t computeHashFromStats();

private:
// Column OID -> observed key classification.
std::map<uint32_t, KeyType> keyTypes;
StatisticsManager() : epoch(0), version(1) { IDBPolicy::init(true, false, "", 0); }
// Flattens `keyTypes` into the on-disk byte layout; size returned via out-param.
std::unique_ptr<char[]> convertStatsToDataStream(uint64_t& dataStreamSize);
// Guards keyTypes and file I/O.
std::mutex mut;
uint32_t epoch;
uint32_t version;
// NOTE(review): hard-coded persistence path — confirm it should not come
// from configuration.
std::string statsFile = "/var/lib/columnstore/local/statistics";
};
// This class is responsible for distributing the statistics across all `ExeMgr` in a cluster.
class StatisticsDistributor
{
public:
// Returns the instance of this class, static initialization happens only once.
static StatisticsDistributor* instance();
// Distribute stats across all `ExeMgr` in cluster by connecting to them using config file.
// Best-effort: unreachable or failing peers are skipped.
void distributeStatistics();

private:
StatisticsDistributor() : clientsCount(0) {}
// Count the number of clients by reading config file and evaluating `ExeMgr` fields.
void countClients();
// Number of peer ExeMgr instances discovered by countClients().
uint32_t clientsCount;
// Guards the distribution loop.
std::mutex mut;
};
} // namespace statistics
#endif

View File

@ -77,7 +77,8 @@ enum ErrorCodeValues
dataTypeErr, dataTypeErr,
incompatJoinCols, incompatJoinCols,
incompatFilterCols, incompatFilterCols,
aggregateResourceErr aggregateResourceErr,
statisticsJobListEmpty = 301
}; };
struct ErrorCodes struct ErrorCodes