diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index 7d48a11ed..bd571c0da 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -1286,13 +1286,19 @@ void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDML BRM::TxnID txnID; txnID = fSessionManager.getTxnID(fSessionID); csep.txnID(txnID.id); - + + // Set the table list + CalpontSelectExecutionPlan::TableList tablelist; + tablelist.push_back(make_aliastable(tableName.schema, tableName.table, "")); + csep.tableList(tablelist); + csep.schemaName(tableName.schema, 0); + csep.tableName(tableName.table, 0); // Send CSEP to ExeMgr auto csepStr = csep.toString(); cout << "csep: " << csepStr << endl; CalpontSystemCatalog::NJLSysDataList sysDataList; - systemCatalogPtr->getSysData(csep, sysDataList, tableName); + systemCatalogPtr->getQueryData(csep, sysDataList); cout << "Done getSysData" << endl; @@ -1305,7 +1311,7 @@ void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDML } // Return the result - use toString() to get the full plan representation - analysisResults << "80 (for testing)"; + analysisResults << sysDataList.sysDataVec.front()->GetData(0); result.bloatAnalysis = analysisResults.str(); cout << "analysisResults: " << analysisResults.str() << endl; diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index e3ba4774e..e5a088073 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -31,6 +31,7 @@ using namespace std; #include "messagequeue.h" #include "calpontsystemcatalog.h" +#include "brmtypes.h" #include "dataconvert.h" #include "ddlpkg.h" #include "expressionparser.h" @@ -449,7 +450,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupTableOID(const TableName& try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); } catch (IDBExcept&) { @@ -662,7 +663,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta TableColName tcn; ColType ct; OID coloid = -1; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; @@ -751,46 +752,28 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta } void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const TableName& tableName) + const string& sysTableName) { - // start up new transaction - BRM::TxnID txnID; int oldTxnID; - txnID = fSessionManager->getTxnID(fSessionID); - - if (!txnID.valid) - { - txnID.id = 0; - txnID.valid = true; - } - BRM::QueryContext verID, oldVerID; - verID = fSessionManager->verID(); - oldTxnID = csep.txnID(); - csep.txnID(txnID.id); - oldVerID = csep.verID(); - csep.verID(verID); + // We need to use a session ID that's separate from the actual query SID, because the dbcon runs queries - // in the middle of receiving data bands for the real query. // TODO: we really need a flag or something to identify this as a syscat query: there are assumptions made // in joblist that a high-bit-set session id is always a syscat query. This will be okay for a long time, // but not forever... + csep.sessionID(fSessionID | 0x80000000); + setupQueryTxnCtx(csep, txnID, oldTxnID, verID, oldVerID); - if (tableName.schema == CALPONT_SCHEMA) - { - csep.sessionID(fSessionID | 0x80000000); - } - else - { - csep.sessionID(fSessionID); - } int tryCnt = 0; + // in the middle of receiving data bands for the real query. // add the tableList to csep for tuple joblist to use CalpontSelectExecutionPlan::TableList tablelist; - tablelist.push_back(make_aliastable(tableName.schema, tableName.table, "")); + tablelist.push_back(make_aliastable("calpontsys", sysTableName, "")); csep.tableList(tablelist); + csep.schemaName(CALPONT_SCHEMA, 0); + csep.tableName(sysTableName, 0); // populate the returned column list as column map csep.returnedCols().clear(); @@ -805,7 +788,7 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa { try { - getSysData_EC(csep, sysDataList, tableName); + getSysData_EC(csep, sysDataList); } catch (IDBExcept&) { @@ -824,7 +807,7 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa try { - getSysData_FE(csep, sysDataList, tableName); + getSysData_FE(csep, sysDataList); break; } catch (IDBExcept&) // error already occurred. this is not a broken pipe @@ -856,25 +839,106 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa } } + restoreQueryTxnCtx(csep, oldTxnID, oldVerID); +} + +void CalpontSystemCatalog::getQueryData(execplan::CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList) +{ + BRM::TxnID txnID; + int oldTxnID; + BRM::QueryContext verID, oldVerID; + + setupQueryTxnCtx(csep, txnID, oldTxnID, verID, oldVerID); + + int tryCnt = 0; + + if (fIdentity == EC) + { + try + { + getSysData_EC(csep, sysDataList); + } + catch (IDBExcept&) + { + throw; + } + catch (runtime_error& e) + { + throw runtime_error(e.what()); + } + } + else + { + while (tryCnt < 5) + { + tryCnt++; + + try + { + getSysData_FE(csep, sysDataList); + break; + } + catch (IDBExcept&) // error already occurred. this is not a broken pipe + { + throw; + } + catch (...) + { + // may be a broken pipe. re-establish exeMgr and send the message + delete fExeMgr; + fExeMgr = new ClientRotator(0, "ExeMgr"); + + try + { + fExeMgr->connect(5); + } + catch (...) + { + throw IDBExcept(ERR_LOST_CONN_EXEMGR); + } + } + } + + if (tryCnt >= 5) + // throw runtime_error("Error occurred when calling system catalog. ExeMgr is not functioning."); + throw IDBExcept(ERR_SYSTEM_CATALOG); + } + + restoreQueryTxnCtx(csep, oldTxnID, oldVerID); +} + +void CalpontSystemCatalog::setupQueryTxnCtx(CalpontSelectExecutionPlan& csep, BRM::TxnID& txnID, int& oldTxnID, + BRM::QueryContext& verID, BRM::QueryContext& oldVerID) +{ + // start up new transaction + txnID = fSessionManager->getTxnID(fSessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + verID = fSessionManager->verID(); + oldTxnID = csep.txnID(); + csep.txnID(txnID.id); + oldVerID = csep.verID(); + csep.verID(verID); +} + +void CalpontSystemCatalog::restoreQueryTxnCtx(CalpontSelectExecutionPlan& csep, int oldTxnID, + const BRM::QueryContext& oldVerID) +{ csep.sessionID(fSessionID); csep.txnID(oldTxnID); csep.verID(oldVerID); } -void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const TableName& tableName) +void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList) { DEBUG << "Enter getSysData_EC " << fSessionID << endl; - uint32_t tableOID; - if (tableName.schema == CALPONT_SCHEMA) - { - tableOID = IDB_VTABLE_ID; - } - else - { - tableOID = lookupTableOID(tableName); - } + uint32_t tableOID = IDB_VTABLE_ID; ByteStream bs; uint32_t status; @@ -941,8 +1005,7 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy } } -void CalpontSystemCatalog::getSysData_FE(const CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const TableName& tableName) +void CalpontSystemCatalog::getSysData_FE(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList) { DEBUG << "Enter getSysData_FE " << fSessionID << endl; @@ -958,18 +1021,15 @@ void CalpontSystemCatalog::getSysData_FE(const CalpontSelectExecutionPlan& csep, csep.serialize(msg); fExeMgr->write(msg); - // Get the table oid for the table being queried. - // Use lookupTableOID for regular tables or fall back to IDB_VTABLE_ID for system catalog queries + // Get the table oid for the system table being queried. uint32_t tableOID; - if (tableName.schema == CALPONT_SCHEMA) + if (csep.schemaName() == CALPONT_SCHEMA) { - // System catalog tables still use virtual table ID for compatibility with existing infrastructure tableOID = IDB_VTABLE_ID; } else { - // Regular user tables - look up the actual table OID - tableOID = lookupTableOID(tableName); + tableOID = lookupTableOID(TableName(csep.schemaName(), csep.tableName())); } uint16_t status = 0; @@ -1215,7 +1275,7 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); TableColName tcn; vector::const_iterator it; @@ -1383,7 +1443,7 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colTypeDct(const OID& csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; @@ -1495,7 +1555,7 @@ const CalpontSystemCatalog::TableColName CalpontSystemCatalog::colName(const OID csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; @@ -1596,7 +1656,7 @@ const CalpontSystemCatalog::TableColName CalpontSystemCatalog::dictColName(const csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; @@ -1704,7 +1764,7 @@ uint64_t CalpontSystemCatalog::nextAutoIncrValue(TableName aTableName, int lower try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); } catch (runtime_error& e) { @@ -1813,7 +1873,7 @@ int32_t CalpontSystemCatalog::autoColumOid(TableName aTableName, int lower_case_ try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); } catch (runtime_error& e) { @@ -1883,7 +1943,7 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::nextAutoIncrRid(const O try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); } catch (runtime_error& e) { @@ -2854,7 +2914,7 @@ CalpontSystemCatalog::getTables(const std::string schema, int lower_case_table_n } NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); vector::const_iterator it; vector tnl; @@ -2934,7 +2994,7 @@ int CalpontSystemCatalog::getTableCount() OID oid1 = OID_SYSTABLE_OBJECTID; NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); vector::const_iterator it; @@ -3203,7 +3263,7 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; ColType ct; @@ -3480,7 +3540,7 @@ const CalpontSystemCatalog::TableName CalpontSystemCatalog::tableName(const OID& try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); } catch (runtime_error& e) { @@ -3618,7 +3678,7 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); } catch (IDBExcept&) { @@ -3761,7 +3821,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableNam try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); } catch (IDBExcept&) { @@ -3860,7 +3920,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::isAUXColumnOID(const OID& oid) try { - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); + getSysData(csep, sysDataList, SYSTABLE_TABLE); } catch (IDBExcept&) { @@ -5081,7 +5141,7 @@ const CalpontSystemCatalog::DictOIDList CalpontSystemCatalog::dictOIDs(const Tab csep.filterTokenList(filterTokenList); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; @@ -5694,7 +5754,7 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); + getSysData(csep, sysDataList, SYSCOLUMN_TABLE); vector::const_iterator it; ColType ct; diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index 4607a1e1f..21d584341 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -45,6 +45,13 @@ #include "joblisttypes.h" #include "stdexcept" +namespace BRM +{ +struct _TxnID; +typedef struct _TxnID TxnID; +class QueryContext; +} // namespace BRM + #undef min #undef max @@ -901,6 +908,8 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog void flushCache(); + void getQueryData(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&); + /** Convert a MySQL thread id to an InfiniDB session id */ static uint32_t idb_tid2sid(const uint32_t tid); @@ -913,20 +922,26 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog // e.g. in client UDFs like calshowpartitions(). explicit CalpontSystemCatalog(); - /** get system data for Front End */ - void getSysData_FE(const execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, - const TableName& tableName); - - /** get system data */ - void getSysData(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const TableName& tableName); private: /** Constuctors */ explicit CalpontSystemCatalog(const CalpontSystemCatalog& rhs); CalpontSystemCatalog& operator=(const CalpontSystemCatalog& rhs); + /** get system data */ + void getSysData(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const std::string& sysTableName); + + /** get system data for Front End */ + void getSysData_FE(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&); /** get system data for Engine Controller */ - void getSysData_EC(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const TableName& tableName); + void getSysData_EC(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&); + + /** setup context for getSysData/getQueryData (prolog) */ + void setupQueryTxnCtx(execplan::CalpontSelectExecutionPlan& csep, BRM::TxnID& txnID, int& oldTxnID, + BRM::QueryContext& verID, BRM::QueryContext& oldVerID); + /** restore context after getSysData/getQueryData (epilog) */ + void restoreQueryTxnCtx(execplan::CalpontSelectExecutionPlan& csep, int oldTxnID, + const BRM::QueryContext& oldVerID); void buildSysColinfomap(); void buildSysOIDmap();