mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
feat(rbo,rules,QA): filtered RC clone for UNION units.
@@ -1017,11 +1017,13 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects(
   ReturnedColumnList newReturnedCols;
   for (const auto& rc : fReturnedCols)
   {
-    auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
-    if (simpleColumn)
+    rc->setSimpleColumnList();
+    for (auto* simpleColumn : rc->simpleColumnList())
     {
+      // TODO check that isColumnStore is correct
       execplan::CalpontSystemCatalog::TableAliasName rcTable(
-          simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", false);
+          simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "",
+          simpleColumn->isColumnStore());
       if (!targetTableAlias.weakerEq(rcTable))
       {
         continue;
@@ -1029,6 +1031,11 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects(
       newReturnedCols.push_back(SRCP(rc->clone()));
     }
   }
+  if (newReturnedCols.empty())
+  {
+    std::cout << "cloneForTableWORecursiveSelects(): there are no Returned Columns after table filtering."
+              << std::endl;
+  }
   newPlan->returnedCols(newReturnedCols);

   // Deep copy of filters
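The hunk above changes how cloneForTableWORecursiveSelects() decides which returned columns belong to the target table: instead of keeping only columns that are themselves SimpleColumns (the old dynamic_cast), it asks each ReturnedColumn for its nested simple columns via setSimpleColumnList()/simpleColumnList(), so an expression that merely contains a column of the target table is also kept. Below is a minimal, self-contained sketch of that filtering idea using stand-in types (RC/SC); the real code compares TableAliasName values with weakerEq() and clones the kept columns rather than copying them.

#include <iostream>
#include <string>
#include <vector>

struct SC  // stand-in for execplan::SimpleColumn
{
  std::string schema, table, alias, column;
};

struct RC  // stand-in for execplan::ReturnedColumn, possibly wrapping several simple columns
{
  std::string name;
  std::vector<SC> simpleColumns;  // analogous to fSimpleColumnList after setSimpleColumnList()
};

// Keep an RC if at least one of its nested simple columns belongs to the target table.
bool referencesTable(const RC& rc, const std::string& schema, const std::string& table,
                     const std::string& alias)
{
  for (const auto& sc : rc.simpleColumns)
  {
    if (sc.schema == schema && sc.table == table && sc.alias == alias)
      return true;
  }
  return false;
}

int main()
{
  std::vector<RC> returnedCols = {
      {"t1.a", {{"db", "t1", "t1", "a"}}},
      {"func(t1.b, t2.c)", {{"db", "t1", "t1", "b"}, {"db", "t2", "t2", "c"}}},
      {"t2.d", {{"db", "t2", "t2", "d"}}},
  };

  std::vector<RC> filtered;
  for (const auto& rc : returnedCols)
  {
    if (referencesTable(rc, "db", "t1", "t1"))
      filtered.push_back(rc);  // the real code pushes SRCP(rc->clone())
  }

  for (const auto& rc : filtered)
    std::cout << rc.name << '\n';  // prints t1.a and func(t1.b, t2.c)
}

With the old top-level dynamic_cast, only t1.a would have survived; func(t1.b, t2.c) would have been dropped even though it references the target table.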
@@ -1059,7 +1066,7 @@ execplan::SCSEP CalpontSelectExecutionPlan::cloneForTableWORecursiveSelects(
   OrderByColumnList newOrderByCols;
   for (const auto& col : fOrderByCols)
   {
     newOrderByCols.push_back(SRCP(col->clone()));
   }
   newPlan->orderByCols(newOrderByCols);

@@ -288,7 +288,7 @@ class ReturnedColumn : public TreeNode
   /* @brief traverse this ReturnedColumn and re-populate fSimpleColumnList.
    *
    * @note all ReturnedColumns that may have simple column arguments added
-   * to the list need to implement thhis function.
+   * to the list need to implement this function.
    */
   virtual void setSimpleColumnList();

@@ -751,4 +751,9 @@ void SimpleColumn::evaluate(Row& row, bool& isNull)
   }
 }

+void SimpleColumn::setSimpleColumnList()
+{
+  fSimpleColumnList.push_back(this);
+}
+
 } // namespace execplan
@@ -264,6 +264,9 @@ class SimpleColumn : public ReturnedColumn
    */
   bool singleTable(CalpontSystemCatalog::TableAliasName& tan) override;

+  void setSimpleColumnList() override;
+
+
 protected:
  /**
   * Fields
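The two hunks above supply the hook used by the clone: ReturnedColumn::setSimpleColumnList() is documented as re-populating fSimpleColumnList, and SimpleColumn's override simply registers itself. The diff does not show overrides for composite column types; presumably they would aggregate the lists of their arguments. The following is a hypothetical, self-contained sketch with stand-in classes, not the actual execplan hierarchy.

#include <iostream>
#include <memory>
#include <vector>

struct SimpleColumnStub;

struct ReturnedColumnStub  // stand-in for execplan::ReturnedColumn
{
  virtual ~ReturnedColumnStub() = default;
  // Re-populate fSimpleColumnList by traversing this column; the default does nothing here.
  virtual void setSimpleColumnList() {}
  std::vector<SimpleColumnStub*> fSimpleColumnList;
};

struct SimpleColumnStub : ReturnedColumnStub  // stand-in for execplan::SimpleColumn
{
  explicit SimpleColumnStub(int id) : id(id) {}
  // Mirrors the override added in the diff: a simple column registers itself.
  void setSimpleColumnList() override { fSimpleColumnList.push_back(this); }
  int id;
};

// Hypothetical composite column (think function/arithmetic column) owning its arguments.
struct CompoundColumnStub : ReturnedColumnStub
{
  std::vector<std::unique_ptr<ReturnedColumnStub>> args;
  void setSimpleColumnList() override
  {
    fSimpleColumnList.clear();
    for (auto& arg : args)
    {
      arg->setSimpleColumnList();  // let each argument collect its own simple columns first
      fSimpleColumnList.insert(fSimpleColumnList.end(), arg->fSimpleColumnList.begin(),
                               arg->fSimpleColumnList.end());
    }
  }
};

int main()
{
  CompoundColumnStub f;  // models something like f(c1, c2)
  f.args.push_back(std::make_unique<SimpleColumnStub>(1));
  f.args.push_back(std::make_unique<SimpleColumnStub>(2));
  f.setSimpleColumnList();
  std::cout << f.fSimpleColumnList.size() << '\n';  // prints 2
}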
@@ -6307,7 +6307,15 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
     if (histogram)
     {
       SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str};
-      gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram;
+      auto tableStatisticsMapIt = gwi.tableStatisticsMap.find(tableName);
+      if (tableStatisticsMapIt == gwi.tableStatisticsMap.end())
+      {
+        gwi.tableStatisticsMap.insert({tableName, {{ifp->field->field_name.str, *histogram}}});
+      }
+      else
+      {
+        tableStatisticsMapIt->second.insert({ifp->field->field_name.str, *histogram});
+      }
     }
   }
 }
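The hunk above replaces the nested operator[] assignment with an explicit find()/insert(). One plausible motivation (an assumption, not stated in the commit) is that the stored histogram type is not default-constructible, which operator[] requires; note also the behavioral difference that insert() leaves an existing entry untouched instead of overwriting it. A generic, stand-alone illustration with a stand-in Histogram type:

#include <iostream>
#include <map>
#include <string>

// A value type without a default constructor, standing in for the stored histogram object.
struct Histogram
{
  explicit Histogram(int buckets) : buckets(buckets) {}
  int buckets;
};

using ColumnStats = std::map<std::string, Histogram>;
using TableStats = std::map<std::string, ColumnStats>;

int main()
{
  TableStats stats;

  // stats["t1"]["c1"] = Histogram(8);  // would not compile: the inner operator[] needs a
  //                                    // default-constructible mapped type
  auto it = stats.find("t1");
  if (it == stats.end())
    stats.insert({"t1", {{"c1", Histogram(8)}}});
  else
    it->second.insert({"c1", Histogram(8)});

  std::cout << stats.at("t1").size() << '\n';  // prints 1
}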
@@ -18,6 +18,8 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <optional>
+#include <vector>

 #include "rulebased_optimizer.h"

@@ -33,6 +35,9 @@
 namespace optimizer
 {

+template <typename T>
+using FilterRangeBounds = std::vector<std::pair<T, T>>;
+
 void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const size_t id);

 static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
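FilterRangeBounds<T> introduced above is just a vector of (lower, upper) pairs, one per planned UNION unit. A trivial stand-alone usage example; whether the bounds end up inclusive or exclusive is decided by the rule code, not by the alias itself:

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

template <typename T>
using FilterRangeBounds = std::vector<std::pair<T, T>>;

int main()
{
  // Lower/upper bounds that would drive three union units.
  FilterRangeBounds<uint64_t> bounds = {{0, 100}, {100, 200}, {200, 300}};
  for (const auto& [lo, hi] : bounds)
    std::cout << lo << " .. " << hi << '\n';
}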
@@ -163,6 +168,55 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl
   return nullptr;
 }

+// Populates range bounds based on column statistics
+// Returns optional with bounds if successful, nullopt otherwise
+template <typename T>
+std::optional<FilterRangeBounds<T>> populateRangeBounds(execplan::SimpleColumn* keyColumn,
+                                                        optimizer::RBOptimizerContext& ctx)
+{
+  cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
+  auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
+  if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
+  {
+    return std::nullopt;
+  }
+
+  auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName());
+  if (columnStatisticsIt == tableColumnsStatisticsIt->second.end())
+  {
+    return std::nullopt;
+  }
+
+  auto columnStatistics = columnStatisticsIt->second;
+
+  // TODO configurable parallel factor via session variable
+  // NB now histogram size is the way to control parallel factor with 16 being the maximum
+  size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor);
+  size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;
+
+  FilterRangeBounds<T> bounds;
+
+  // Loop over buckets to produce filter ranges
+  for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
+  {
+    auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
+    auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
+    T currentLowerBound = *(uint32_t*)bucket->start_value.data();
+    T currentUpperBound = *(uint32_t*)endBucket->start_value.data();
+    bounds.push_back({currentLowerBound, currentUpperBound});
+  }
+
+  // Add last range
+  // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
+  auto lastBucket =
+      columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
+  T currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
+  T currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
+  bounds.push_back({currentLowerBound, currentUpperBound});
+
+  return bounds;
+}
+
 // TODO char and other numerical types support
 execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
     execplan::CalpontSelectExecutionPlan& csep, execplan::CalpontSystemCatalog::TableAliasName& table,
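populateRangeBounds() above splits the column's JSON histogram into numberOfUnionUnits groups of buckets (capped by MaxParallelFactor), takes the start value of each group's first bucket as the lower bound and the start of the next group as the upper bound, then closes the last range with the histogram's recorded end; because of integer division, the last unit absorbs any leftover buckets. A toy model of that arithmetic, with plain integers standing in for Histogram_json_hb buckets:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main()
{
  // Toy stand-in for the JSON histogram: bucket start values of an integer column,
  // plus the recorded end of the last bucket.
  std::vector<uint32_t> bucketStarts = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90};
  uint32_t lastBucketEnd = 100;
  const size_t maxParallelFactor = 4;  // the real code caps this at 16

  size_t unionUnits = std::min(bucketStarts.size(), maxParallelFactor);  // 4
  size_t bucketsPerUnit = bucketStarts.size() / unionUnits;              // 2

  std::vector<std::pair<uint32_t, uint32_t>> bounds;
  for (size_t i = 0; i + 1 < unionUnits; ++i)
    bounds.push_back({bucketStarts[i * bucketsPerUnit], bucketStarts[(i + 1) * bucketsPerUnit]});
  // The last unit runs from the start of its first bucket to the end of the histogram.
  bounds.push_back({bucketStarts[(unionUnits - 1) * bucketsPerUnit], lastBucketEnd});

  for (const auto& [lo, hi] : bounds)
    std::cout << lo << " .. " << hi << '\n';  // 0..20, 20..40, 40..60, 60..100
}

Note that the real code reads each bound with a *(uint32_t*) cast on the bucket's raw start_value bytes, i.e. it currently assumes a 4-byte integer key column, consistent with the "TODO char and other numerical types support" comment.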
@@ -179,46 +233,12 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
   }

   // TODO char and other numerical types support
-  std::vector<std::pair<uint64_t, uint64_t>> bounds;
+  auto boundsOpt = populateRangeBounds<uint64_t>(keyColumn, ctx);
+  if (!boundsOpt.has_value())
   {
-    cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
-    auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
-    if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
-    {
-      return unionVec;
-    }
-
-    auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName());
-    if (columnStatisticsIt == tableColumnsStatisticsIt->second.end())
-    {
-      return unionVec;
-    }
-
-    auto columnStatistics = columnStatisticsIt->second;
-
-    // TODO configurable parallel factor via session variable
-    // NB now histogram size is the way to control parallel factor with 16 being the maximum
-    size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor);
-    size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;
-
-    // Loop over buckets to produce filter ranges
-    for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
-    {
-      auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
-      auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
-      uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
-      uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
-      bounds.push_back({currentLowerBound, currentUpperBound});
-    }
-
-    // Add last range
-    // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start
-    auto lastBucket = columnStatistics.get_json_histogram().begin() +
-                      (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
-    uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
-    uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
-    bounds.push_back({currentLowerBound, currentUpperBound});
+    return unionVec;
   }
+  auto& bounds = boundsOpt.value();

   for (auto& bound : bounds)
   {
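With this hunk, makeUnionFromTable() delegates the bound computation to populateRangeBounds() and bails out with the untouched unionVec when no statistics are available. The loop body that turns each bound into filters on the key column lies outside this diff; the sketch below only illustrates the intended shape of the rewrite as SQL text for a hypothetical key column, not the optimizer's actual filter construction:

#include <cstdint>
#include <iostream>
#include <sstream>
#include <utility>
#include <vector>

int main()
{
  std::vector<std::pair<uint64_t, uint64_t>> bounds = {{0, 100}, {100, 200}, {200, 300}};

  // Illustration only: one range-restricted sub-select per bound, glued with UNION ALL.
  // Whether the upper bound is inclusive is decided by the rule's filter code, not shown here.
  std::ostringstream sql;
  for (size_t i = 0; i < bounds.size(); ++i)
  {
    if (i > 0)
      sql << "\nUNION ALL\n";
    sql << "SELECT ... FROM t WHERE key >= " << bounds[i].first << " AND key < " << bounds[i].second;
  }
  std::cout << sql.str() << '\n';
}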
@@ -243,15 +263,19 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
   cal_impl_if::SchemaAndTableName schemaAndTableName = {table.schema, table.table};
   std::cout << "Processing table schema " << schemaAndTableName.schema << " table "
             << schemaAndTableName.table << " alias " << table.alias << std::endl;
-  auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
-  std::cout << "Column statistics: " << columnStatistics.has_value() << std::endl;
+  auto anyColumnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName);
+  std::cout << "Column statistics: " << anyColumnStatistics.has_value() << std::endl;
   // TODO add column statistics check to the corresponding match
-  if (!table.isColumnstore() && columnStatistics)
+  if (!table.isColumnstore() && anyColumnStatistics)
   {
+    // Don't copy filters for this
     auto derivedSCEP = csep.cloneForTableWORecursiveSelects(table);
     // Remove the filters as they were pushed down to union units
     // This is inappropriate for EXISTS filter and join conditions
     // WIP replace with filters applied to filters, so that only relevant filters are left
+    // WIP Ugly hack to avoid leaks
+    auto unusedFilters = derivedSCEP->filters();
+    delete unusedFilters;
     derivedSCEP->filters(nullptr);
     auto* derivedCSEP = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(derivedSCEP.get());
     if (!derivedCSEP)
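The added lines above free the filter tree that the clone still carries before detaching it, since those filters have been pushed down into the union units; the WIP comment flags this as a temporary ownership fix. A small sketch of the same delete-then-detach pattern with stand-in types (judging by this hunk, filters() hands back a raw pointer and filters(nullptr) detaches it):

#include <iostream>

struct FilterTree  // stand-in for the plan's filter parse tree
{
  ~FilterTree() { std::cout << "filter tree freed\n"; }
};

struct Plan  // stand-in for CalpontSelectExecutionPlan's filter accessors
{
  FilterTree* fFilters = nullptr;
  FilterTree* filters() const { return fFilters; }
  void filters(FilterTree* f) { fFilters = f; }
};

int main()
{
  Plan derived;
  derived.filters(new FilterTree);  // the clone carried the parent's filters

  // Pattern from the hunk: grab the raw pointer, delete it, then null the plan's filter slot
  // so the derived (union-unit) plan runs without the already-pushed-down filters.
  auto* unusedFilters = derived.filters();
  delete unusedFilters;
  derived.filters(nullptr);
}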
@@ -283,8 +307,7 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
     }
   }

-  execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
-  // [[maybe_unused]] size_t colPosition = 0;
+  // execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
   // replace parent CSEP RCs with derived table RCs using SchemaAndTableName -> tableAlias map
   if (!newDerivedTableList.empty())
   {
@@ -298,8 +321,6 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
       // If there is an alias in the map then it is a new derived table
       auto sc = dynamic_cast<execplan::SimpleColumn*>(rc.get());
       std::vector<execplan::SimpleColumn*> scs;
-      // execplan::ParseTree pt(rc.get());
-      // pt.walk(execplan::getSimpleCols, &scs);

       std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias "
                 << sc->tableAlias() << std::endl;
@@ -330,24 +351,29 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
   // But this is inappropriate for EXISTS filter and join conditions
   // There must be no derived at this point, so we can replace it with the new derived table list

-  auto* left = dynamic_cast<execplan::SimpleFilter*>(csep.filters()->data());
-  if (left)
+  // WIP hardcoded query
+  if (csep.filters() && csep.filters()->data())
   {
-    auto* lhs = left->lhs()->clone();
-    if (lhs)
+    auto* left = dynamic_cast<execplan::SimpleFilter*>(csep.filters()->data());
+    if (left)
     {
-      auto* lhsSC = dynamic_cast<execplan::SimpleColumn*>(lhs);
-      if (lhsSC)
+      auto* lhs = left->lhs()->clone();
+      if (lhs)
       {
-        auto newTableAlias = tableAliasMap.find({lhsSC->schemaName(), lhsSC->tableName(), lhsSC->tableAlias(), "", false});
-        // WIP Leak loosing previous lhs
-        if (newTableAlias != tableAliasMap.end())
+        auto* lhsSC = dynamic_cast<execplan::SimpleColumn*>(lhs);
+        if (lhsSC)
         {
-          lhsSC->tableName("");
-          lhsSC->schemaName("");
-          lhsSC->tableAlias(newTableAlias->second.first);
-          lhsSC->colPosition(0);
-          left->lhs(lhs);
+          auto newTableAlias =
+              tableAliasMap.find({lhsSC->schemaName(), lhsSC->tableName(), lhsSC->tableAlias(), "", false});
+          // WIP Leak losing previous lhs
+          if (newTableAlias != tableAliasMap.end())
+          {
+            lhsSC->tableName("");
+            lhsSC->schemaName("");
+            lhsSC->tableAlias(newTableAlias->second.first);
+            lhsSC->colPosition(0);
+            left->lhs(lhs);
+          }
         }
       }
     }
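The last hunk guards the rewrite with a csep.filters() null check and then redirects the filter's left-hand SimpleColumn to the newly created derived table: schema and table name are cleared, the alias is taken from tableAliasMap, and the column position is reset to 0. A simplified stand-alone sketch of that redirection; the alias value and the map layout here are hypothetical (only the $added_sub_ prefix appears elsewhere in the diff):

#include <iostream>
#include <map>
#include <string>
#include <tuple>

// Stand-in for a SimpleColumn reference inside the parent plan's filter.
struct ColumnRef
{
  std::string schema, table, alias, column;
  int colPosition = -1;
};

int main()
{
  // Hypothetical alias map, analogous to tableAliasMap:
  // original (schema, table, alias) -> alias of the derived table wrapping the rewritten UNION.
  std::map<std::tuple<std::string, std::string, std::string>, std::string> tableAliasMap = {
      {{"db", "t1", "t1"}, "$added_sub_0"}};

  ColumnRef lhs{"db", "t1", "t1", "id", 3};  // filter's left-hand column before the rewrite

  auto it = tableAliasMap.find({lhs.schema, lhs.table, lhs.alias});
  if (it != tableAliasMap.end())
  {
    // Point the column at the derived table instead of the base table, as the hunk does:
    // clear schema/table, take the derived alias, and reset the column position.
    lhs.schema.clear();
    lhs.table.clear();
    lhs.alias = it->second;
    lhs.colPosition = 0;
  }

  std::cout << lhs.alias << " pos " << lhs.colPosition << '\n';  // $added_sub_0 pos 0
}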