feat(rbo,rules): refactored statistics storage in gwi and implemented statistics-based UNION rewrite.
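For orientation, here is a minimal, self-contained sketch (not part of the diff) of the storage shape this commit moves to: statistics are keyed first by schema and table, then by column name. Histogram below is a simplified stand-in for MariaDB's Histogram_json_hb, and the schema/table/column names are made up.

#include <cstddef>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

struct Histogram { std::vector<std::string> buckets; };  // stand-in for Histogram_json_hb

struct SchemaAndTableName {
  std::string schema;
  std::string table;
  bool operator==(const SchemaAndTableName& other) const {
    return schema == other.schema && table == other.table;
  }
};

struct SchemaAndTableNameHash {
  std::size_t operator()(const SchemaAndTableName& k) const {
    return std::hash<std::string>()(k.schema + k.table);
  }
};

using ColumnName = std::string;
using ColumnStatisticsMap = std::unordered_map<ColumnName, Histogram>;
using TableStatisticsMap = std::unordered_map<SchemaAndTableName, ColumnStatisticsMap, SchemaAndTableNameHash>;

int main() {
  TableStatisticsMap stats;
  stats[{"db", "t1"}]["col1"] = Histogram{{"bucket0", "bucket1"}};  // illustrative names only

  // Lookup mirrors the table-first, then column access pattern introduced by the commit.
  SchemaAndTableName key = {"db", "t1"};
  auto it = stats.find(key);
  std::optional<ColumnStatisticsMap> perTable;
  if (it != stats.end())
    perTable = it->second;
  std::cout << "columns with statistics: " << (perTable ? perTable->size() : 0) << std::endl;
}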
@@ -445,7 +445,7 @@ SCSEP FromSubQuery::transform()
   }

   // Insert column statistics
-  fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap);
+  fGwip.mergeTableStatistics(gwi.tableStatisticsMap);

   fGwip.subselectList.push_back(csep);
   return csep;
@@ -6314,16 +6314,20 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
     assert(ifp->field->table->s);
     // assert(ifp->field->table->s->db);
     // assert(ifp->field->table->s->table_name);
-    // FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, {ifp->field->field_name.str});
-    //TODO use FQCN as a key type
+    // FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str},
+    // {ifp->field->field_name.str});
+    // TODO use FQCN as a key type
     std::cout << "Adding column statistics for " << ifp->field->field_name.str << std::endl;
     auto* histogram = dynamic_cast<Histogram_json_hb*>(ifp->field->read_stats->histogram);
     if (histogram)
     {
       std::cout << "Type of histogram object: " << typeid(*histogram).name() << std::endl;
-      std::vector<Histogram_bucket> histogramBuckets = histogram->get_json_histogram();
-      std::cout << "gwi.columnStatisticsMap[ifp->field->field_name.str].size() " << histogramBuckets.size() << std::endl;
-      gwi.columnStatisticsMap[ifp->field->field_name.str] = histogramBuckets;
+      // std::vector<Histogram_bucket> histogramBuckets = histogram->get_json_histogram();
+      // std::cout << "gwi.tableStatisticsMap[{ifp->field->table->s->db.str, "
+      // "ifp->field->table->s->table_name.str}][ifp->field->field_name.str].size() "
+      // << histogramBuckets.size() << std::endl;
+      SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str};
+      gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram;
     }
   }
 }
@@ -6421,7 +6425,7 @@ int processSelect(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, vector
     {
       Item_field* ifp = (Item_field*)item;
       extractColumnStatistics(ifp, gwi);
-      std::cout << "gwi.columnStatisticsMap 1 size " << gwi.columnStatisticsMap.size() << std::endl;
+      // Handle * case
       if (ifp->field_name.length && string(ifp->field_name.str) == "*")
       {
         collectAllCols(gwi, ifp);
@@ -7473,7 +7477,6 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&

   int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit);

-  std::cout << "cs_get_select_plan columnStatisticsMap size " << gwi.columnStatisticsMap.size() << std::endl;
   if (status > 0)
     return ER_INTERNAL_ERROR;
   else if (status < 0)
@@ -136,10 +136,36 @@ namespace cal_impl_if
 {
 extern bool nonConstFunc(Item_func* ifp);

-void gp_walk_info::mergeColumnStatisticsMap(const ColumnStatisticsMap& aColumnStatisticsMap)
+void gp_walk_info::mergeTableStatistics(const TableStatisticsMap& aTableStatisticsMap)
 {
-  columnStatisticsMap.insert(aColumnStatisticsMap.begin(), aColumnStatisticsMap.end());
+  for (auto& [schemaAndTableName, aColumnStatisticsMap]: aTableStatisticsMap)
+  {
+    auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName);
+    if (tableStatisticsMapIt == tableStatisticsMap.end())
+    {
+      tableStatisticsMap[schemaAndTableName] = aColumnStatisticsMap;
+    }
+    else
+    {
+      for (auto& [columnName, histogram]: aColumnStatisticsMap)
+      {
+        tableStatisticsMapIt->second[columnName] = histogram;
+      }
+    }
+  }
 }
+
+std::optional<ColumnStatisticsMap> gp_walk_info::findStatisticsForATable(SchemaAndTableName& schemaAndTableName)
+{
+  auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName);
+  if (tableStatisticsMapIt == tableStatisticsMap.end())
+  {
+    return std::nullopt;
+  }
+
+  return {tableStatisticsMapIt->second};
+}
+
 }

 namespace
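As a side note, the merge semantics above can be summarized with this small standalone sketch (simplified types, not the engine's API): a table that is not yet tracked is copied wholesale, while for a table that is already present the incoming per-column histograms overwrite the existing ones column by column.

#include <iostream>
#include <string>
#include <unordered_map>

using ColumnStats = std::unordered_map<std::string, int>;          // column -> histogram stand-in
using TableStats = std::unordered_map<std::string, ColumnStats>;   // "schema.table" -> columns

void mergeTableStatistics(TableStats& target, const TableStats& source) {
  for (const auto& [table, columns] : source) {
    auto it = target.find(table);
    if (it == target.end())
      target[table] = columns;                 // new table: copy the whole column map
    else
      for (const auto& [column, histogram] : columns)
        it->second[column] = histogram;        // known table: overwrite per column
  }
}

int main() {
  TableStats outer = {{"db.t1", {{"a", 1}}}};
  TableStats sub = {{"db.t1", {{"a", 2}, {"b", 3}}}, {"db.t2", {{"c", 4}}}};
  mergeTableStatistics(outer, sub);
  std::cout << outer["db.t1"]["a"] << " " << outer.size() << std::endl;  // prints "2 2"
}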
@@ -96,13 +96,29 @@ enum ClauseType
   ORDER_BY
 };

+struct SchemaAndTableName {
+  std::string schema;
+  std::string table;
+  bool operator==(const SchemaAndTableName& other) const {
+    return schema == other.schema && table == other.table;
+  }
+};
+
+struct SchemaAndTableNameHash {
+  std::size_t operator()(const SchemaAndTableName& k) const {
+    return std::hash<std::string>()(k.schema + k.table);
+  }
+};
+
 typedef std::vector<JoinInfo> JoinInfoVec;
 typedef dmlpackage::ColValuesList ColValuesList;
 typedef dmlpackage::TableValuesMap TableValuesMap;
 typedef std::map<execplan::CalpontSystemCatalog::TableAliasName, std::pair<int, TABLE_LIST*>> TableMap;
 typedef std::tr1::unordered_map<TABLE_LIST*, std::vector<COND*>> TableOnExprList;
 typedef std::tr1::unordered_map<TABLE_LIST*, uint> TableOuterJoinMap;
-using ColumnStatisticsMap = std::unordered_map<std::string, std::vector<Histogram_bucket>>;
+using ColumnName = std::string;
+using ColumnStatisticsMap = std::unordered_map<ColumnName, Histogram_json_hb>;
+using TableStatisticsMap = std::unordered_map<SchemaAndTableName, ColumnStatisticsMap, SchemaAndTableNameHash>;

 struct gp_walk_info
 {
@@ -112,7 +128,7 @@ struct gp_walk_info
   execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols;
   std::vector<Item*> extSelAggColsItems;
   execplan::CalpontSelectExecutionPlan::ColumnMap columnMap;
-  std::unordered_map<std::string, std::vector<Histogram_bucket>> columnStatisticsMap;
+  TableStatisticsMap tableStatisticsMap;
   // This vector temporarily hold the projection columns to be added
   // to the returnedCols vector for subquery processing. It will be appended
   // to the end of returnedCols when the processing is finished.
@@ -203,7 +219,8 @@ struct gp_walk_info
   SubQuery** subQueriesChain;

   gp_walk_info(long timeZone_, SubQuery** subQueriesChain_)
-   : sessionid(0)
+   : tableStatisticsMap({})
+   , sessionid(0)
    , fatalParseError(false)
    , condPush(false)
    , dropCond(false)
@@ -234,7 +251,8 @@ struct gp_walk_info
   }
   ~gp_walk_info();

-  void mergeColumnStatisticsMap(const std::unordered_map<std::string, std::vector<Histogram_bucket>>& columnStatisticsMap);
+  void mergeTableStatistics(const TableStatisticsMap& tableStatisticsMap);
+  std::optional<ColumnStatisticsMap> findStatisticsForATable(SchemaAndTableName& schemaAndTableName);
 };

 struct SubQueryChainHolder;
@@ -97,7 +97,7 @@ SCSEP SelectSubQuery::transform()
   }

   // Insert column statistics
-  fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap);
+  fGwip.mergeTableStatistics(gwi.tableStatisticsMap);
   // std::cout << "fGwip.columnStatisticsMap 2 size " << fGwip.columnStatisticsMap.size() << std::endl;
   // std::cout << "gwi.columnStatisticsMap 2 size " << gwi.columnStatisticsMap.size() << std::endl;

@@ -191,9 +191,24 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
   return ptp;
 }

-execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep)
+// Looking for a projected column that comes first in an available index and has EI statistics
+// INV nullptr signifies that no suitable column was found
+execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
 {
-  return dynamic_cast<execplan::SimpleColumn*>(csep.returnedCols().front().get());
+  for (auto& rc : csep.returnedCols())
+  {
+    auto* simpleColumn = dynamic_cast<execplan::SimpleColumn*>(rc.get());
+    if (simpleColumn)
+    {
+      std::cout << "Found simple column " << simpleColumn->columnName() << std::endl;
+      cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->tableName(), simpleColumn->columnName()};
+      auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam);
+
+      return simpleColumn;
+    }
+  }
+
+  return nullptr;
 }

 execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
@@ -201,49 +216,65 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
 {
   execplan::CalpontSelectExecutionPlan::SelectList unionVec;
   // unionVec.reserve(numberOfLegs);
-  execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep);
-  std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.columnStatisticsMap "
-            << " with size " << ctx.gwi.columnStatisticsMap.size() << std::endl;
-  for (auto& [k, v] : ctx.gwi.columnStatisticsMap)
-  {
-    std::cout << "key " << k << " vector size " << v.size() << std::endl;
-  }
-  if (!keyColumn ||
-      ctx.gwi.columnStatisticsMap.find(keyColumn->columnName()) == ctx.gwi.columnStatisticsMap.end())
+  execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, ctx);
+  if (!keyColumn)
   {
     return unionVec;
   }

-  auto columnStatistics = ctx.gwi.columnStatisticsMap[keyColumn->columnName()];
-  std::cout << "columnStatistics.size() " << columnStatistics.size() << std::endl;
+  std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.tableStatisticsMap "
+            << " with size " << ctx.gwi.tableStatisticsMap.size() << std::endl;
+  for (auto& [k, v] : ctx.gwi.tableStatisticsMap)
+  {
+    std::cout << "SchemaAndTableName " << k.schema << "." << k.table << " column map size " << v.size() << std::endl;
+  }
+
+
+
+  cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()};
+  auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName);
+  if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end())
+  {
+    return unionVec;
+  }
+
+  auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName());
+  if (columnStatisticsIt == tableColumnsStatisticsIt->second.end())
+  {
+    return unionVec;
+  }
+
+  auto columnStatistics = columnStatisticsIt->second;
+  std::cout << "Histogram_json_hb histogram size " << columnStatistics.get_json_histogram().size() << std::endl;
   // TODO char and other numerical types support
-  size_t numberOfUnionUnits = 2;
-  size_t numberOfBucketsPerUnionUnit = columnStatistics.size() / numberOfUnionUnits;
+  size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), 16UL);
+  size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits;

   std::vector<std::pair<uint64_t, uint64_t>> bounds;

   // TODO need to process tail if number of buckets is not divisible by number of union units
   // TODO non-overlapping buckets if it is a problem at all
-  for (size_t i = 0; i < numberOfUnionUnits; ++i)
+  for (size_t i = 0; i < numberOfUnionUnits - 1; ++i)
   {
-    auto bucket = columnStatistics.begin() + i * numberOfBucketsPerUnionUnit;
-    auto endBucket = columnStatistics.begin() + (i + 1) * numberOfBucketsPerUnionUnit;
-    // TODO find a median b/w the current bucket start and the previous bucket end
-    uint64_t currentLowerBound =
-        (bounds.empty() ? *(uint32_t*)bucket->start_value.data()
-                        : std::min((uint64_t)*(uint32_t*)bucket->start_value.data(), bounds.back().second));
-    uint64_t currentUpperBound = currentLowerBound;
-    for (; bucket != endBucket; ++bucket)
-    {
-      uint64_t bucketLowerBound = *(uint32_t*)bucket->start_value.data();
-      std::cout << "bucket.start_value " << bucketLowerBound << std::endl;
-      currentUpperBound = bucketLowerBound + bucket->ndv;
-    }
+    auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit;
+    auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit;
+    uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data();
+    uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data();
     std::cout << "currentLowerBound " << currentLowerBound << " currentUpperBound " << currentUpperBound
               << std::endl;
     bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound));
   }

+  // Add last range
+  auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit;
+  uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data();
+  uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data();
+
+  std::cout << "last currentLowerBound " << currentLowerBound << " last currentUpperBound " << currentUpperBound
+            << std::endl;
+  bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound));
+
   for (auto& bound : bounds)
   {
     auto clonedCSEP = csep.cloneWORecursiveSelects();
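The range computation above boils down to splitting the histogram's bucket start values into contiguous [lower, upper) ranges and closing the last range with the histogram's overall end endpoint; each range then parameterizes one UNION leg built from a clone of the original plan. A standalone sketch with plain integers in place of the Histogram_json_hb bucket endpoints (helper names here are assumptions, not the engine's API):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

std::vector<std::pair<uint64_t, uint64_t>> makeBounds(const std::vector<uint64_t>& bucketStarts,
                                                      uint64_t lastEndpoint) {
  size_t units = std::min(bucketStarts.size(), size_t(16));       // cap the number of UNION legs
  size_t perUnit = bucketStarts.size() / units;                   // tail buckets fold into the last range
  std::vector<std::pair<uint64_t, uint64_t>> bounds;
  for (size_t i = 0; i + 1 < units; ++i)
    bounds.emplace_back(bucketStarts[i * perUnit], bucketStarts[(i + 1) * perUnit]);
  bounds.emplace_back(bucketStarts[(units - 1) * perUnit], lastEndpoint);  // last range ends at the histogram end
  return bounds;
}

int main() {
  // 8 bucket start values and an overall upper endpoint of 100 produce 8 ranges
  auto bounds = makeBounds({0, 10, 20, 30, 40, 55, 70, 90}, 100);
  for (auto& [lo, hi] : bounds)
    std::cout << "[" << lo << ", " << hi << ") ";
  std::cout << std::endl;  // [0, 10) [10, 20) ... [90, 100)
}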
@@ -275,8 +306,6 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon
   derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
   derivedSCEP->derivedTbAlias(tableAlias);

-  // TODO: hardcoded for now
-  // size_t parallelFactor = 2;
   // Create a copy of the current leaf CSEP with additional filters to partition the key column
   auto additionalUnionVec = makeUnionFromTable(csep, ctx);
   derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),