diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index e57a80a54..7d48a11ed 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -38,6 +38,13 @@ #include "oamcache.h" #include "liboamcpp.h" #include "resourcemanager.h" +#include "simplecolumn.h" +#include "functioncolumn.h" +#include "aggregatecolumn.h" +#include "simplefilter.h" +#include "constantcolumn.h" +#include "pseudocolumn.h" +#include "functor_str.h" using namespace std; using namespace WriteEngine; @@ -46,9 +53,12 @@ using namespace execplan; using namespace logging; using namespace boost; using namespace BRM; +using namespace funcexp; namespace dmlpackageprocessor { +typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_; + // Tracks active cleartablelock commands by storing set of table lock IDs /*static*/ std::set CommandPackageProcessor::fActiveClearTableLockCmds; /*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex; @@ -461,6 +471,10 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal( { clearTableLock(uniqueId, cpackage, result); } + else if (stmt == "ANALYZEPARTITIONBLOAT") + { + analyzePartitionBloat(cpackage, result); + } else if (!cpackage.get_Logging()) { BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID()); @@ -774,7 +788,7 @@ void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage& found = true; } // end of displaying a table lock match - } // end of loop through all table locks + } // end of loop through all table locks if (!found) { @@ -1153,4 +1167,162 @@ void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BR fActiveClearTableLockCmds.insert(tableLockID); } +void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDMLPackage& cpackage, + DMLPackageProcessor::DMLResult& result) +{ + boost::shared_ptr systemCatalogPtr = + CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID); + systemCatalogPtr->identity(CalpontSystemCatalog::EC); + CalpontSystemCatalog::TableName tableName; + tableName.schema = cpackage.get_SchemaName(); + tableName.table = cpackage.get_TableName(); + std::string partitionStr = cpackage.get_SQLStatement(); + + std::ostringstream analysisResults; + bool bErrFlag = false; + std::string errorMsg; + + try + { + // Get AUX column OID for the table + CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(tableName); + + if (auxColumnOid <= 3000) + { + analysisResults << "Table " << tableName.toString() + << " does not have an AUX column for bloat analysis."; + result.bloatAnalysis = analysisResults.str(); + return; + } + + // SELECT COUNT(aux) AS count_aux FROM schema.table WHERE idbPartition(aux) = partitionStr; + CalpontSelectExecutionPlan csep; + CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList; + CalpontSelectExecutionPlan::FilterTokenList filterTokenList; + CalpontSelectExecutionPlan::ColumnMap colMap; + + // Create the base SimpleColumn for 'aux' + SimpleColumn* auxCol = new SimpleColumn(tableName.schema, tableName.table, "aux", fSessionID); + auxCol->alias("aux"); + CalpontSystemCatalog::ColType auxColType; + auxColType.colDataType = CalpontSystemCatalog::INT; + auxColType.colWidth = 4; + auxCol->resultType(auxColType); + + // Create the COUNT(aux) AS count_aux aggregate column + AggregateColumn* countAuxCol = new AggregateColumn(fSessionID); + countAuxCol->alias("count_aux"); + countAuxCol->aggOp(AggregateColumn::COUNT); + countAuxCol->functionName("count"); + countAuxCol->expressionId(1); + CalpontSystemCatalog::ColType countAuxColType; + countAuxColType.colDataType = CalpontSystemCatalog::INT; + countAuxColType.colWidth = 4; + countAuxCol->resultType(countAuxColType); + + SRCP auxSRCP(auxCol->clone()); + countAuxCol->aggParms().push_back(auxSRCP); + + // Add the base 'aux' column to ColumnMap (used for reference resolution) + // Note: The aggregate result "count_aux" does NOT go in ColumnMap + // Add "aux" twice since it's referenced in both COUNT(aux) and idbPartition(aux) + colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); + auxSRCP.reset(auxCol->clone()); + colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP)); + + // Add the COUNT column to ReturnedColumnList (what gets returned by SELECT) + SRCP countSRCP(countAuxCol->clone()); + returnedColumnList.push_back(countSRCP); + + csep.columnMapNonStatic(colMap); + csep.returnedCols(returnedColumnList); + + // Define the filter using FunctionColumn for idbPartition() + const SOP opeq(new Operator("=")); + + // Create a FunctionColumn for idbPartition(aux) + // parms: psueducolumn dbroot, segmentdir, segment + SPTP sptp; + FunctionColumn* fc = new FunctionColumn(); + fc->functionName("idbpartition"); + fc->sessionID(fSessionID); + fc->expressionId(0); + funcexp::FunctionParm parms; + PseudoColumn* dbroot = new PseudoColumn(*auxCol, PSEUDO_DBROOT, fSessionID); + sptp.reset(new ParseTree(dbroot)); + parms.push_back(sptp); + + PseudoColumn* pp = new PseudoColumn(*auxCol, PSEUDO_SEGMENTDIR, fSessionID); + sptp.reset(new ParseTree(pp)); + parms.push_back(sptp); + + PseudoColumn* seg = new PseudoColumn(*auxCol, PSEUDO_SEGMENT, fSessionID); + sptp.reset(new ParseTree(seg)); + parms.push_back(sptp); + + fc->functionParms(parms); + + CalpontSystemCatalog::ColType resultType; + resultType.colDataType = CalpontSystemCatalog::VARCHAR; + resultType.colWidth = 256; + fc->resultType(resultType); + + funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition(); + fc->operationType(idbpartition->operationType(parms, fc->resultType())); + delete idbpartition; + + // Set up the filter + ConstantColumn* partitionConstCol = new ConstantColumn(partitionStr, ConstantColumn::LITERAL); + SimpleFilter* f1 = new SimpleFilter(opeq, fc, partitionConstCol); + filterTokenList.push_back(f1); + csep.filterTokenList(filterTokenList); + + + // Set the session ID, transaction ID and version Id + BRM::QueryContext verID; + verID = fSessionManager.verID(); + csep.verID(verID); + csep.sessionID(fSessionID); + BRM::TxnID txnID; + txnID = fSessionManager.getTxnID(fSessionID); + csep.txnID(txnID.id); + + + // Send CSEP to ExeMgr + auto csepStr = csep.toString(); + cout << "csep: " << csepStr << endl; + CalpontSystemCatalog::NJLSysDataList sysDataList; + systemCatalogPtr->getSysData(csep, sysDataList, tableName); + + cout << "Done getSysData" << endl; + + cout << "result size: " << sysDataList.sysDataVec.size() << endl; + + // parse the result + for (auto it = sysDataList.begin(); it != sysDataList.end(); it++) + { + cout << "result: " << (*it)->GetData(0) << endl; + } + + // Return the result - use toString() to get the full plan representation + analysisResults << "80 (for testing)"; + + result.bloatAnalysis = analysisResults.str(); + cout << "analysisResults: " << analysisResults.str() << endl; + } + catch (std::exception& ex) + { + bErrFlag = true; + errorMsg = ex.what(); + } + + if (bErrFlag) + { + std::ostringstream oss; + oss << "Partition bloat analysis failed for table " << tableName.toString() << ", partition " + << partitionStr << ": " << errorMsg; + result.bloatAnalysis = oss.str(); + } +} + } // namespace dmlpackageprocessor diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.h b/dbcon/dmlpackageproc/commandpackageprocessor.h index ec0218e59..ba82060b9 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.h +++ b/dbcon/dmlpackageproc/commandpackageprocessor.h @@ -53,6 +53,7 @@ class CommandPackageProcessor : public DMLPackageProcessor void viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result); void clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result); void establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo); + void analyzePartitionBloat(const dmlpackage::CalpontDMLPackage& cpackage, DMLPackageProcessor::DMLResult& result); DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) override; // Tracks active cleartablelock commands by storing set of table lock IDs diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index 4379e9412..03abce9c9 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -129,6 +129,7 @@ class DMLPackageProcessor std::string extendedStats; std::string miniStats; querystats::QueryStats stats; + std::string bloatAnalysis; DMLResult() : result(NO_ERROR), rowCount(0){}; }; diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index 118fe1f6e..e3ba4774e 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -449,7 +449,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupTableOID(const TableName& try { - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); } catch (IDBExcept&) { @@ -662,7 +662,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta TableColName tcn; ColType ct; OID coloid = -1; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; @@ -751,7 +751,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::lookupOID(const TableColName& ta } void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const string& sysTableName) + const TableName& tableName) { // start up new transaction @@ -777,12 +777,19 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa // in joblist that a high-bit-set session id is always a syscat query. This will be okay for a long time, // but not forever... - csep.sessionID(fSessionID | 0x80000000); + if (tableName.schema == CALPONT_SCHEMA) + { + csep.sessionID(fSessionID | 0x80000000); + } + else + { + csep.sessionID(fSessionID); + } int tryCnt = 0; // add the tableList to csep for tuple joblist to use CalpontSelectExecutionPlan::TableList tablelist; - tablelist.push_back(make_aliastable("calpontsys", sysTableName, "")); + tablelist.push_back(make_aliastable(tableName.schema, tableName.table, "")); csep.tableList(tablelist); // populate the returned column list as column map @@ -798,7 +805,7 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa { try { - getSysData_EC(csep, sysDataList, sysTableName); + getSysData_EC(csep, sysDataList, tableName); } catch (IDBExcept&) { @@ -817,7 +824,7 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa try { - getSysData_FE(csep, sysDataList, sysTableName); + getSysData_FE(csep, sysDataList, tableName); break; } catch (IDBExcept&) // error already occurred. this is not a broken pipe @@ -855,11 +862,19 @@ void CalpontSystemCatalog::getSysData(CalpontSelectExecutionPlan& csep, NJLSysDa } void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const string& /*sysTableName*/) + const TableName& tableName) { DEBUG << "Enter getSysData_EC " << fSessionID << endl; - uint32_t tableOID = IDB_VTABLE_ID; + uint32_t tableOID; + if (tableName.schema == CALPONT_SCHEMA) + { + tableOID = IDB_VTABLE_ID; + } + else + { + tableOID = lookupTableOID(tableName); + } ByteStream bs; uint32_t status; @@ -927,7 +942,7 @@ void CalpontSystemCatalog::getSysData_EC(CalpontSelectExecutionPlan& csep, NJLSy } void CalpontSystemCatalog::getSysData_FE(const CalpontSelectExecutionPlan& csep, NJLSysDataList& sysDataList, - const string& sysTableName) + const TableName& tableName) { DEBUG << "Enter getSysData_FE " << fSessionID << endl; @@ -943,11 +958,19 @@ void CalpontSystemCatalog::getSysData_FE(const CalpontSelectExecutionPlan& csep, csep.serialize(msg); fExeMgr->write(msg); - // Get the table oid for the system table being queried. - TableName tableName; - tableName.schema = CALPONT_SCHEMA; - tableName.table = sysTableName; - uint32_t tableOID = IDB_VTABLE_ID; + // Get the table oid for the table being queried. + // Use lookupTableOID for regular tables or fall back to IDB_VTABLE_ID for system catalog queries + uint32_t tableOID; + if (tableName.schema == CALPONT_SCHEMA) + { + // System catalog tables still use virtual table ID for compatibility with existing infrastructure + tableOID = IDB_VTABLE_ID; + } + else + { + // Regular user tables - look up the actual table OID + tableOID = lookupTableOID(tableName); + } uint16_t status = 0; // Send the request for the table. @@ -1192,7 +1215,7 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); TableColName tcn; vector::const_iterator it; @@ -1360,7 +1383,7 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colTypeDct(const OID& csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; @@ -1472,7 +1495,7 @@ const CalpontSystemCatalog::TableColName CalpontSystemCatalog::colName(const OID csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; @@ -1573,7 +1596,7 @@ const CalpontSystemCatalog::TableColName CalpontSystemCatalog::dictColName(const csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; @@ -1681,7 +1704,7 @@ uint64_t CalpontSystemCatalog::nextAutoIncrValue(TableName aTableName, int lower try { - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); } catch (runtime_error& e) { @@ -1790,7 +1813,7 @@ int32_t CalpontSystemCatalog::autoColumOid(TableName aTableName, int lower_case_ try { - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); } catch (runtime_error& e) { @@ -1860,7 +1883,7 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::nextAutoIncrRid(const O try { - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); } catch (runtime_error& e) { @@ -2831,7 +2854,7 @@ CalpontSystemCatalog::getTables(const std::string schema, int lower_case_table_n } NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); vector::const_iterator it; vector tnl; @@ -2911,7 +2934,7 @@ int CalpontSystemCatalog::getTableCount() OID oid1 = OID_SYSTABLE_OBJECTID; NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); vector::const_iterator it; @@ -3180,7 +3203,7 @@ const CalpontSystemCatalog::RIDList CalpontSystemCatalog::columnRIDs(const Table csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; ColType ct; @@ -3457,7 +3480,7 @@ const CalpontSystemCatalog::TableName CalpontSystemCatalog::tableName(const OID& try { - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); } catch (runtime_error& e) { @@ -3595,7 +3618,7 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam try { - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); } catch (IDBExcept&) { @@ -3738,7 +3761,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableNam try { - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); } catch (IDBExcept&) { @@ -3837,7 +3860,7 @@ CalpontSystemCatalog::OID CalpontSystemCatalog::isAUXColumnOID(const OID& oid) try { - getSysData(csep, sysDataList, SYSTABLE_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSTABLE_TABLE)); } catch (IDBExcept&) { @@ -5058,7 +5081,7 @@ const CalpontSystemCatalog::DictOIDList CalpontSystemCatalog::dictOIDs(const Tab csep.filterTokenList(filterTokenList); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; @@ -5671,7 +5694,7 @@ void CalpontSystemCatalog::getSchemaInfo(const string& in_schema, int lower_case csep.data(oss.str()); NJLSysDataList sysDataList; - getSysData(csep, sysDataList, SYSCOLUMN_TABLE); + getSysData(csep, sysDataList, TableName(CALPONT_SCHEMA, SYSCOLUMN_TABLE)); vector::const_iterator it; ColType ct; diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index 027878345..4607a1e1f 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -913,19 +913,20 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog // e.g. in client UDFs like calshowpartitions(). explicit CalpontSystemCatalog(); + /** get system data for Front End */ + void getSysData_FE(const execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, + const TableName& tableName); + + /** get system data */ + void getSysData(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const TableName& tableName); private: /** Constuctors */ explicit CalpontSystemCatalog(const CalpontSystemCatalog& rhs); CalpontSystemCatalog& operator=(const CalpontSystemCatalog& rhs); - /** get system data */ - void getSysData(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const std::string& sysTableName); - /** get system data for Front End */ - void getSysData_FE(const execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, - const std::string& sysTableName); /** get system data for Engine Controller */ - void getSysData_EC(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const std::string& sysTableName); + void getSysData_EC(execplan::CalpontSelectExecutionPlan&, NJLSysDataList&, const TableName& tableName); void buildSysColinfomap(); void buildSysOIDmap(); diff --git a/dbcon/mysql/ha_mcs_client_udfs.cpp b/dbcon/mysql/ha_mcs_client_udfs.cpp index dc905e938..d7a9d2a01 100644 --- a/dbcon/mysql/ha_mcs_client_udfs.cpp +++ b/dbcon/mysql/ha_mcs_client_udfs.cpp @@ -1169,4 +1169,65 @@ extern "C" { } + my_bool analyze_partition_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message, const char* funcname) + { + if (args->arg_count != 3 || + args->arg_type[0] != STRING_RESULT || + args->arg_type[1] != STRING_RESULT || + args->arg_type[2] != STRING_RESULT) + { + sprintf(message, "%s() requires three string arguments", funcname); + return 1; + } + + initid->maybe_null = 0; + initid->max_length = 3; + + return 0; + } + + my_bool mcs_analyze_partition_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message) + { + return analyze_partition_bloat_init(initid, args, message, "MCSANALYZEPARTITIONBLOAT"); + } + + const char* mcs_analyze_partition_bloat(UDF_INIT* /*initid*/, UDF_ARGS* args, char* result, + unsigned long* length, char* /*is_null*/, char* /*error*/) + { + THD* thd = current_thd; + + if (get_fe_conn_info_ptr() == NULL) + { + set_fe_conn_info_ptr((void*)new cal_connection_info()); + thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); + } + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + execplan::CalpontSystemCatalog::TableName tableName; + + tableName.schema = args->args[0]; + tableName.table = args->args[1]; + std::string partition = args->args[2]; + + if (lower_case_table_names) { + boost::algorithm::to_lower(tableName.schema); + boost::algorithm::to_lower(tableName.table); + } + + if (!ci->dmlProc) + { + ci->dmlProc = new MessageQueueClient("DMLProc"); + } + + std::string analysisResult = ha_mcs_impl_analyze_partition_bloat(*ci, tableName, partition); + + memcpy(result, analysisResult.c_str(), analysisResult.length()); + *length = analysisResult.length(); + return result; + } + + void mcs_analyze_partition_bloat_deinit(UDF_INIT* /*initid*/) + { + } + } // extern "C" diff --git a/dbcon/mysql/ha_mcs_dml.cpp b/dbcon/mysql/ha_mcs_dml.cpp index 77e69bd62..c1e812362 100644 --- a/dbcon/mysql/ha_mcs_dml.cpp +++ b/dbcon/mysql/ha_mcs_dml.cpp @@ -1001,3 +1001,79 @@ int ha_mcs_impl_close_connection_(handlerton* /*hton*/, THD* thd, cal_connection // transaction. Under either situation, system catalog cache for this session should be removed return rc; } + +std::string ha_mcs_impl_analyze_partition_bloat(cal_impl_if::cal_connection_info& ci, + execplan::CalpontSystemCatalog::TableName& tablename, + const std::string& partition) +{ + THD* thd = current_thd; + ulong sessionID = tid2sid(thd->thread_id); + CalpontDMLPackage* pDMLPackage; + std::string dmlStatement("ANALYZEPARTITIONBLOAT"); + VendorDMLStatement cmdStmt(dmlStatement, DML_COMMAND, sessionID); + pDMLPackage = CalpontDMLFactory::makeCalpontDMLPackageFromMysqlBuffer(cmdStmt); + + if (lower_case_table_names) + { + boost::algorithm::to_lower(tablename.schema); + boost::algorithm::to_lower(tablename.table); + } + pDMLPackage->set_SchemaName(tablename.schema); + pDMLPackage->set_TableName(tablename.table); + + pDMLPackage->set_SQLStatement(partition); + + ByteStream bytestream; + bytestream << static_cast(sessionID); + pDMLPackage->write(bytestream); + delete pDMLPackage; + + ByteStream::byte b = 0; + ByteStream::octbyte rows; + std::string errorMsg; + std::string analysisResult; + + try + { + ci.dmlProc->write(bytestream); + bytestream = ci.dmlProc->read(); + + if (bytestream.length() == 0) + { + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [9]"); + } + else + { + bytestream >> b; + bytestream >> rows; + bytestream >> errorMsg; + + // Skip tableLockInfo, queryStats, extendedStats, miniStats (not used for this command) + std::string tmp; + bytestream >> tmp; + bytestream >> tmp; + bytestream >> tmp; + bytestream >> tmp; + + // Read the bloatAnalysis result + bytestream >> analysisResult; + } + } + catch (runtime_error&) + { + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Lost connection to DMLProc [10]"); + } + catch (...) + { + thd->get_stmt_da()->set_overwrite_status(true); + thd->raise_error_printf(ER_INTERNAL_ERROR, "Caught unknown error"); + } + + if (b != 0) + analysisResult = errorMsg; + + return analysisResult; +} + diff --git a/dbcon/mysql/ha_mcs_impl.h b/dbcon/mysql/ha_mcs_impl.h index 61c5b0690..a6fb5ee38 100644 --- a/dbcon/mysql/ha_mcs_impl.h +++ b/dbcon/mysql/ha_mcs_impl.h @@ -75,4 +75,7 @@ extern std::string ha_mcs_impl_droppartition_(execplan::CalpontSystemCatalog::Ta extern std::string ha_mcs_impl_viewtablelock(cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename); extern std::string ha_mcs_impl_cleartablelock(cal_impl_if::cal_connection_info& ci, uint64_t tableLockID); +extern std::string ha_mcs_impl_analyze_partition_bloat(cal_impl_if::cal_connection_info& ci, + execplan::CalpontSystemCatalog::TableName& tablename, + const std::string& partition); #endif diff --git a/dbcon/mysql/install_mcs_mysql.sh.in b/dbcon/mysql/install_mcs_mysql.sh.in index 9c36b9a8d..cff6b9044 100755 --- a/dbcon/mysql/install_mcs_mysql.sh.in +++ b/dbcon/mysql/install_mcs_mysql.sh.in @@ -56,6 +56,7 @@ CREATE OR REPLACE FUNCTION mcssystemreadonly RETURNS INTEGER SONAME 'ha_columnst CREATE OR REPLACE FUNCTION mcssystemprimary RETURNS INTEGER SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION mcs_emindex_size RETURNS INTEGER SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION mcs_emindex_free RETURNS INTEGER SONAME 'ha_columnstore.so'; +CREATE OR REPLACE FUNCTION mcs_analyze_partition_bloat RETURNS STRING SONAME 'ha_columnstore.so'; CREATE OR REPLACE FUNCTION columnstore_dataload RETURNS STRING SONAME 'ha_columnstore.so'; CREATE OR REPLACE AGGREGATE FUNCTION regr_avgx RETURNS REAL SONAME 'libregr_mysql.so'; CREATE OR REPLACE AGGREGATE FUNCTION regr_avgy RETURNS REAL SONAME 'libregr_mysql.so'; diff --git a/dmlproc/dmlprocessor.cpp b/dmlproc/dmlprocessor.cpp index c0383873f..2d21d5970 100644 --- a/dmlproc/dmlprocessor.cpp +++ b/dmlproc/dmlprocessor.cpp @@ -1199,6 +1199,7 @@ void PackageHandler::run() results << result.queryStats; results << result.extendedStats; results << result.miniStats; + results << result.bloatAnalysis; result.stats.serialize(results); fIos.write(results); // Bug 5226. dmlprocessor thread will close the socket to mysqld.