diff --git a/dbcon/execplan/CMakeLists.txt b/dbcon/execplan/CMakeLists.txt index 798290c67..bc8daac03 100755 --- a/dbcon/execplan/CMakeLists.txt +++ b/dbcon/execplan/CMakeLists.txt @@ -12,6 +12,7 @@ set(execplan_LIB_SRCS calpontexecutionplan.cpp calpontexecutionplanfactory.cpp calpontselectexecutionplan.cpp + mcsanalyzetableexecutionplan.cpp clientrotator.cpp constantcolumn.cpp constantfilter.cpp diff --git a/dbcon/execplan/calpontexecutionplan.h b/dbcon/execplan/calpontexecutionplan.h index 3b43b7d5e..213962ef4 100644 --- a/dbcon/execplan/calpontexecutionplan.h +++ b/dbcon/execplan/calpontexecutionplan.h @@ -51,6 +51,29 @@ class CalpontExecutionPlan */ public: + /** + * Flags that can be passed to caltraceon(). + */ + enum TRACE_FLAGS + { + TRACE_NONE = 0x0000, /*!< No tracing */ + TRACE_LOG = 0x0001, /*!< Full return of rows, extra debug about query in log */ + TRACE_NO_ROWS1 = 0x0002, /*!< Same as above, but rows not given to OCI layer */ + TRACE_NO_ROWS2 = 0x0004, /*!< Same as above, but rows not converted from stream */ + TRACE_NO_ROWS3 = 0x0008, /*!< Same as above, but rows not sent to DM from UM */ + TRACE_NO_ROWS4 = 0x0010, /*!< Same as above, but rows not sent to DeliveryStep */ + TRACE_LBIDS = 0x0020, /*!< Enable LBID tracing in PrimProc */ + TRACE_PLAN_ONLY = 0x0040, /*!< Only generate a serialized CSEP */ + PM_PROFILE = 0x0080, /*!< Enable PM profiling in PrimProc */ + IGNORE_CP = 0x0100, /*!< Ignore casual partitioning metadata */ + WRITE_TO_FILE = 0x0200, /*!< writes table rows out to a file from the Oracle connector */ + NOWRITE_TO_FILE = 0x0400, /*!< does not write table rows out to a file from the Oracle connector */ + TRACE_DISKIO_UM = 0x0800, /*!< Enable UM disk I/O logging */ + TRACE_RESRCMGR = 0x1000, /*!< Trace Resource Manager Usage */ + TRACE_TUPLE_AUTOSWITCH = 0x4000, /*!< Enable MySQL tuple-to-table auto switch */ + TRACE_TUPLE_OFF = 0x8000, /*!< Enable MySQL table interface */ + }; + /** * Constructors */ diff --git 
a/dbcon/execplan/calpontselectexecutionplan.h b/dbcon/execplan/calpontselectexecutionplan.h index 42664b71c..403ec3d8f 100644 --- a/dbcon/execplan/calpontselectexecutionplan.h +++ b/dbcon/execplan/calpontselectexecutionplan.h @@ -135,29 +135,6 @@ public: UNKNOWN_SUBS }; - /** - * Flags that can be passed to caltraceon(). - */ - enum TRACE_FLAGS - { - TRACE_NONE = 0x0000, /*!< No tracing */ - TRACE_LOG = 0x0001, /*!< Full return of rows, extra debug about query in log */ - TRACE_NO_ROWS1 = 0x0002, /*!< Same as above, but rows not given to OCI layer */ - TRACE_NO_ROWS2 = 0x0004, /*!< Same as above, but rows not converted from stream */ - TRACE_NO_ROWS3 = 0x0008, /*!< Same as above, but rows not sent to DM from UM */ - TRACE_NO_ROWS4 = 0x0010, /*!< Same as above, but rows not sent to DeliveryStep */ - TRACE_LBIDS = 0x0020, /*!< Enable LBID tracing in PrimProc */ - TRACE_PLAN_ONLY = 0x0040, /*!< Only generate a serialized CSEP */ - PM_PROFILE = 0x0080, /*!< Enable PM profiling in PrimProc */ - IGNORE_CP = 0x0100, /*!< Ignore casual partitioning metadata */ - WRITE_TO_FILE = 0x0200, /*!< writes table rows out to a file from the Oracle connector */ - NOWRITE_TO_FILE = 0x0400, /*!< does not write table rows out to a file from the Oracle connector */ - TRACE_DISKIO_UM = 0x0800, /*!< Enable UM disk I/O logging */ - TRACE_RESRCMGR = 0x1000, /*!< Trace Resource Manager Usage */ - TRACE_TUPLE_AUTOSWITCH = 0x4000, /*!< Enable MySQL tuple-to-table auto switch */ - TRACE_TUPLE_OFF = 0x8000, /*!< Enable MySQL table interface */ - }; - enum IDB_LOCAL_QUERY { GLOBAL_QUERY = 0, /*!< Standard processing */ diff --git a/dbcon/execplan/mcsanalyzetableexecutionplan.cpp b/dbcon/execplan/mcsanalyzetableexecutionplan.cpp new file mode 100644 index 000000000..a383dc88f --- /dev/null +++ b/dbcon/execplan/mcsanalyzetableexecutionplan.cpp @@ -0,0 +1,156 @@ +/* Copyright (C) 2021 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of 
the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +using namespace std; + +#include + +#include "bytestream.h" +using namespace messageqcpp; + +#include "mcsanalyzetableexecutionplan.h" +#include "objectreader.h" +#include "filter.h" +#include "returnedcolumn.h" +#include "simplecolumn.h" +#include "querystats.h" + +#include "querytele.h" +using namespace querytele; + +namespace execplan +{ + +std::string MCSAnalyzeTableExecutionPlan::toString() const +{ + std::ostringstream output; + output << ">ANALYZE TABLE " << std::endl; + output << "Shema: " << fSchemaName << std::endl; + output << "Table: " << fTableName << std::endl; + + output << ">>Returned Columns" << std::endl; + + for (auto& rColumn : fReturnedCols) + output << *rColumn << std::endl; + + output << "--- Column Map ---" << std::endl; + CalpontSelectExecutionPlan::ColumnMap::const_iterator iter; + + for (iter = columnMap().begin(); iter != columnMap().end(); iter++) + output << (*iter).first << " : " << (*iter).second << endl; + + output << "SessionID: " << fSessionID << std::endl; + output << "TxnID: " << fTxnID << std::endl; + output << "VerID: " << fVerID << std::endl; + + return output.str(); +} + +void MCSAnalyzeTableExecutionPlan::serialize(messageqcpp::ByteStream& bs) const +{ + bs << static_cast(ObjectReader::MCSANALYZETBLEXECUTIONPLAN); + + // Returned columns. 
+ bs << static_cast(fReturnedCols.size()); + for (auto& rColumn : fReturnedCols) + rColumn->serialize(bs); + + // Column map. + bs << static_cast(fColumnMap.size()); + for (auto& column : fColumnMap) + { + bs << column.first; + column.second->serialize(bs); + } + + bs << static_cast(frmParms.size()); + + for (RMParmVec::const_iterator it = frmParms.begin(); it != frmParms.end(); ++it) + { + bs << it->sessionId; + bs << it->id; + bs << it->value; + } + + bs << fData; + bs << static_cast(fSessionID); + bs << static_cast(fTxnID); + bs << fVerID; + bs << fStatementID; + bs << static_cast(fStringScanThreshold); + bs << fPriority; + bs << fSchemaName; + bs << fTableName; + bs << fLocalQuery; + bs << fTimeZone; + bs << fTraceFlags; +} + +void MCSAnalyzeTableExecutionPlan::unserialize(messageqcpp::ByteStream& bs) +{ + ObjectReader::checkType(bs, ObjectReader::MCSANALYZETBLEXECUTIONPLAN); + fReturnedCols.clear(); + fColumnMap.clear(); + uint32_t size; + + bs >> size; + for (uint32_t i = 0; i < size; ++i) + { + auto* returnedColumn = dynamic_cast(ObjectReader::createTreeNode(bs)); + SRCP srcp(returnedColumn); + fReturnedCols.push_back(srcp); + } + + bs >> size; + for (uint32_t i = 0; i < size; ++i) + { + std::string colName; + bs >> colName; + auto* returnedColumn = dynamic_cast(ObjectReader::createTreeNode(bs)); + SRCP srcp(returnedColumn); + fColumnMap.insert(ColumnMap::value_type(colName, srcp)); + } + + bs >> size; + messageqcpp::ByteStream::doublebyte id; + messageqcpp::ByteStream::quadbyte sessionId; + messageqcpp::ByteStream::octbyte memory; + + for (uint32_t i = 0; i < size; i++) + { + bs >> sessionId; + bs >> id; + bs >> memory; + frmParms.push_back(RMParam(sessionId, id, memory)); + } + + bs >> fData; + bs >> reinterpret_cast(fSessionID); + bs >> reinterpret_cast(fTxnID); + bs >> fVerID; + bs >> fStatementID; + bs >> reinterpret_cast(fStringScanThreshold); + bs >> fPriority; + bs >> fSchemaName; + bs >> fTableName; + bs >> fLocalQuery; + bs >> fTimeZone; + bs >> 
fTraceFlags; +} +} // namespace execplan diff --git a/dbcon/execplan/mcsanalyzetableexecutionplan.h b/dbcon/execplan/mcsanalyzetableexecutionplan.h new file mode 100644 index 000000000..4090174be --- /dev/null +++ b/dbcon/execplan/mcsanalyzetableexecutionplan.h @@ -0,0 +1,170 @@ +/* Copyright (C) 2021 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#ifndef MCSANALYZETABLEXEXCUTIONPLAN_H +#define MCSANALYZETABLEXEXCUTIONPLAN_H +#include +#include +#include + +#include + +#include "exp_templates.h" +#include "calpontexecutionplan.h" +#include "calpontselectexecutionplan.h" +#include "returnedcolumn.h" +#include "filter.h" +#include "expressionparser.h" +#include "calpontsystemcatalog.h" +#include "brmtypes.h" +#include "objectreader.h" +#include + +/** + * Namespace + */ +namespace execplan +{ + +class MCSAnalyzeTableExecutionPlan : public CalpontExecutionPlan +{ + public: + typedef std::vector ReturnedColumnList; + typedef std::multimap ColumnMap; + typedef std::vector RMParmVec; + + MCSAnalyzeTableExecutionPlan() : fTraceFlags(TRACE_NONE) {} + + MCSAnalyzeTableExecutionPlan(const ReturnedColumnList& returnedCols, + const ColumnMap& columnMap) + : fReturnedCols(returnedCols), fColumnMap(columnMap), fTraceFlags(TRACE_NONE) + { + } + + virtual ~MCSAnalyzeTableExecutionPlan() = default; + + const ReturnedColumnList& returnedCols() const { 
return fReturnedCols; } + ReturnedColumnList& returnedCols() { return fReturnedCols; } + + void returnedCols(const ReturnedColumnList& returnedCols) { fReturnedCols = returnedCols; } + + const ColumnMap& columnMap() const { return fColumnMap; } + + ColumnMap& columnMap() { return fColumnMap; } + + void columnMap(const ColumnMap& columnMap) { fColumnMap = columnMap; } + + const std::string data() const { return fData; } + + void data(const std::string data) { fData = data; } + + uint32_t sessionID() const { return fSessionID; } + + void sessionID(const uint32_t sessionID) { fSessionID = sessionID; } + + int txnID() const { return fTxnID; } + + void txnID(const int txnID) { fTxnID = txnID; } + + const BRM::QueryContext verID() const { return fVerID; } + + void verID(const BRM::QueryContext verID) { fVerID = verID; } + + inline std::string& schemaName() { return fSchemaName; } + + inline void schemaName(const std::string& schemaName, int lower_case_table_names) + { + fSchemaName = schemaName; + if (lower_case_table_names) + boost::algorithm::to_lower(fSchemaName); + } + + inline std::string& tableName() { return fTableName; } + + inline void tableName(const std::string& tableName, int lower_case_table_names) + { + fTableName = tableName; + if (lower_case_table_names) + boost::algorithm::to_lower(fTableName); + } + + uint32_t statementID() const { return fStatementID; } + + void statementID(const uint32_t statementID) { fStatementID = statementID; } + + void uuid(const boost::uuids::uuid& uuid) { fUuid = uuid; } + + const boost::uuids::uuid& uuid() const { return fUuid; } + + void timeZone(const std::string& timezone) { fTimeZone = timezone; } + + const std::string timeZone() const { return fTimeZone; } + + void priority(uint32_t p) { fPriority = p; } + + uint32_t priority() const { return fPriority; } + + const RMParmVec& rmParms() { return frmParms; } + + void rmParms(const RMParmVec& parms) + { + frmParms.clear(); + frmParms.assign(parms.begin(), parms.end()); + } + 
+ uint32_t localQuery() const { return fLocalQuery; } + + void localQuery(const uint32_t localQuery) { fLocalQuery = localQuery; } + + inline bool traceOn() const { return (traceFlags() & TRACE_LOG); } + + inline uint32_t traceFlags() const { return fTraceFlags; } + + inline void traceFlags(uint32_t traceFlags) { fTraceFlags = traceFlags; } + + virtual std::string toString() const; + + virtual bool isInternal() const { return ((fSessionID & 0x80000000) != 0); } + + virtual void serialize(messageqcpp::ByteStream& bs) const; + + virtual void unserialize(messageqcpp::ByteStream& bs); + + // TODO: Why do we need this? + virtual bool operator==(const CalpontExecutionPlan* t) const { return false; } + virtual bool operator!=(const CalpontExecutionPlan* t) const { return false; } + + private: + ReturnedColumnList fReturnedCols; + ColumnMap fColumnMap; + uint32_t fSessionID; + int fTxnID; + BRM::QueryContext fVerID; + std::string fSchemaName; + std::string fTableName; + uint32_t fTraceFlags; + boost::uuids::uuid fUuid; + std::string fTimeZone; + uint32_t fStatementID; + uint64_t fStringScanThreshold; + std::string fData; + RMParmVec frmParms; + uint32_t fPriority; + uint32_t fLocalQuery; +}; +} // namespace execplan +#endif diff --git a/dbcon/execplan/objectreader.h b/dbcon/execplan/objectreader.h index af3682bd0..a3cba13c5 100644 --- a/dbcon/execplan/objectreader.h +++ b/dbcon/execplan/objectreader.h @@ -123,6 +123,9 @@ public: /** UDAF SDK */ MCSV1_CONTEXT, UDAFCOLUMN, + + /** ANALYZE TABLE */ + MCSANALYZETBLEXECUTIONPLAN, }; typedef uint8_t id_t; //expand as necessary diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 43b70fdf2..c3bef8098 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -39,6 +39,7 @@ using namespace boost; #include "calpontexecutionplan.h" #include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" #include "calpontsystemcatalog.h" #include "dbrm.h" 
#include "filter.h" @@ -79,6 +80,7 @@ using namespace logging; #include "tupleconstantstep.h" #include "tuplehavingstep.h" #include "windowfunctionstep.h" +#include "tupleannexstep.h" #include "jlf_common.h" #include "jlf_graphics.h" @@ -1766,8 +1768,159 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, // ds->setTraceFlags(jobInfo.traceFlags); } +void makeAnalyzeTableJobSteps(MCSAnalyzeTableExecutionPlan* caep, JobInfo& jobInfo, + JobStepVector& querySteps, DeliveredTableMap& deliverySteps) +{ + JobStepVector projectSteps; + const auto& retCols = caep->returnedCols(); + + if (retCols.size() == 0) + return; + + // This is strange, but to generate a valid `out rowgroup` in AnnexStep we have to initialize + // `nonConstDelCols`, otherwise we will get an empty result in ExeMgr. Why don't just use `in + // row group`? + jobInfo.nonConstDelCols = retCols; + + // Iterate over `returned columns` and create a `pColStep` for each. + for (uint32_t i = 0; i < retCols.size(); i++) + { + const SimpleColumn* sc = dynamic_cast(retCols[i].get()); + CalpontSystemCatalog::OID oid = sc->oid(); + CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc); + CalpontSystemCatalog::ColType colType; + + if (!sc->schemaName().empty()) + { + SJSTEP sjstep; + colType = sc->colType(); + auto* pcs = new pColStep(oid, tblOid, colType, jobInfo); + pcs->alias(extractTableAlias(sc)); + pcs->view(sc->viewName()); + pcs->name(sc->columnName()); + pcs->cardinality(sc->cardinality()); + + auto ti = setTupleInfo(colType, oid, jobInfo, tblOid, sc, pcs->alias()); + pcs->tupleId(ti.key); + + sjstep.reset(pcs); + projectSteps.push_back(sjstep); + } + } + + if (projectSteps.size() == 0) + return; + + // Transform the first `pColStep` to `pColScanStep`. + SJSTEP firstStep = projectSteps.front(); + pColStep* colStep = static_cast(firstStep.get()); + // Create a `pColScanStep` using first `pColStep`. 
+ pColScanStep* scanStep = new pColScanStep(*colStep); + scanStep->outputAssociation(JobStepAssociation()); + // Update first step. + firstStep.reset(scanStep); + + // Create tuple BPS step. + TupleBPS* tbps = new TupleBPS(*scanStep, jobInfo); + tbps->setFirstStepType(SCAN); + + // One `filter` step is scan step. + tbps->setBPP(scanStep); + tbps->setStepCount(); + + vector pos; + vector oids; + vector keys; + vector scale; + vector precision; + vector types; + vector csNums; + pos.push_back(2); + + bool passThruCreated = false; + for (JobStepVector::iterator it = projectSteps.begin(); it != projectSteps.end(); it++) + { + JobStep* js = it->get(); + auto* colStep = static_cast(js); + + // TODO: Hoist the condition branch from the cycle, it will look ugly, but probaby faster. + if (UNLIKELY(!passThruCreated)) + { + PassThruStep* pts = new PassThruStep(*colStep); + passThruCreated = true; + pts->alias(colStep->alias()); + pts->view(colStep->view()); + pts->name(colStep->name()); + pts->tupleId(colStep->tupleId()); + tbps->setProjectBPP(pts, NULL); + } + else + { + tbps->setProjectBPP(it->get(), NULL); + } + + TupleInfo ti(getTupleInfo(colStep->tupleId(), jobInfo)); + pos.push_back(pos.back() + ti.width); + oids.push_back(ti.oid); + keys.push_back(ti.key); + types.push_back(ti.dtype); + csNums.push_back(ti.csNum); + scale.push_back(ti.scale); + precision.push_back(ti.precision); + } + + RowGroup rg(oids.size(), pos, oids, keys, types, csNums, scale, precision, 20); + + SJSTEP sjsp; + sjsp.reset(tbps); + // Add tuple BPS step to query steps. + querySteps.push_back(sjsp); + + // Delivery step. + SJSTEP annexStep; + auto* tas = new TupleAnnexStep(jobInfo); + annexStep.reset(tas); + + // Create input `RowGroupDL`. + RowGroupDL* dlIn = new RowGroupDL(1, jobInfo.fifoSize); + dlIn->OID(CNX_VTABLE_ID); + AnyDataListSPtr spdlIn(new AnyDataList()); + spdlIn->rowGroupDL(dlIn); + JobStepAssociation jsaIn; + jsaIn.outAdd(spdlIn); + + // Create output `RowGroupDL`. 
+ RowGroupDL* dlOut = new RowGroupDL(1, jobInfo.fifoSize); + dlOut->OID(CNX_VTABLE_ID); + AnyDataListSPtr spdlOut(new AnyDataList()); + spdlOut->rowGroupDL(dlOut); + JobStepAssociation jsaOut; + jsaOut.outAdd(spdlOut); + + // Set input and output. + tbps->setOutputRowGroup(rg); + tbps->outputAssociation(jsaIn); + annexStep->inputAssociation(jsaIn); + annexStep->outputAssociation(jsaOut); + + // Initialize. + tas->initialize(rg, jobInfo); + + // Add `annexStep` to delivery steps and to query steps. + deliverySteps[CNX_VTABLE_ID] = annexStep; + querySteps.push_back(annexStep); + + if (jobInfo.trace) + { + std::cout << "TupleBPS created: " << std::endl; + std::cout << tbps->toString() << std::endl; + std::cout << "Result row group: " << std::endl; + std::cout << rg.toString() << std::endl; + } } +} // namespace + namespace joblist { @@ -1833,179 +1986,17 @@ void makeUnionJobSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, numberSteps(querySteps, stepNo, jobInfo.traceFlags); deliverySteps[execplan::CNX_VTABLE_ID] = unionStep; } - - -} +} // namespace joblist namespace { -SJLP makeJobList_( - CalpontExecutionPlan* cplan, - ResourceManager* rm, - bool isExeMgr, - unsigned& errCode, string& emsg) +void handleException(std::exception_ptr e, JobList* jl, JobInfo& jobInfo, unsigned& errCode, + string& emsg) { - CalpontSelectExecutionPlan* csep = dynamic_cast(cplan); - boost::shared_ptr csc = CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID()); - - static config::Config* sysConfig = config::Config::makeConfig(); - int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); - - // We have to go ahead and create JobList now so we can store the joblist's - // projectTableOID pointer in JobInfo for use during jobstep creation. 
- SErrorInfo errorInfo(new ErrorInfo()); - boost::shared_ptr keyInfo(new TupleKeyInfo); - boost::shared_ptr subCount(new int); - *subCount = 0; - JobList* jl = new TupleJobList(isExeMgr); - jl->setPMsConfigured(pmsConfigured); - jl->priority(csep->priority()); - jl->errorInfo(errorInfo); - rm->setTraceFlags(csep->traceFlags()); - - //Stuff a util struct with some stuff we always need - JobInfo jobInfo(rm); - jobInfo.sessionId = csep->sessionID(); - jobInfo.txnId = csep->txnID(); - jobInfo.verId = csep->verID(); - jobInfo.statementId = csep->statementID(); - jobInfo.queryType = csep->queryType(); - jobInfo.csc = csc; - //TODO: clean up the vestiges of the bool trace - jobInfo.trace = csep->traceOn(); - jobInfo.traceFlags = csep->traceFlags(); - jobInfo.isExeMgr = isExeMgr; -// jobInfo.tryTuples = tryTuples; // always tuples after release 3.0 - jobInfo.stringScanThreshold = csep->stringScanThreshold(); - jobInfo.errorInfo = errorInfo; - jobInfo.keyInfo = keyInfo; - jobInfo.subCount = subCount; - jobInfo.projectingTableOID = jl->projectingTableOIDPtr(); - jobInfo.jobListPtr = jl; - jobInfo.stringTableThreshold = csep->stringTableThreshold(); - jobInfo.localQuery = csep->localQuery(); - jobInfo.uuid = csep->uuid(); - jobInfo.timeZone = csep->timeZone(); - - /* disk-based join vars */ - jobInfo.smallSideLimit = csep->djsSmallSideLimit(); - jobInfo.largeSideLimit = csep->djsLargeSideLimit(); - jobInfo.partitionSize = csep->djsPartitionSize(); - jobInfo.umMemLimit.reset(new int64_t); - *(jobInfo.umMemLimit) = csep->umMemLimit(); - jobInfo.isDML = csep->isDML(); - - jobInfo.smallSideUsage.reset(new int64_t); - *jobInfo.smallSideUsage = 0; - - // set fifoSize to 1 for CalpontSystemCatalog query - if (csep->sessionID() & 0x80000000) - jobInfo.fifoSize = 1; - else if (csep->traceOn()) - cout << (*csep) << endl; - try { - JobStepVector querySteps; - JobStepVector projectSteps; - DeliveredTableMap deliverySteps; - - if (csep->unionVec().size() == 0) - makeJobSteps(csep, 
jobInfo, querySteps, projectSteps, deliverySteps); - else - makeUnionJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); - - uint16_t stepNo = numberSteps(querySteps, 0, jobInfo.traceFlags); - stepNo = numberSteps(projectSteps, stepNo, jobInfo.traceFlags); - - struct timeval stTime; - - if (jobInfo.trace) - { - ostringstream oss; - oss << endl; - oss << endl << "job parms: " << endl; - oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems << - ", flushInterval = " << jobInfo.flushInterval << - ", fifoSize = " << jobInfo.fifoSize << - ", ScanLimit/Threshold = " << jobInfo.scanLbidReqLimit << "/" << - jobInfo.scanLbidReqThreshold << endl; - oss << "UUID: " << jobInfo.uuid << endl; - oss << endl << "job filter steps: " << endl; - ostream_iterator oIter(oss, "\n"); - copy(querySteps.begin(), querySteps.end(), oIter); - oss << endl << "job project steps: " << endl; - copy(projectSteps.begin(), projectSteps.end(), oIter); - oss << endl << "job delivery steps: " << endl; - DeliveredTableMap::iterator dsi = deliverySteps.begin(); - - while (dsi != deliverySteps.end()) - { - oss << dynamic_cast(dsi->second.get()) << endl; - ++dsi; - } - - oss << endl; - gettimeofday(&stTime, 0); - - struct tm tmbuf; -#ifdef _MSC_VER - errno_t p = 0; - time_t t = stTime.tv_sec; - p = localtime_s(&tmbuf, &t); - - if (p != 0) - memset(&tmbuf, 0, sizeof(tmbuf)); - -#else - localtime_r(&stTime.tv_sec, &tmbuf); -#endif - ostringstream tms; - tms << setfill('0') - << setw(4) << (tmbuf.tm_year + 1900) - << setw(2) << (tmbuf.tm_mon + 1) - << setw(2) << (tmbuf.tm_mday) - << setw(2) << (tmbuf.tm_hour) - << setw(2) << (tmbuf.tm_min) - << setw(2) << (tmbuf.tm_sec) - << setw(6) << (stTime.tv_usec); - string tmstr(tms.str()); - string jsrname("jobstep." 
+ tmstr + ".dot"); - ofstream dotFile(jsrname.c_str()); - jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps); - - char timestamp[80]; -#ifdef _MSC_VER - t = stTime.tv_sec; - p = ctime_s(timestamp, 80, &t); - - if (p != 0) - strcpy(timestamp, "UNKNOWN"); - -#else - ctime_r((const time_t*)&stTime.tv_sec, timestamp); -#endif - oss << "runtime updates: start at " << timestamp; - cout << oss.str(); - Message::Args args; - args.add(oss.str()); - jobInfo.logger->logMessage(LOG_TYPE_DEBUG, LogSQLTrace, args, - LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0)); - cout << flush; - } - else - { - gettimeofday(&stTime, 0); - } - - // Finish initializing the JobList object - jl->addQuery(querySteps); - jl->addProject(projectSteps); - jl->addDelivery(deliverySteps); - csep->setDynamicParseTreeVec(jobInfo.dynamicParseTreeVec); - - dynamic_cast(jl)->setDeliveryFlag(true); + std::rethrow_exception(e); } catch (IDBExcept& iex) { @@ -2013,7 +2004,6 @@ SJLP makeJobList_( errCode = iex.errorCode(); exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG); emsg = iex.what(); - goto bailout; } catch (const std::exception& ex) { @@ -2021,7 +2011,6 @@ SJLP makeJobList_( errCode = makeJobListErr; exceptionHandler(jl, jobInfo, ex.what()); emsg = ex.what(); - goto bailout; } catch (...) { @@ -2029,24 +2018,302 @@ SJLP makeJobList_( errCode = makeJobListErr; exceptionHandler(jl, jobInfo, "an exception"); emsg = "An unknown internal joblist error"; - goto bailout; } - - goto done; - -bailout: delete jl; - jl = 0; - - if (emsg.empty()) - emsg = "An unknown internal joblist error"; - -done: - SJLP jlp(jl); - return jlp; + jl = nullptr; } +SJLP makeJobList_( + CalpontExecutionPlan* cplan, + ResourceManager* rm, + bool isExeMgr, + unsigned& errCode, string& emsg) +{ + // TODO: This part requires a proper refactoring, we have to move common methods from + // `CalpontSelectExecutionPlan` to the base class. 
I have no idea what's a point of + // `CalpontExecutionPlan` as base class without any meaningful virtual functions and common + // fields. The main thing I concern about - to make a huge refactoring of this before a week + // of release, because there is no time to test it and I do not want to introduce an errors in + // the existing code. Hope will make it on the next iteration of statistics development. + CalpontSelectExecutionPlan* csep = dynamic_cast(cplan); + if (csep != nullptr) + { + boost::shared_ptr csc = + CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID()); + + static config::Config* sysConfig = config::Config::makeConfig(); + int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); + + // We have to go ahead and create JobList now so we can store the joblist's + // projectTableOID pointer in JobInfo for use during jobstep creation. + SErrorInfo errorInfo(new ErrorInfo()); + boost::shared_ptr keyInfo(new TupleKeyInfo); + boost::shared_ptr subCount(new int); + *subCount = 0; + JobList* jl = new TupleJobList(isExeMgr); + jl->setPMsConfigured(pmsConfigured); + jl->priority(csep->priority()); + jl->errorInfo(errorInfo); + rm->setTraceFlags(csep->traceFlags()); + + // Stuff a util struct with some stuff we always need + JobInfo jobInfo(rm); + jobInfo.sessionId = csep->sessionID(); + jobInfo.txnId = csep->txnID(); + jobInfo.verId = csep->verID(); + jobInfo.statementId = csep->statementID(); + jobInfo.queryType = csep->queryType(); + jobInfo.csc = csc; + // TODO: clean up the vestiges of the bool trace + jobInfo.trace = csep->traceOn(); + jobInfo.traceFlags = csep->traceFlags(); + jobInfo.isExeMgr = isExeMgr; + // jobInfo.tryTuples = tryTuples; // always tuples after release 3.0 + jobInfo.stringScanThreshold = csep->stringScanThreshold(); + jobInfo.errorInfo = errorInfo; + jobInfo.keyInfo = keyInfo; + jobInfo.subCount = subCount; + jobInfo.projectingTableOID = jl->projectingTableOIDPtr(); + jobInfo.jobListPtr = jl; + 
jobInfo.stringTableThreshold = csep->stringTableThreshold(); + jobInfo.localQuery = csep->localQuery(); + jobInfo.uuid = csep->uuid(); + jobInfo.timeZone = csep->timeZone(); + + /* disk-based join vars */ + jobInfo.smallSideLimit = csep->djsSmallSideLimit(); + jobInfo.largeSideLimit = csep->djsLargeSideLimit(); + jobInfo.partitionSize = csep->djsPartitionSize(); + jobInfo.umMemLimit.reset(new int64_t); + *(jobInfo.umMemLimit) = csep->umMemLimit(); + jobInfo.isDML = csep->isDML(); + + jobInfo.smallSideUsage.reset(new int64_t); + *jobInfo.smallSideUsage = 0; + + // set fifoSize to 1 for CalpontSystemCatalog query + if (csep->sessionID() & 0x80000000) + jobInfo.fifoSize = 1; + else if (csep->traceOn()) + cout << (*csep) << endl; + + try + { + JobStepVector querySteps; + JobStepVector projectSteps; + DeliveredTableMap deliverySteps; + + if (csep->unionVec().size() == 0) + makeJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); + else + makeUnionJobSteps(csep, jobInfo, querySteps, projectSteps, deliverySteps); + + uint16_t stepNo = numberSteps(querySteps, 0, jobInfo.traceFlags); + stepNo = numberSteps(projectSteps, stepNo, jobInfo.traceFlags); + + struct timeval stTime; + + if (jobInfo.trace) + { + ostringstream oss; + oss << endl; + oss << endl << "job parms: " << endl; + oss << "maxBuckets = " << jobInfo.maxBuckets << ", maxElems = " << jobInfo.maxElems + << ", flushInterval = " << jobInfo.flushInterval + << ", fifoSize = " << jobInfo.fifoSize + << ", ScanLimit/Threshold = " << jobInfo.scanLbidReqLimit << "/" + << jobInfo.scanLbidReqThreshold << endl; + oss << "UUID: " << jobInfo.uuid << endl; + oss << endl << "job filter steps: " << endl; + ostream_iterator oIter(oss, "\n"); + copy(querySteps.begin(), querySteps.end(), oIter); + oss << endl << "job project steps: " << endl; + copy(projectSteps.begin(), projectSteps.end(), oIter); + oss << endl << "job delivery steps: " << endl; + DeliveredTableMap::iterator dsi = deliverySteps.begin(); + + while (dsi != 
deliverySteps.end()) + { + oss << dynamic_cast(dsi->second.get()) << endl; + ++dsi; + } + + oss << endl; + gettimeofday(&stTime, 0); + + struct tm tmbuf; +#ifdef _MSC_VER + errno_t p = 0; + time_t t = stTime.tv_sec; + p = localtime_s(&tmbuf, &t); + + if (p != 0) + memset(&tmbuf, 0, sizeof(tmbuf)); + +#else + localtime_r(&stTime.tv_sec, &tmbuf); +#endif + ostringstream tms; + tms << setfill('0') << setw(4) << (tmbuf.tm_year + 1900) << setw(2) + << (tmbuf.tm_mon + 1) << setw(2) << (tmbuf.tm_mday) << setw(2) + << (tmbuf.tm_hour) << setw(2) << (tmbuf.tm_min) << setw(2) << (tmbuf.tm_sec) + << setw(6) << (stTime.tv_usec); + string tmstr(tms.str()); + string jsrname("jobstep." + tmstr + ".dot"); + ofstream dotFile(jsrname.c_str()); + jlf_graphics::writeDotCmds(dotFile, querySteps, projectSteps); + + char timestamp[80]; +#ifdef _MSC_VER + t = stTime.tv_sec; + p = ctime_s(timestamp, 80, &t); + + if (p != 0) + strcpy(timestamp, "UNKNOWN"); + +#else + ctime_r((const time_t*) &stTime.tv_sec, timestamp); +#endif + oss << "runtime updates: start at " << timestamp; + cout << oss.str(); + Message::Args args; + args.add(oss.str()); + jobInfo.logger->logMessage(LOG_TYPE_DEBUG, LogSQLTrace, args, + LoggingID(5, jobInfo.sessionId, jobInfo.txnId, 0)); + cout << flush; + } + else + { + gettimeofday(&stTime, 0); + } + + // Finish initializing the JobList object + jl->addQuery(querySteps); + jl->addProject(projectSteps); + jl->addDelivery(deliverySteps); + csep->setDynamicParseTreeVec(jobInfo.dynamicParseTreeVec); + + dynamic_cast(jl)->setDeliveryFlag(true); + } + catch (IDBExcept& iex) + { + jobInfo.errorInfo->errCode = iex.errorCode(); + errCode = iex.errorCode(); + exceptionHandler(jl, jobInfo, iex.what(), LOG_TYPE_DEBUG); + emsg = iex.what(); + goto bailout; + } + catch (const std::exception& ex) + { + jobInfo.errorInfo->errCode = makeJobListErr; + errCode = makeJobListErr; + exceptionHandler(jl, jobInfo, ex.what()); + emsg = ex.what(); + goto bailout; + } + catch (...) 
+ { + jobInfo.errorInfo->errCode = makeJobListErr; + errCode = makeJobListErr; + exceptionHandler(jl, jobInfo, "an exception"); + emsg = "An unknown internal joblist error"; + goto bailout; + } + + goto done; + + bailout: + delete jl; + jl = 0; + + if (emsg.empty()) + emsg = "An unknown internal joblist error"; + + done: + SJLP jlp(jl); + return jlp; + } + else + { + auto* caep = dynamic_cast(cplan); + JobList* jl = nullptr; + + if (caep == nullptr) + { + SJLP jlp(jl); + std::cerr << "Ivalid execution plan" << std::endl; + return jlp; + } + + jl = new TupleJobList(isExeMgr); + boost::shared_ptr csc = + CalpontSystemCatalog::makeCalpontSystemCatalog(caep->sessionID()); + + static config::Config* sysConfig = config::Config::makeConfig(); + uint32_t pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str()); + + SErrorInfo errorInfo(new ErrorInfo()); + boost::shared_ptr keyInfo(new TupleKeyInfo); + boost::shared_ptr subCount(new int); + *subCount = 0; + + jl->setPMsConfigured(pmsConfigured); + jl->priority(caep->priority()); + jl->errorInfo(errorInfo); + + JobInfo jobInfo(rm); + jobInfo.sessionId = caep->sessionID(); + jobInfo.txnId = caep->txnID(); + jobInfo.verId = caep->verID(); + jobInfo.statementId = caep->statementID(); + jobInfo.csc = csc; + jobInfo.trace = caep->traceOn(); + jobInfo.isExeMgr = isExeMgr; + // TODO: Implement it when we have a dict column. + jobInfo.stringScanThreshold = 20; + jobInfo.errorInfo = errorInfo; + jobInfo.keyInfo = keyInfo; + jobInfo.subCount = subCount; + jobInfo.projectingTableOID = jl->projectingTableOIDPtr(); + jobInfo.jobListPtr = jl; + jobInfo.localQuery = caep->localQuery(); + jobInfo.timeZone = caep->timeZone(); + + try + { + JobStepVector querySteps; + DeliveredTableMap deliverySteps; + + // Parse exe plan and create a jobstesp from it. + makeAnalyzeTableJobSteps(caep, jobInfo, querySteps, deliverySteps); + + if (!querySteps.size()) + { + delete jl; + // Indicates that query steps is empty. 
+ errCode = logging::statisticsJobListEmpty; + jl = nullptr; + goto out; + } + + numberSteps(querySteps, 0, jobInfo.traceFlags); + + jl->addQuery(querySteps); + jl->addDelivery(deliverySteps); + + dynamic_cast(jl)->setDeliveryFlag(true); + } + catch (...) + { + handleException(std::current_exception(), jl, jobInfo, errCode, emsg); + } + + out: + SJLP jlp(jl); + return jlp; + } } +} // namespace namespace joblist { diff --git a/dbcon/mysql/ha_mcs.cpp b/dbcon/mysql/ha_mcs.cpp index 199886d14..5ae8a3a0c 100644 --- a/dbcon/mysql/ha_mcs.cpp +++ b/dbcon/mysql/ha_mcs.cpp @@ -200,6 +200,24 @@ const char** ha_mcs::bas_ext() const return ha_mcs_exts; } +int ha_mcs::analyze(THD* thd, HA_CHECK_OPT* check_opt) +{ + DBUG_ENTER("ha_mcs::analyze"); + + int rc; + try + { + rc = ha_mcs_impl_analyze(thd, table); + } + catch (std::runtime_error& e) + { + thd->raise_error_printf(ER_INTERNAL_ERROR, e.what()); + rc = ER_INTERNAL_ERROR; + } + + DBUG_RETURN(rc); +} + /** @brief Used for opening tables. The name will be the name of the file. diff --git a/dbcon/mysql/ha_mcs.h b/dbcon/mysql/ha_mcs.h index 3d7bdf297..d4c2e232e 100644 --- a/dbcon/mysql/ha_mcs.h +++ b/dbcon/mysql/ha_mcs.h @@ -126,6 +126,11 @@ public: return (double) (stats.records + stats.deleted) / 20.0 + 10; } + /** @brief + Analyze table command. + */ + int analyze(THD* thd, HA_CHECK_OPT* check_opt); + /* Everything below are methods that we implement in ha_example.cc. 
diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index f69982955..e8446a463 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -105,6 +105,7 @@ using namespace BRM; using namespace querystats; #include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" #include "calpontsystemcatalog.h" #include "simplecolumn_int.h" #include "simplecolumn_decimal.h" @@ -142,6 +143,7 @@ using namespace funcexp; #include "ha_mcs_sysvars.h" #include "ha_mcs_datatype.h" +#include "statistics.h" namespace cal_impl_if { @@ -1899,8 +1901,248 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector& c return rc; } +inline bool isSupportedToAnalyze(const execplan::CalpontSystemCatalog::ColType& colType) +{ + return colType.isUnsignedInteger() || colType.isSignedInteger(); +} + +// Initializes `cal_connection_info` using given `thd` and `sessionID`. +bool initializeCalConnectionInfo(cal_connection_info* ci, THD* thd, + boost::shared_ptr csc, + uint32_t sessionID, bool localQuery) +{ + ci->stats.reset(); + ci->stats.setStartTime(); + + if (thd->main_security_ctx.user) + { + ci->stats.fUser = thd->main_security_ctx.user; + } + else + { + ci->stats.fUser = ""; + } + + if (thd->main_security_ctx.host) + ci->stats.fHost = thd->main_security_ctx.host; + else if (thd->main_security_ctx.host_or_ip) + ci->stats.fHost = thd->main_security_ctx.host_or_ip; + else + ci->stats.fHost = "unknown"; + + try + { + ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + ci->warningMsg = msg; + } + + if (ci->queryState != 0) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery); + idbassert(ci->cal_conn_hndl != 0); + ci->cal_conn_hndl->csc = csc; + idbassert(ci->cal_conn_hndl->exeMgr != 0); + + try + { + ci->cal_conn_hndl->connect(); + } + catch 
(...) + { + setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR)); + CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID); + return false; + } + + return true; +} + +bool sendExecutionPlanToExeMgr(sm::cpsm_conhdl_t* hndl, ByteStream::quadbyte qb, + std::shared_ptr caep, + cal_connection_info* ci, THD* thd) +{ + ByteStream msg; + try + { + msg << qb; + hndl->exeMgr->write(msg); + msg.restart(); + caep->rmParms(ci->rmParms); + + // Send the execution plan. + caep->serialize(msg); + hndl->exeMgr->write(msg); + + // Get the status from ExeMgr. + msg.restart(); + msg = hndl->exeMgr->read(); + + // Any return code is ok for now. + if (msg.length() == 0) + { + auto emsg = "Lost connection to ExeMgr. Please contact your administrator"; + setError(thd, ER_INTERNAL_ERROR, emsg); + return false; + } + } + catch (...) + { + return false; + } + + return true; +} + } //anon namespace +int ha_mcs_impl_analyze(THD* thd, TABLE* table) +{ + uint32_t sessionID = execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id); + boost::shared_ptr csc = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + + csc->identity(execplan::CalpontSystemCatalog::FE); + + auto table_name = + execplan::make_table(table->s->db.str, table->s->table_name.str, lower_case_table_names); + + // Skip for now. + if (table->s->db.length && strcmp(table->s->db.str, "information_schema") == 0) + return 0; + + bool columnStore = (table ? isMCSTable(table) : true); + // Skip non columnstore tables. + if (!columnStore) + return 0; + + execplan::CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(table_name, true); + execplan::MCSAnalyzeTableExecutionPlan::ReturnedColumnList returnedColumnList; + execplan::MCSAnalyzeTableExecutionPlan::ColumnMap columnMap; + + // Iterate over table oid list and create a `SimpleColumn` for every column with supported type. 
+ for (uint32_t i = 0, e = oidlist.size(); i < e; ++i) + { + execplan::SRCP returnedColumn; + const auto objNum = oidlist[i].objnum; + auto tableColName = csc->colName(objNum); + auto colType = csc->colType(objNum); + + if (!isSupportedToAnalyze(colType)) + continue; + + execplan::SimpleColumn* simpleColumn = new execplan::SimpleColumn(); + simpleColumn->columnName(tableColName.column); + simpleColumn->tableName(tableColName.table, lower_case_table_names); + simpleColumn->schemaName(tableColName.schema, lower_case_table_names); + simpleColumn->oid(objNum); + simpleColumn->alias(tableColName.column); + simpleColumn->resultType(colType); + simpleColumn->timeZone(thd->variables.time_zone->get_name()->ptr()); + + returnedColumn.reset(simpleColumn); + returnedColumnList.push_back(returnedColumn); + columnMap.insert(execplan::MCSAnalyzeTableExecutionPlan::ColumnMap::value_type( + simpleColumn->columnName(), returnedColumn)); + } + + // Create execution plan and initialize it with `returned columns` and `column map`. 
+ std::shared_ptr caep( + new execplan::MCSAnalyzeTableExecutionPlan(returnedColumnList, columnMap)); + + caep->schemaName(table->s->db.str, lower_case_table_names); + caep->tableName(table->s->table_name.str, lower_case_table_names); + caep->timeZone(thd->variables.time_zone->get_name()->ptr()); + + SessionManager sm; + BRM::TxnID txnID; + txnID = sm.getTxnID(sessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + QueryContext verID; + verID = sm.verID(); + + caep->txnID(txnID.id); + caep->verID(verID); + caep->sessionID(sessionID); + + string query; + query.assign(idb_mysql_query_str(thd)); + caep->data(query); + + if (!get_fe_conn_info_ptr()) + set_fe_conn_info_ptr(reinterpret_cast(new cal_connection_info(), thd)); + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + try + { + caep->priority(ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser)); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + force_close_fep_conn(thd, ci); + return 0; + } + + caep->traceFlags(ci->traceFlags); + + cal_table_info ti; + sm::cpsm_conhdl_t* hndl; + + bool localQuery = (get_local_query(thd) > 0 ? true : false); + caep->localQuery(localQuery); + + // Try to initialize connection. + if (!initializeCalConnectionInfo(ci, thd, csc, sessionID, localQuery)) + goto error; + + hndl = ci->cal_conn_hndl; + + if (caep->traceOn()) + std::cout << caep->toString() << std::endl; + { + ByteStream::quadbyte qb = ANALYZE_TABLE_EXECUTE; + // Serialize and then send the `analyze table` execution plan.
+ if (!sendExecutionPlanToExeMgr(hndl, qb, caep, ci, thd)) + goto error; + } + + ci->rmParms.clear(); + ci->tableMap[table] = ti; + + return 0; + +error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return ER_INTERNAL_ERROR; +} + int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked) { IDEBUG ( cout << "ha_mcs_impl_open: " << name << ", " << mode << ", " << test_if_locked << endl ); diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index e4701b79c..ff21737c5 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -27,6 +27,7 @@ struct mcs_handler_info; extern int ha_mcs_impl_discover_existence(const char* schema, const char* name); extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* create_info); extern int ha_mcs_impl_delete_table(const char* name); +extern int ha_mcs_impl_analyze(THD* thd, TABLE* table); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_close(void); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 4c518275d..55b240963 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -48,6 +48,7 @@ #include #include "calpontselectexecutionplan.h" +#include "mcsanalyzetableexecutionplan.h" #include "activestatementcounter.h" #include "distributedenginecomm.h" #include "resourcemanager.h" @@ -77,7 +78,7 @@ #include "dbrm.h" #include "mariadb_my_sys.h" - +#include "statistics.h" class Opt { @@ -604,7 +605,136 @@ private: fIos.write(emsgBs); } -public: + void analyzeTableExecute(messageqcpp::ByteStream& bs, joblist::SJLP& jl, bool& stmtCounted) + { + messageqcpp::ByteStream::quadbyte qb; + execplan::MCSAnalyzeTableExecutionPlan caep; + + bs = fIos.read(); + caep.unserialize(bs); + + statementsRunningCount->incr(stmtCounted); + jl = joblist::JobListFactory::makeJobList(&caep, fRm, false, true); + + // Joblist 
is empty. + if (jl->status() == logging::statisticsJobListEmpty) + { + if (caep.traceOn()) + std::cout << "JobList is empty " << std::endl; + + jl.reset(); + bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); + return; + } + + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + std::cout << "fEc setup " << std::endl; + fEc->Setup(); + } + + if (jl->status() == 0) + { + std::string emsg; + + if (jl->putEngineComm(fEc) != 0) + throw std::runtime_error(jl->errMsg()); + } + else + { + throw std::runtime_error("ExeMgr: could not build a JobList!"); + } + + // Execute a joblist. + jl->doQuery(); + + FEMsgHandler msgHandler(jl, &fIos); + + msgHandler.start(); + auto rowCount = jl->projectTable(100, bs); + msgHandler.stop(); + + auto outRG = (static_cast(jl.get()))->getOutputRowGroup(); + + if (caep.traceOn()) + std::cout << "Row count " << rowCount << std::endl; + + // Process `RowGroup`, increase an epoch and save statistics to the file. + auto* statisticsManager = statistics::StatisticsManager::instance(); + statisticsManager->analyzeColumnKeyTypes(outRG, caep.traceOn()); + statisticsManager->incEpoch(); + statisticsManager->saveToFile(); + + // Distribute statistics across all ExeMgr clients if possible. + statistics::StatisticsDistributor::instance()->distributeStatistics(); + + // Send the signal back to front-end. 
+ bs.restart(); + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + statementsRunningCount->decr(stmtCounted); + } + + void analyzeTableHandleStats(messageqcpp::ByteStream& bs) + { + messageqcpp::ByteStream::quadbyte qb; +#ifdef DEBUG_STATISTICS + std::cout << "Get distributed statistics on ExeMgr(Client) from ExeMgr(Server) " + << std::endl; +#endif + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read the hash from statistics on ExeMgr(Client) from ExeMgr(Server) " + << std::endl; +#endif + uint64_t dataHashRec; + bs >> dataHashRec; + + uint64_t dataHash = statistics::StatisticsManager::instance()->computeHashFromStats(); + // The stats are the same. + if (dataHash == dataHashRec) + { +#ifdef DEBUG_STATISTICS + std::cout << "The hash is the same as rec hash on ExeMgr(Client) from ExeMgr(Server) " + << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + return; + } + + bs.restart(); + qb = ANALYZE_TABLE_NEED_STATS; + bs << qb; + fIos.write(bs); + + bs.restart(); + bs = fIos.read(); +#ifdef DEBUG_STATISTICS + std::cout << "Read statistics on ExeMgr(Client) from ExeMgr(Server) " << std::endl; +#endif + statistics::StatisticsManager::instance()->unserialize(bs); + statistics::StatisticsManager::instance()->saveToFile(); + +#ifdef DEBUG_STATISTICS + std::cout << "Write flag on ExeMgr(Client) to ExeMgr(Server)" << std::endl; +#endif + qb = ANALYZE_TABLE_SUCCESS; + bs << qb; + fIos.write(bs); + bs.reset(); + } + + public: void operator()() { @@ -691,6 +821,16 @@ public: fIos.close(); break; } + else if (qb == ANALYZE_TABLE_EXECUTE) + { + analyzeTableExecute(bs, jl, stmtCounted); + continue; + } + else if (qb == ANALYZE_TABLE_REC_STATS) + { + analyzeTableHandleStats(bs); + continue; + } else { if (gDebug) @@ -1706,6 +1846,16 @@ int ServiceExeMgr::Child() exeMgrThreadPool.invoke(threadpool::ThreadPoolMonitor(&exeMgrThreadPool)); } + // Load statistics. 
+ try + { + statistics::StatisticsManager::instance()->loadFromFile(); + } + catch (...) + { + std::cerr << "Cannot load statistics from file " << std::endl; + } + for (;;) { messageqcpp::IOSocket ios; diff --git a/mysql-test/columnstore/basic/r/analyze_table.result b/mysql-test/columnstore/basic/r/analyze_table.result new file mode 100644 index 000000000..65fd380e0 --- /dev/null +++ b/mysql-test/columnstore/basic/r/analyze_table.result @@ -0,0 +1,27 @@ +DROP DATABASE IF EXISTS analyze_table_db; +CREATE DATABASE analyze_table_db; +USE analyze_table_db; +create table t1 (a int, b int, c int) engine=columnstore; +insert into t1 values (1, 2, 3), (2, 2, 2), (2, 3, 4); +analyze table t1; +Table Op Msg_type Msg_text +analyze_table_db.t1 analyze status OK +create table t2 (a int, b double) engine=columnstore; +insert into t2 values (2, 3), (3, 4); +analyze table t2; +Table Op Msg_type Msg_text +analyze_table_db.t2 analyze status OK +create table t3 (a varchar(25)) engine=columnstore; +insert into t3 values ("a"), ("b"); +analyze table t3; +Table Op Msg_type Msg_text +analyze_table_db.t3 analyze status OK +analyze table t1, t2, t3; +Table Op Msg_type Msg_text +analyze_table_db.t1 analyze status OK +analyze_table_db.t2 analyze status OK +analyze_table_db.t3 analyze status OK +DROP TABLE t1; +DROP TABLE t2; +DROP TABLE t3; +DROP DATABASE analyze_table_db; diff --git a/mysql-test/columnstore/basic/t/analyze_table.test b/mysql-test/columnstore/basic/t/analyze_table.test new file mode 100644 index 000000000..217b24920 --- /dev/null +++ b/mysql-test/columnstore/basic/t/analyze_table.test @@ -0,0 +1,28 @@ +#-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS analyze_table_db; +--enable_warnings + +CREATE DATABASE analyze_table_db; +USE analyze_table_db; + +create table t1 (a int, b int, c int) engine=columnstore; +insert into t1 values (1, 2, 3), (2, 2, 2), (2, 3, 4); +analyze table t1; + +create table t2 (a int, b double) engine=columnstore; 
+insert into t2 values (2, 3), (3, 4); +analyze table t2; + +create table t3 (a varchar(25)) engine=columnstore; +insert into t3 values ("a"), ("b"); +analyze table t3; + + +analyze table t1, t2, t3; + +DROP TABLE t1; +DROP TABLE t2; +DROP TABLE t3; +DROP DATABASE analyze_table_db; diff --git a/utils/common/CMakeLists.txt b/utils/common/CMakeLists.txt index c6a0fde01..137ddcb06 100644 --- a/utils/common/CMakeLists.txt +++ b/utils/common/CMakeLists.txt @@ -10,10 +10,13 @@ set(common_LIB_SRCS MonitorProcMem.cpp nullvaluemanip.cpp threadnaming.cpp - utils_utf8.cpp) + utils_utf8.cpp + statistics.cpp) add_library(common SHARED ${common_LIB_SRCS}) +target_link_libraries(common boost_filesystem) + add_dependencies(common loggingcpp) install(TARGETS common DESTINATION ${ENGINE_LIBDIR} COMPONENT columnstore-engine) diff --git a/utils/common/statistics.cpp b/utils/common/statistics.cpp new file mode 100644 index 000000000..1bd647fd4 --- /dev/null +++ b/utils/common/statistics.cpp @@ -0,0 +1,435 @@ +/* Copyright (C) 2021 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include +#include +#include + +#include "statistics.h" +#include "IDBPolicy.h" +#include "brmtypes.h" +#include "hasher.h" +#include "messagequeue.h" +#include "configcpp.h" + +using namespace idbdatafile; +using namespace logging; + +namespace statistics +{ +using ColumnsCache = std::vector>; + +StatisticsManager* StatisticsManager::instance() +{ + static StatisticsManager* sm = new StatisticsManager(); + return sm; +} + +void StatisticsManager::analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace) +{ + std::lock_guard lock(mut); + auto rowCount = rowGroup.getRowCount(); + const auto columnCount = rowGroup.getColumnCount(); + if (!rowCount || !columnCount) + return; + + auto& oids = rowGroup.getOIDs(); + + rowgroup::Row r; + rowGroup.initRow(&r); + rowGroup.getRow(0, &r); + + ColumnsCache columns(columnCount, std::unordered_set()); + // Init key types. + for (uint32_t index = 0; index < columnCount; ++index) + keyTypes[oids[index]] = KeyType::PK; + + const uint32_t maxRowCount = 4096; + // TODO: We should read just a couple of blocks from the columns, not all data, but this requires + // a deeper refactoring of column commands. + rowCount = std::min(rowCount, maxRowCount); + // This is strange, it's a CS but I'm processing data row by row, how to fix it? + for (uint32_t i = 0; i < rowCount; ++i) + { + for (uint32_t j = 0; j < columnCount; ++j) + { + if (r.isNullValue(j) || columns[j].count(r.getIntField(j))) + keyTypes[oids[j]] = KeyType::FK; + else + columns[j].insert(r.getIntField(j)); + } + r.nextRow(); + } + + if (trace) + output(StatisticsType::PK_FK); +} + +void StatisticsManager::output(StatisticsType statisticsType) +{ + if (statisticsType == StatisticsType::PK_FK) + { + std::cout << "Columns count: " << keyTypes.size() << std::endl; + for (const auto& p : keyTypes) + std::cout << p.first << " " << (int) p.second << std::endl; + } +} + +// Someday it will be a virtual method, based on the statistics type we are processing.
+std::unique_ptr StatisticsManager::convertStatsToDataStream(uint64_t& dataStreamSize) +{ + // Number of pairs. + uint64_t count = keyTypes.size(); + // count, [[uid, keyType], ... ] + dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType)); + + // Allocate memory for data stream. + std::unique_ptr dataStreamSmartPtr(new char[dataStreamSize]); + auto* dataStream = dataStreamSmartPtr.get(); + // Initialize the data stream. + uint64_t offset = 0; + std::memcpy(dataStream, reinterpret_cast(&count), sizeof(uint64_t)); + offset += sizeof(uint64_t); + + // For each pair [oid, key type]. + for (const auto& p : keyTypes) + { + uint32_t oid = p.first; + std::memcpy(&dataStream[offset], reinterpret_cast(&oid), sizeof(uint32_t)); + offset += sizeof(uint32_t); + + KeyType keyType = p.second; + std::memcpy(&dataStream[offset], reinterpret_cast(&keyType), sizeof(KeyType)); + offset += sizeof(KeyType); + } + + return dataStreamSmartPtr; +} + +void StatisticsManager::saveToFile() +{ + std::lock_guard lock(mut); + + const char* fileName = statsFile.c_str(); + std::unique_ptr out( + IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "wb", 1)); + + if (!out) + { + BRM::log_errno("StatisticsManager::saveToFile(): open"); + throw ios_base::failure("StatisticsManager::saveToFile(): open failed."); + } + + // Compute hash. + uint64_t dataStreamSize = 0; + std::unique_ptr dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize); + utils::Hasher128 hasher; + + // Prepare a statistics file header. + const uint32_t headerSize = sizeof(StatisticsFileHeader); + StatisticsFileHeader fileHeader; + std::memset(&fileHeader, 0, headerSize); + fileHeader.version = version; + fileHeader.epoch = epoch; + fileHeader.dataSize = dataStreamSize; + // Compute hash from the data. + fileHeader.dataHash = hasher(dataStreamSmartPtr.get(), dataStreamSize); + + // Write statistics file header. 
+ uint64_t size = out->write(reinterpret_cast(&fileHeader), headerSize); + if (size != headerSize) + { + auto rc = IDBPolicy::remove(fileName); + if (rc == -1) + std::cerr << "Cannot remove file " << fileName << std::endl; + + throw ios_base::failure("StatisticsManager::saveToFile(): write failed. "); + } + + // Write data. + size = out->write(dataStreamSmartPtr.get(), dataStreamSize); + if (size != dataStreamSize) + { + auto rc = IDBPolicy::remove(fileName); + if (rc == -1) + std::cerr << "Cannot remove file " << fileName << std::endl; + + throw ios_base::failure("StatisticsManager::saveToFile(): write failed. "); + } +} + +void StatisticsManager::loadFromFile() +{ + std::lock_guard lock(mut); + // Check that stats file does exist. + if (!boost::filesystem::exists(statsFile)) + return; + + const char* fileName = statsFile.c_str(); + std::unique_ptr in( + IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "rb", 1)); + + if (!in) + { + BRM::log_errno("StatisticsManager::loadFromFile(): open"); + throw ios_base::failure( + "StatisticsManager::loadFromFile(): open failed. Check the error log."); + } + + // Read the file header. + StatisticsFileHeader fileHeader; + const uint32_t headerSize = sizeof(StatisticsFileHeader); + int64_t size = in->read(reinterpret_cast(&fileHeader), headerSize); + if (size != headerSize) + throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. "); + + // Initialize fields from the file header. + version = fileHeader.version; + epoch = fileHeader.epoch; + const auto dataHash = fileHeader.dataHash; + const auto dataStreamSize = fileHeader.dataSize; + + // Allocate the memory for the file data. + std::unique_ptr dataStreamSmartPtr(new char[dataStreamSize]); + auto* dataStream = dataStreamSmartPtr.get(); + + // Read the data. 
+ uint64_t dataOffset = 0; + auto sizeToRead = dataStreamSize; + size = in->read(dataStream, sizeToRead); + sizeToRead -= size; + dataOffset += size; + + while (sizeToRead > 0) + { + size = in->read(dataStream + dataOffset, sizeToRead); + if (size < 0) + throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. "); + + sizeToRead -= size; + dataOffset += size; + } + + utils::Hasher128 hasher; + auto computedDataHash = hasher(dataStream, dataStreamSize); + if (dataHash != computedDataHash) + throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. "); + + uint64_t count = 0; + std::memcpy(reinterpret_cast(&count), dataStream, sizeof(uint64_t)); + uint64_t offset = sizeof(uint64_t); + + // For each pair. + for (uint64_t i = 0; i < count; ++i) + { + uint32_t oid; + KeyType keyType; + std::memcpy(reinterpret_cast(&oid), &dataStream[offset], sizeof(uint32_t)); + offset += sizeof(uint32_t); + std::memcpy(reinterpret_cast(&keyType), &dataStream[offset], sizeof(KeyType)); + offset += sizeof(KeyType); + // Insert pair. 
+ keyTypes[oid] = keyType; + } +} + +uint64_t StatisticsManager::computeHashFromStats() +{ + utils::Hasher128 hasher; + uint64_t dataStreamSize = 0; + std::unique_ptr dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize); + return hasher(dataStreamSmartPtr.get(), dataStreamSize); +} + +void StatisticsManager::serialize(messageqcpp::ByteStream& bs) +{ + uint64_t count = keyTypes.size(); + bs << version; + bs << epoch; + bs << count; + + for (const auto& keyType : keyTypes) + { + bs << keyType.first; + bs << (uint32_t) keyType.second; + } +} + +void StatisticsManager::unserialize(messageqcpp::ByteStream& bs) +{ + uint64_t count; + bs >> version; + bs >> epoch; + bs >> count; + + for (uint32_t i = 0; i < count; ++i) + { + uint32_t oid, keyType; + bs >> oid; + bs >> keyType; + keyTypes[oid] = static_cast(keyType); + } +} + +StatisticsDistributor* StatisticsDistributor::instance() +{ + static StatisticsDistributor* sd = new StatisticsDistributor(); + return sd; +} + +void StatisticsDistributor::distributeStatistics() +{ + countClients(); + { + std::lock_guard lock(mut); + // No clients. + if (clientsCount == 0) + return; + +#ifdef DEBUG_STATISTICS + std::cout << "Distribute statistics from ExeMgr(Server) to ExeMgr(Clients) " << std::endl; +#endif + + messageqcpp::ByteStream msg, statsHash, statsBs; + // Current hash. + statsHash << statistics::StatisticsManager::instance()->computeHashFromStats(); + // Statistics. + statistics::StatisticsManager::instance()->serialize(statsBs); + + for (uint32_t i = 0; i < clientsCount; ++i) + { + try + { + messageqcpp::ByteStream::quadbyte qb = ANALYZE_TABLE_REC_STATS; + msg << qb; + + auto exeMgrID = "ExeMgr" + std::to_string(i + 2); + // Create a client. + std::unique_ptr exemgrClient( + new messageqcpp::MessageQueueClient(exeMgrID)); + +#ifdef DEBUG_STATISTICS + std::cout << "Try to connect to " << exeMgrID << std::endl; +#endif + // Try to connect to the client. 
+ if (!exemgrClient->connect()) + { + msg.restart(); +#ifdef DEBUG_STATISTICS + std::cout << "Unable to connect to " << exeMgrID << std::endl; +#endif + continue; + } + +#ifdef DEBUG_STATISTICS + std::cout + << "Write flag ANALYZE_TABLE_REC_STATS from ExeMgr(Server) to ExeMgr(Clients) " + << std::endl; +#endif + // Write a flag to client ExeMgr. + exemgrClient->write(msg); + +#ifdef DEBUG_STATISTICS + std::cout << "Write statistics hash from ExeMgr(Server) to ExeMgr(Clients) " + << std::endl; +#endif + // Write a hash of the stats. + exemgrClient->write(statsHash); + + // Read the state from Client. + msg.restart(); + msg = exemgrClient->read(); + msg >> qb; + + // Do not need a stats. + if (qb == ANALYZE_TABLE_SUCCESS) + { + msg.restart(); + continue; + } + +#ifdef DEBUG_STATISTICS + std::cout << "Write statistics bytestream from ExeMgr(Server) to ExeMgr(Clients) " + << std::endl; +#endif + // Write a statistics to client ExeMgr. + exemgrClient->write(statsBs); + + // Read the flag back from the client ExeMgr. + msg.restart(); + msg = exemgrClient->read(); + + if (msg.length() == 0) + throw runtime_error("Lost conection to ExeMgr."); +#ifdef DEBUG_STATISTICS + std::cout << "Read flag on ExeMgr(Server) from ExeMgr(Client) " << std::endl; +#endif + msg.restart(); + } + catch (std::exception& e) + { + msg.restart(); + std::cerr << "distributeStatistics() failed with error: " << e.what() << std::endl; + } + catch (...) + { + msg.restart(); + std::cerr << "distributeStatistics() failed with unknown error." << std::endl; + } + } + } +} + +void StatisticsDistributor::countClients() +{ +#ifdef DEBUG_STATISTICS + std::cout << "count clients to distribute statistics " << std::endl; +#endif + auto* config = config::Config::makeConfig(); + // Starting from the ExeMgr2, since the Server starts on the ExeMgr1. 
+ std::atomic exeMgrNumber(2); + + try + { + while (true) + { + auto exeMgrID = "ExeMgr" + std::to_string(exeMgrNumber); + auto exeMgrIP = config->getConfig(exeMgrID, "IPAddr"); + if (exeMgrIP == "") + break; +#ifdef DEBUG_STATISTICS + std::cout << "Client: " << exeMgrID << std::endl; +#endif + ++exeMgrNumber; + } + } + catch (std::exception& e) + { + std::cerr << "countClients() failed with error: " << e.what() << std::endl; + } + catch (...) + { + std::cerr << "countClients() failed with unknown error: "; + } + + clientsCount = exeMgrNumber - 2; +#ifdef DEBUG_STATISTICS + std::cout << "Number of clients: " << clientsCount << std::endl; +#endif +} + +} // namespace statistics diff --git a/utils/common/statistics.h b/utils/common/statistics.h new file mode 100644 index 000000000..1dc7fc286 --- /dev/null +++ b/utils/common/statistics.h @@ -0,0 +1,122 @@ +/* Copyright (C) 2021 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#ifndef STATISTICS_H +#define STATISTICS_H + +#include "rowgroup.h" +#include "logger.h" +#include "hasher.h" +#include "IDBPolicy.h" + +#include +#include +#include + +// Represents the commands for `ExeMgr`.
+#define ANALYZE_TABLE_EXECUTE 6 +#define ANALYZE_TABLE_REC_STATS 7 +#define ANALYZE_TABLE_NEED_STATS 8 +#define ANALYZE_TABLE_SUCCESS 9 +// #define DEBUG_STATISTICS + +using namespace idbdatafile; + +namespace statistics +{ + +// Represents a column key type: +// PK - primary key. +// FK - foreign key. +enum class KeyType : uint32_t +{ + PK, + FK +}; + +// Represents types of statistics CS supports. +enum class StatisticsType : uint32_t +{ + // A special statistics type, made to solve the circular inner join problem. + PK_FK +}; + +// Represents a header for the statistics file. +struct StatisticsFileHeader +{ + uint64_t version; + uint64_t epoch; + uint64_t dataHash; + uint64_t dataSize; + uint8_t offset[1024]; +}; + +// This class is responsible for processing and storing statistics. +// On each `analyze table` iteration it increases an epoch and stores +// the updated statistics into the special file. +class StatisticsManager +{ + public: + // Returns the instance of this class, static initialization happens only once. + static StatisticsManager* instance(); + // Analyzes the given `rowGroup` by processing it row by row and searching for foreign keys. + void analyzeColumnKeyTypes(const rowgroup::RowGroup& rowGroup, bool trace); + // Outputs stats to the output stream. + void output(StatisticsType statisticsType = StatisticsType::PK_FK); + // Saves stats to the file. + void saveToFile(); + // Loads stats from the file. + void loadFromFile(); + void incEpoch() { ++epoch; } + // Serialize stats to the given `bs`. + void serialize(messageqcpp::ByteStream& bs); + // Unserialize stats from the given `bs`. + void unserialize(messageqcpp::ByteStream& bs); + // Computes hash from the current statistics data.
+ uint64_t computeHashFromStats(); + + private: + std::map keyTypes; + StatisticsManager() : epoch(0), version(1) { IDBPolicy::init(true, false, "", 0); } + std::unique_ptr convertStatsToDataStream(uint64_t& dataStreamSize); + + std::mutex mut; + uint32_t epoch; + uint32_t version; + std::string statsFile = "/var/lib/columnstore/local/statistics"; +}; + +// This class is responsible for distributing the statistics across all `ExeMgr` in a cluster. +class StatisticsDistributor +{ + public: + // Returns the instance of this class, static initialization happens only once. + static StatisticsDistributor* instance(); + + // Distribute stats across all `ExeMgr` in cluster by connecting to them using config file. + void distributeStatistics(); + + private: + StatisticsDistributor() : clientsCount(0) {} + // Count the number of clients by reading config file and evaluating `ExeMgr` fields. + void countClients(); + uint32_t clientsCount; + std::mutex mut; +}; + +} // namespace statistics +#endif diff --git a/utils/loggingcpp/errorcodes.h b/utils/loggingcpp/errorcodes.h index 665cb1f19..e270ece65 100644 --- a/utils/loggingcpp/errorcodes.h +++ b/utils/loggingcpp/errorcodes.h @@ -77,7 +77,8 @@ enum ErrorCodeValues dataTypeErr, incompatJoinCols, incompatFilterCols, - aggregateResourceErr + aggregateResourceErr, + statisticsJobListEmpty = 301 }; struct ErrorCodes