1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-03 17:13:17 +03:00

feat: analyze partition bloat

This commit is contained in:
Amr Elmohamady
2025-07-22 14:05:33 +03:00
committed by drrtuy
parent 99392ed87f
commit 2557a0a504
10 changed files with 378 additions and 38 deletions

View File

@@ -38,6 +38,13 @@
#include "oamcache.h"
#include "liboamcpp.h"
#include "resourcemanager.h"
#include "simplecolumn.h"
#include "functioncolumn.h"
#include "aggregatecolumn.h"
#include "simplefilter.h"
#include "constantcolumn.h"
#include "pseudocolumn.h"
#include "functor_str.h"
using namespace std;
using namespace WriteEngine;
@@ -46,9 +53,12 @@ using namespace execplan;
using namespace logging;
using namespace boost;
using namespace BRM;
using namespace funcexp;
namespace dmlpackageprocessor
{
typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_;
// Tracks active cleartablelock commands by storing set of table lock IDs
/*static*/ std::set<uint64_t> CommandPackageProcessor::fActiveClearTableLockCmds;
/*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex;
@@ -461,6 +471,10 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal(
{
clearTableLock(uniqueId, cpackage, result);
}
else if (stmt == "ANALYZEPARTITIONBLOAT")
{
analyzePartitionBloat(cpackage, result);
}
else if (!cpackage.get_Logging())
{
BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
@@ -774,7 +788,7 @@ void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage&
found = true;
} // end of displaying a table lock match
} // end of loop through all table locks
} // end of loop through all table locks
if (!found)
{
@@ -1153,4 +1167,162 @@ void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BR
fActiveClearTableLockCmds.insert(tableLockID);
}
void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDMLPackage& cpackage,
DMLPackageProcessor::DMLResult& result)
{
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::TableName tableName;
tableName.schema = cpackage.get_SchemaName();
tableName.table = cpackage.get_TableName();
std::string partitionStr = cpackage.get_SQLStatement();
std::ostringstream analysisResults;
bool bErrFlag = false;
std::string errorMsg;
try
{
// Get AUX column OID for the table
CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(tableName);
if (auxColumnOid <= 3000)
{
analysisResults << "Table " << tableName.toString()
<< " does not have an AUX column for bloat analysis.";
result.bloatAnalysis = analysisResults.str();
return;
}
// SELECT COUNT(aux) AS count_aux FROM schema.table WHERE idbPartition(aux) = partitionStr;
CalpontSelectExecutionPlan csep;
CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
CalpontSelectExecutionPlan::ColumnMap colMap;
// Create the base SimpleColumn for 'aux'
SimpleColumn* auxCol = new SimpleColumn(tableName.schema, tableName.table, "aux", fSessionID);
auxCol->alias("aux");
CalpontSystemCatalog::ColType auxColType;
auxColType.colDataType = CalpontSystemCatalog::INT;
auxColType.colWidth = 4;
auxCol->resultType(auxColType);
// Create the COUNT(aux) AS count_aux aggregate column
AggregateColumn* countAuxCol = new AggregateColumn(fSessionID);
countAuxCol->alias("count_aux");
countAuxCol->aggOp(AggregateColumn::COUNT);
countAuxCol->functionName("count");
countAuxCol->expressionId(1);
CalpontSystemCatalog::ColType countAuxColType;
countAuxColType.colDataType = CalpontSystemCatalog::INT;
countAuxColType.colWidth = 4;
countAuxCol->resultType(countAuxColType);
SRCP auxSRCP(auxCol->clone());
countAuxCol->aggParms().push_back(auxSRCP);
// Add the base 'aux' column to ColumnMap (used for reference resolution)
// Note: The aggregate result "count_aux" does NOT go in ColumnMap
// Add "aux" twice since it's referenced in both COUNT(aux) and idbPartition(aux)
colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP));
auxSRCP.reset(auxCol->clone());
colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP));
// Add the COUNT column to ReturnedColumnList (what gets returned by SELECT)
SRCP countSRCP(countAuxCol->clone());
returnedColumnList.push_back(countSRCP);
csep.columnMapNonStatic(colMap);
csep.returnedCols(returnedColumnList);
// Define the filter using FunctionColumn for idbPartition()
const SOP opeq(new Operator("="));
// Create a FunctionColumn for idbPartition(aux)
// parms: psueducolumn dbroot, segmentdir, segment
SPTP sptp;
FunctionColumn* fc = new FunctionColumn();
fc->functionName("idbpartition");
fc->sessionID(fSessionID);
fc->expressionId(0);
funcexp::FunctionParm parms;
PseudoColumn* dbroot = new PseudoColumn(*auxCol, PSEUDO_DBROOT, fSessionID);
sptp.reset(new ParseTree(dbroot));
parms.push_back(sptp);
PseudoColumn* pp = new PseudoColumn(*auxCol, PSEUDO_SEGMENTDIR, fSessionID);
sptp.reset(new ParseTree(pp));
parms.push_back(sptp);
PseudoColumn* seg = new PseudoColumn(*auxCol, PSEUDO_SEGMENT, fSessionID);
sptp.reset(new ParseTree(seg));
parms.push_back(sptp);
fc->functionParms(parms);
CalpontSystemCatalog::ColType resultType;
resultType.colDataType = CalpontSystemCatalog::VARCHAR;
resultType.colWidth = 256;
fc->resultType(resultType);
funcexp::Func_idbpartition* idbpartition = new funcexp::Func_idbpartition();
fc->operationType(idbpartition->operationType(parms, fc->resultType()));
delete idbpartition;
// Set up the filter
ConstantColumn* partitionConstCol = new ConstantColumn(partitionStr, ConstantColumn::LITERAL);
SimpleFilter* f1 = new SimpleFilter(opeq, fc, partitionConstCol);
filterTokenList.push_back(f1);
csep.filterTokenList(filterTokenList);
// Set the session ID, transaction ID and version Id
BRM::QueryContext verID;
verID = fSessionManager.verID();
csep.verID(verID);
csep.sessionID(fSessionID);
BRM::TxnID txnID;
txnID = fSessionManager.getTxnID(fSessionID);
csep.txnID(txnID.id);
// Send CSEP to ExeMgr
auto csepStr = csep.toString();
cout << "csep: " << csepStr << endl;
CalpontSystemCatalog::NJLSysDataList sysDataList;
systemCatalogPtr->getSysData(csep, sysDataList, tableName);
cout << "Done getSysData" << endl;
cout << "result size: " << sysDataList.sysDataVec.size() << endl;
// parse the result
for (auto it = sysDataList.begin(); it != sysDataList.end(); it++)
{
cout << "result: " << (*it)->GetData(0) << endl;
}
// Return the result - use toString() to get the full plan representation
analysisResults << "80 (for testing)";
result.bloatAnalysis = analysisResults.str();
cout << "analysisResults: " << analysisResults.str() << endl;
}
catch (std::exception& ex)
{
bErrFlag = true;
errorMsg = ex.what();
}
if (bErrFlag)
{
std::ostringstream oss;
oss << "Partition bloat analysis failed for table " << tableName.toString() << ", partition "
<< partitionStr << ": " << errorMsg;
result.bloatAnalysis = oss.str();
}
}
} // namespace dmlpackageprocessor