You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-11-03 17:13:17 +03:00
feat(rbo,rules,QA): the rule now uses target table to consider interesting keys available.
This commit is contained in:
@@ -38,6 +38,13 @@ void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const s
|
||||
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
|
||||
static const size_t MaxParallelFactor = 16;
|
||||
|
||||
bool tableAliasEqual(const execplan::CalpontSystemCatalog::TableAliasName& lhs,
|
||||
const execplan::CalpontSystemCatalog::TableAliasName& rhs)
|
||||
{
|
||||
return (lhs.schema == rhs.schema && lhs.table == rhs.table && lhs.alias == rhs.alias &&
|
||||
lhs.fisColumnStore == rhs.fisColumnStore);
|
||||
}
|
||||
|
||||
bool tableIsInUnion(const execplan::CalpontSystemCatalog::TableAliasName& table,
|
||||
execplan::CalpontSelectExecutionPlan& csep)
|
||||
{
|
||||
@@ -116,16 +123,25 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, exe
|
||||
// Looking for a projected column that comes first in an available index and has EI statistics
|
||||
// INV nullptr signifies that no suitable column was found
|
||||
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep,
|
||||
execplan::CalpontSystemCatalog::TableAliasName& targetTable,
|
||||
optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
// TODO supply a list of suitable columns from a higher level
|
||||
for (auto& rc : csep.returnedCols())
|
||||
{
|
||||
// TODO extract SC from RC
|
||||
auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
|
||||
if (simpleColumn)
|
||||
{
|
||||
execplan::CalpontSystemCatalog::TableAliasName rcTable(
|
||||
simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", false);
|
||||
if (!tableAliasEqual(targetTable, rcTable))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
cal_impl_if::SchemaAndTableName schemaAndTableName = {simpleColumn->schemaName(),
|
||||
simpleColumn->tableName()};
|
||||
|
||||
auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
|
||||
if (!columnStatistics)
|
||||
{
|
||||
@@ -144,59 +160,61 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl
|
||||
|
||||
// TODO char and other numerical types support
|
||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||
execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||
execplan::CalpontSelectExecutionPlan& csep, execplan::CalpontSystemCatalog::TableAliasName& table,
|
||||
optimizer::RBOptimizerContext& ctx)
|
||||
{
|
||||
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||
|
||||
// SC type controls an integral type used to produce suitable filters. The continuation of this function
|
||||
// should become a template function based on SC type.
|
||||
execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, ctx);
|
||||
execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, table, ctx);
|
||||
if (!keyColumn)
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
|
||||
auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
|
||||
if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName());
|
||||
if (columnStatisticsIt == tableColumnsStatisticsIt->second.end())
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
auto columnStatistics = columnStatisticsIt->second;
|
||||
|
||||
// TODO configurable parallel factor via session variable
|
||||
// NB now histogram size is the way to control parallel factor with 16 being the maximum
|
||||
size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor);
|
||||
size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;
|
||||
|
||||
// TODO char and other numerical types support
|
||||
std::vector<std::pair<uint64_t, uint64_t>> bounds;
|
||||
|
||||
// Loop over buckets to produce filter ranges
|
||||
for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
|
||||
{
|
||||
auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
|
||||
auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
|
||||
uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
|
||||
uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
|
||||
cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
|
||||
auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
|
||||
if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName());
|
||||
if (columnStatisticsIt == tableColumnsStatisticsIt->second.end())
|
||||
{
|
||||
return unionVec;
|
||||
}
|
||||
|
||||
auto columnStatistics = columnStatisticsIt->second;
|
||||
|
||||
// TODO configurable parallel factor via session variable
|
||||
// NB now histogram size is the way to control parallel factor with 16 being the maximum
|
||||
size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor);
|
||||
size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;
|
||||
|
||||
// Loop over buckets to produce filter ranges
|
||||
for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
|
||||
{
|
||||
auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
|
||||
auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
|
||||
uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
|
||||
uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
|
||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||
}
|
||||
|
||||
// Add last range
|
||||
// NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
|
||||
auto lastBucket = columnStatistics.get_json_histogram().begin() +
|
||||
(numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
|
||||
uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
|
||||
uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
|
||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||
}
|
||||
|
||||
// Add last range
|
||||
// NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
|
||||
auto lastBucket =
|
||||
columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
|
||||
uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
|
||||
uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
|
||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||
|
||||
for (auto& bound : bounds)
|
||||
{
|
||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||
@@ -229,13 +247,13 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
|
||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||
std::to_string(ctx.uniqueId);
|
||||
// TODO add original alias to support multiple same name tables
|
||||
tableAliasMap.insert({table, tableAlias});
|
||||
tableAliasMap.insert({table, {tableAlias, 0}});
|
||||
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||
derivedSCEP->derivedTbAlias(tableAlias);
|
||||
|
||||
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||
auto additionalUnionVec = makeUnionFromTable(csep, ctx);
|
||||
auto additionalUnionVec = makeUnionFromTable(csep, table, ctx);
|
||||
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||
additionalUnionVec.end());
|
||||
|
||||
@@ -250,7 +268,7 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
|
||||
}
|
||||
|
||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
||||
[[maybe_unused]] size_t colPosition = 0;
|
||||
// [[maybe_unused]] size_t colPosition = 0;
|
||||
// replace parent CSEP RCs with derived table RCs using ScheamAndTableName -> tableAlias map
|
||||
for (auto& rc : csep.returnedCols())
|
||||
{
|
||||
@@ -268,17 +286,19 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
|
||||
// auto sc = scs[0];
|
||||
std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias "
|
||||
<< sc->tableAlias() << std::endl;
|
||||
auto newTableAlias = tableAliasMap.find(
|
||||
{sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false});
|
||||
if (newTableAlias == tableAliasMap.end())
|
||||
auto newTableAliasAndColPositionCounter =
|
||||
tableAliasMap.find({sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false});
|
||||
if (newTableAliasAndColPositionCounter == tableAliasMap.end())
|
||||
{
|
||||
std::cout << "The RC doesn't belong to any of the derived tables, so leave it intact" << std::endl;
|
||||
continue;
|
||||
}
|
||||
sc->tableName("");
|
||||
sc->schemaName("");
|
||||
sc->tableAlias(newTableAlias->second);
|
||||
sc->isColumnStore(true);
|
||||
auto& [newTableAlias, colPosition] = newTableAliasAndColPositionCounter->second;
|
||||
sc->tableAlias(newTableAlias);
|
||||
// WIP Not needed according with CSEP output
|
||||
// sc->isColumnStore(true);
|
||||
sc->colPosition(colPosition++);
|
||||
// rcCloned->colPosition(colPosition++);
|
||||
// rcCloned->resultType(rc->resultType());
|
||||
|
||||
@@ -24,39 +24,40 @@
|
||||
#include "execplan/calpontselectexecutionplan.h"
|
||||
#include "rulebased_optimizer.h"
|
||||
|
||||
namespace optimizer {
|
||||
struct LessThan
|
||||
namespace optimizer
|
||||
{
|
||||
struct TableAliasLessThan
|
||||
{
|
||||
bool operator()(const execplan::CalpontSystemCatalog::TableAliasName& lhs,
|
||||
const execplan::CalpontSystemCatalog::TableAliasName& rhs) const
|
||||
{
|
||||
bool operator()(const execplan::CalpontSystemCatalog::TableAliasName& lhs,
|
||||
const execplan::CalpontSystemCatalog::TableAliasName& rhs) const
|
||||
if (lhs.schema < rhs.schema)
|
||||
{
|
||||
if (lhs.schema < rhs.schema)
|
||||
return true;
|
||||
}
|
||||
else if (lhs.schema == rhs.schema)
|
||||
{
|
||||
if (lhs.table < rhs.table)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else if (lhs.schema == rhs.schema)
|
||||
else if (lhs.table == rhs.table)
|
||||
{
|
||||
if (lhs.table < rhs.table)
|
||||
if (lhs.alias < rhs.alias)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else if (lhs.table == rhs.table)
|
||||
{
|
||||
if (lhs.alias < rhs.alias)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
using TableAliasMap = std::map<execplan::CalpontSystemCatalog::TableAliasName, std::string,
|
||||
LessThan>;
|
||||
|
||||
bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep);
|
||||
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx);
|
||||
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep);
|
||||
bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
using NewTableAliasAndColumnPosCounter = std::pair<string, size_t>;
|
||||
using TableAliasMap = std::map<execplan::CalpontSystemCatalog::TableAliasName,
|
||||
NewTableAliasAndColumnPosCounter, TableAliasLessThan>;
|
||||
|
||||
bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep);
|
||||
bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx);
|
||||
}
|
||||
Reference in New Issue
Block a user