You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
feat(rbo,rules): use EI statistics for filter ranges
This commit is contained in:
@ -6288,44 +6288,23 @@ int processLimitAndOffset(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Loop over available indexes to find and extract corresponding EI column statistics
|
||||||
|
// for the first column of the index if any.
|
||||||
|
// The statistics are stored in the GWI context.
|
||||||
void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
|
void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
|
||||||
{
|
{
|
||||||
// TODO find clear way to check if the field is part of a key
|
|
||||||
// if (!ifp->field->part_of_key.is_clear_all())
|
|
||||||
// {
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
// std::cout << "Processing field item: " << ifp->field_name.str << std::endl;
|
|
||||||
// std::cout << "part of a key: " << buf << std::endl;
|
|
||||||
// std::cout << "ifp->field->field_index " << ifp->field->field_index << std::endl;
|
|
||||||
|
|
||||||
for (uint j = 0; j < ifp->field->table->s->keys; j++)
|
for (uint j = 0; j < ifp->field->table->s->keys; j++)
|
||||||
{
|
{
|
||||||
for (uint i = 0; i < ifp->field->table->s->key_info[j].usable_key_parts; i++)
|
for (uint i = 0; i < ifp->field->table->s->key_info[j].usable_key_parts; i++)
|
||||||
{
|
{
|
||||||
// std::cout << "key fieldnr " << i << " "
|
|
||||||
// << ifp->field->table->s->key_info[j].key_part[i].field->field_name.str << " "
|
|
||||||
// << ifp->field->table->s->key_info[j].key_part[i].fieldnr << std::endl;
|
|
||||||
if (ifp->field->table->s->key_info[j].key_part[i].fieldnr == ifp->field->field_index + 1)
|
if (ifp->field->table->s->key_info[j].key_part[i].fieldnr == ifp->field->field_index + 1)
|
||||||
{
|
{
|
||||||
// std::cout << "key_info " << j << " key_part " << i << " matched " << std::endl;
|
|
||||||
if (i == 0 && ifp->field->read_stats)
|
if (i == 0 && ifp->field->read_stats)
|
||||||
{
|
{
|
||||||
assert(ifp->field->table->s);
|
assert(ifp->field->table->s);
|
||||||
// assert(ifp->field->table->s->db);
|
|
||||||
// assert(ifp->field->table->s->table_name);
|
|
||||||
// FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str},
|
|
||||||
// {ifp->field->field_name.str});
|
|
||||||
// TODO use FQCN as a key type
|
|
||||||
std::cout << "Adding column statistics for " << ifp->field->field_name.str << std::endl;
|
|
||||||
auto* histogram = dynamic_cast<Histogram_json_hb*>(ifp->field->read_stats->histogram);
|
auto* histogram = dynamic_cast<Histogram_json_hb*>(ifp->field->read_stats->histogram);
|
||||||
if (histogram)
|
if (histogram)
|
||||||
{
|
{
|
||||||
std::cout << "Type of histogram object: " << typeid(*histogram).name() << std::endl;
|
|
||||||
// std::vector<Histogram_bucket> histogramBuckets = histogram->get_json_histogram();
|
|
||||||
// std::cout << "gwi.tableStatisticsMap[{ifp->field->table->s->db.str, "
|
|
||||||
// "ifp->field->table->s->table_name.str}][ifp->field->field_name.str].size() "
|
|
||||||
// << histogramBuckets.size() << std::endl;
|
|
||||||
SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str};
|
SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str};
|
||||||
gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram;
|
gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram;
|
||||||
}
|
}
|
||||||
|
@ -120,6 +120,12 @@ using ColumnName = std::string;
|
|||||||
using ColumnStatisticsMap = std::unordered_map<ColumnName, Histogram_json_hb>;
|
using ColumnStatisticsMap = std::unordered_map<ColumnName, Histogram_json_hb>;
|
||||||
using TableStatisticsMap = std::unordered_map<SchemaAndTableName, ColumnStatisticsMap, SchemaAndTableNameHash>;
|
using TableStatisticsMap = std::unordered_map<SchemaAndTableName, ColumnStatisticsMap, SchemaAndTableNameHash>;
|
||||||
|
|
||||||
|
// This structure is used to store MDB AST -> CSEP translation context.
|
||||||
|
// It also holds column statistics for some of the columns in a query.
|
||||||
|
// As of 23.10.5, "some" means the first column of an index that appears in the projection list of a CSEP
|
||||||
|
// satisfying the condition of the applyParallelCES RBO rule.
|
||||||
|
// Note that statistics must be merged from subquery/derived table
|
||||||
|
// to the statistics of the outer query.
|
||||||
struct gp_walk_info
|
struct gp_walk_info
|
||||||
{
|
{
|
||||||
execplan::CalpontSelectExecutionPlan::ReturnedColumnList returnedCols;
|
execplan::CalpontSelectExecutionPlan::ReturnedColumnList returnedCols;
|
||||||
|
@ -68,10 +68,6 @@ bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
|||||||
execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, execplan::SimpleColumn& column,
|
execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, execplan::SimpleColumn& column,
|
||||||
std::pair<uint64_t, uint64_t>& bound)
|
std::pair<uint64_t, uint64_t>& bound)
|
||||||
{
|
{
|
||||||
// INV this is SimpleColumn we supply as an argument
|
|
||||||
// TODO find the suitable column using EI statistics.
|
|
||||||
// auto* column = dynamic_cast<execplan::SimpleColumn*>(csep->returnedCols().front().get());
|
|
||||||
// assert(column);
|
|
||||||
|
|
||||||
auto tableKeyColumnLeftOp = new execplan::SimpleColumn(column);
|
auto tableKeyColumnLeftOp = new execplan::SimpleColumn(column);
|
||||||
tableKeyColumnLeftOp->resultType(column.resultType());
|
tableKeyColumnLeftOp->resultType(column.resultType());
|
||||||
@ -117,13 +113,11 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, exe
|
|||||||
// INV nullptr signifies that no suitable column was found
|
// INV nullptr signifies that no suitable column was found
|
||||||
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||||
{
|
{
|
||||||
std::cout << "findSuitableKeyColumn " << csep.returnedCols().size() << std::endl;
|
|
||||||
for (auto& rc : csep.returnedCols())
|
for (auto& rc : csep.returnedCols())
|
||||||
{
|
{
|
||||||
auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
|
auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
|
||||||
if (simpleColumn)
|
if (simpleColumn)
|
||||||
{
|
{
|
||||||
std::cout << "Found simple column " << simpleColumn->columnName() << std::endl;
|
|
||||||
cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->schemaName(), simpleColumn->tableName()};
|
cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->schemaName(), simpleColumn->tableName()};
|
||||||
auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam);
|
auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam);
|
||||||
if (!columnStatistics)
|
if (!columnStatistics)
|
||||||
@ -141,6 +135,7 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO char and other numerical types support
|
||||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||||
execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||||
{
|
{
|
||||||
@ -154,13 +149,6 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
|||||||
return unionVec;
|
return unionVec;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.tableStatisticsMap "
|
|
||||||
<< " with size " << ctx.gwi.tableStatisticsMap.size() << std::endl;
|
|
||||||
for (auto& [k, v] : ctx.gwi.tableStatisticsMap)
|
|
||||||
{
|
|
||||||
std::cout << "SchemaAndTableName " << k.schema << "." << k.table << " column map size " << v.size() << std::endl;
|
|
||||||
}
|
|
||||||
|
|
||||||
cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
|
cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
|
||||||
auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
|
auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
|
||||||
if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
|
if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
|
||||||
@ -184,17 +172,13 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
|||||||
// TODO char and other numerical types support
|
// TODO char and other numerical types support
|
||||||
std::vector<std::pair<uint64_t, uint64_t>> bounds;
|
std::vector<std::pair<uint64_t, uint64_t>> bounds;
|
||||||
|
|
||||||
// TODO need to process tail if number of buckets is not divisible by number of union units
|
// Loop over buckets to produce filter ranges
|
||||||
// TODO non-overlapping buckets if it is a problem at all
|
|
||||||
for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
|
for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
|
||||||
{
|
{
|
||||||
auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
|
auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
|
||||||
auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
|
auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
|
||||||
uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
|
uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
|
||||||
uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
|
uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
|
||||||
|
|
||||||
std::cout << "currentLowerBound " << currentLowerBound << " currentUpperBound " << currentUpperBound
|
|
||||||
<< std::endl;
|
|
||||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,12 +186,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
|||||||
// NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
|
// NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
|
||||||
auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
|
auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
|
||||||
uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
|
uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
|
||||||
std::cout << "lastBucket start_value " << currentLowerBound << std::endl;
|
|
||||||
uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
|
uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
|
||||||
std::cout << "Histogram end_value " << currentUpperBound << std::endl;
|
|
||||||
|
|
||||||
std::cout << "last currentLowerBound " << currentLowerBound << " last currentUpperBound " << currentUpperBound
|
|
||||||
<< std::endl;
|
|
||||||
bounds.push_back({currentLowerBound, currentUpperBound});
|
bounds.push_back({currentLowerBound, currentUpperBound});
|
||||||
|
|
||||||
for (auto& bound : bounds)
|
for (auto& bound : bounds)
|
||||||
@ -270,97 +249,13 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
|
|||||||
derivedSCEP->filters(nullptr);
|
derivedSCEP->filters(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Remove the filters as they were pushed down to union units
|
// Remove the filters if necessary using csep.filters(nullptr) as they were pushed down to union units
|
||||||
// This is inappropriate for EXISTS filter and join conditions
|
// But this is inappropriate for EXISTS filter and join conditions
|
||||||
// csep.filters(nullptr);
|
// There must be no derived tables at this point, so we can replace the list with the new derived table list
|
||||||
// There must be no derived at this point.
|
|
||||||
csep.derivedTableList(newDerivedTableList);
|
csep.derivedTableList(newDerivedTableList);
|
||||||
// Replace table list with new table list populated with union units
|
// Replace table list with new table list populated with union units
|
||||||
csep.tableList(newTableList);
|
csep.tableList(newTableList);
|
||||||
csep.returnedCols(newReturnedColumns);
|
csep.returnedCols(newReturnedColumns);
|
||||||
}
|
}
|
||||||
|
|
||||||
// execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists(
|
|
||||||
// const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
|
||||||
// {
|
|
||||||
// execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
|
||||||
// unionVec.reserve(numberOfLegs);
|
|
||||||
// std::vector<std::pair<uint64_t, uint64_t>> bounds(
|
|
||||||
// {{0, 3000961}, {3000961, std::numeric_limits<uint64_t>::max()}});
|
|
||||||
// for (auto bound : bounds)
|
|
||||||
// {
|
|
||||||
// auto clonedCSEP = csep.cloneWORecursiveSelects();
|
|
||||||
// clonedCSEP->filters(nullptr);
|
|
||||||
// // Add BETWEEN based on key column range
|
|
||||||
// clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
|
||||||
// unionVec.push_back(clonedCSEP);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return unionVec;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // TODO: remove applyParallelCES_exists
|
|
||||||
// void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
|
||||||
// {
|
|
||||||
// auto tables = csep.tableList();
|
|
||||||
// execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
|
||||||
// execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList;
|
|
||||||
// execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
|
||||||
|
|
||||||
// // ATM Must be only 1 table
|
|
||||||
// for (auto& table : tables)
|
|
||||||
// {
|
|
||||||
// if (!table.isColumnstore())
|
|
||||||
// {
|
|
||||||
// auto derivedSCEP = csep.cloneWORecursiveSelects();
|
|
||||||
// // need to add a level here
|
|
||||||
// std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
|
||||||
// std::to_string(ctx.uniqueId);
|
|
||||||
|
|
||||||
// derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
|
||||||
// derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
|
||||||
// derivedSCEP->derivedTbAlias(tableAlias);
|
|
||||||
|
|
||||||
// // TODO: hardcoded for now
|
|
||||||
// size_t parallelFactor = 2;
|
|
||||||
// // Create a copy of the current leaf CSEP with additional filters to partition the key column
|
|
||||||
// auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep);
|
|
||||||
// derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
|
||||||
// additionalUnionVec.end());
|
|
||||||
|
|
||||||
// size_t colPosition = 0;
|
|
||||||
// // change parent to derived table columns
|
|
||||||
// for (auto& rc : csep.returnedCols())
|
|
||||||
// {
|
|
||||||
// auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
|
|
||||||
// // TODO timezone and result type are not copied
|
|
||||||
// // TODO add specific ctor for this functionality
|
|
||||||
// rcCloned->tableName("");
|
|
||||||
// rcCloned->schemaName("");
|
|
||||||
// rcCloned->tableAlias(tableAlias);
|
|
||||||
// rcCloned->colPosition(colPosition++);
|
|
||||||
// rcCloned->resultType(rc->resultType());
|
|
||||||
|
|
||||||
// newReturnedColumns.push_back(rcCloned);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// newDerivedTableList.push_back(derivedSCEP);
|
|
||||||
// execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
|
||||||
// newTableList.push_back(tn);
|
|
||||||
// // Remove the filters as they were pushed down to union units
|
|
||||||
// // This is inappropriate for EXISTS filter and join conditions
|
|
||||||
// // TODO if needed
|
|
||||||
// derivedSCEP->filters(nullptr);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// // Remove the filters as they were pushed down to union units
|
|
||||||
// // This is inappropriate for EXISTS filter and join conditions
|
|
||||||
// // csep.filters(nullptr);
|
|
||||||
// // There must be no derived at this point.
|
|
||||||
// csep.derivedTableList(newDerivedTableList);
|
|
||||||
// // Replace table list with new table list populated with union units
|
|
||||||
// csep.tableList(newTableList);
|
|
||||||
// csep.returnedCols(newReturnedColumns);
|
|
||||||
// }
|
|
||||||
|
|
||||||
} // namespace optimizer
|
} // namespace optimizer
|
||||||
|
Reference in New Issue
Block a user