You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
feat(optimizer): PoC for EI stats retrieval in getSelectPlan()
This commit is contained in:
@@ -444,6 +444,9 @@ SCSEP FromSubQuery::transform()
|
|||||||
return csep;
|
return csep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Insert column statistics
|
||||||
|
fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap);
|
||||||
|
|
||||||
fGwip.subselectList.push_back(csep);
|
fGwip.subselectList.push_back(csep);
|
||||||
return csep;
|
return csep;
|
||||||
}
|
}
|
||||||
|
@@ -49,7 +49,7 @@ using namespace logging;
|
|||||||
#define PREFER_MY_CONFIG_H
|
#define PREFER_MY_CONFIG_H
|
||||||
#include <my_config.h>
|
#include <my_config.h>
|
||||||
#include "idb_mysql.h"
|
#include "idb_mysql.h"
|
||||||
#include "opt_histogram_json.h"
|
|
||||||
#include "partition_element.h"
|
#include "partition_element.h"
|
||||||
#include "partition_info.h"
|
#include "partition_info.h"
|
||||||
|
|
||||||
@@ -6290,24 +6290,25 @@ int processLimitAndOffset(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep
|
|||||||
|
|
||||||
void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
|
void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
|
||||||
{
|
{
|
||||||
if (!ifp->field->part_of_key.is_clear_all())
|
// TODO find clear way to check if the field is part of a key
|
||||||
{
|
// if (!ifp->field->part_of_key.is_clear_all())
|
||||||
return;
|
// {
|
||||||
}
|
// return;
|
||||||
std::cout << "Processing field item: " << ifp->field_name.str << std::endl;
|
// }
|
||||||
|
// std::cout << "Processing field item: " << ifp->field_name.str << std::endl;
|
||||||
// std::cout << "part of a key: " << buf << std::endl;
|
// std::cout << "part of a key: " << buf << std::endl;
|
||||||
std::cout << "ifp->field->field_index " << ifp->field->field_index << std::endl;
|
// std::cout << "ifp->field->field_index " << ifp->field->field_index << std::endl;
|
||||||
|
|
||||||
for (uint j = 0; j < ifp->field->table->s->keys; j++)
|
for (uint j = 0; j < ifp->field->table->s->keys; j++)
|
||||||
{
|
{
|
||||||
for (uint i = 0; i < ifp->field->table->s->key_info[j].usable_key_parts; i++)
|
for (uint i = 0; i < ifp->field->table->s->key_info[j].usable_key_parts; i++)
|
||||||
{
|
{
|
||||||
std::cout << "key fieldnr " << i << " "
|
// std::cout << "key fieldnr " << i << " "
|
||||||
<< ifp->field->table->s->key_info[j].key_part[i].field->field_name.str << " "
|
// << ifp->field->table->s->key_info[j].key_part[i].field->field_name.str << " "
|
||||||
<< ifp->field->table->s->key_info[j].key_part[i].fieldnr << std::endl;
|
// << ifp->field->table->s->key_info[j].key_part[i].fieldnr << std::endl;
|
||||||
if (ifp->field->table->s->key_info[j].key_part[i].fieldnr == ifp->field->field_index + 1)
|
if (ifp->field->table->s->key_info[j].key_part[i].fieldnr == ifp->field->field_index + 1)
|
||||||
{
|
{
|
||||||
std::cout << "key_info " << j << " key_part " << i << " matched " << std::endl;
|
// std::cout << "key_info " << j << " key_part " << i << " matched " << std::endl;
|
||||||
if (i == 0 && ifp->field->read_stats)
|
if (i == 0 && ifp->field->read_stats)
|
||||||
{
|
{
|
||||||
assert(ifp->field->table->s);
|
assert(ifp->field->table->s);
|
||||||
@@ -6315,7 +6316,15 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi)
|
|||||||
// assert(ifp->field->table->s->table_name);
|
// assert(ifp->field->table->s->table_name);
|
||||||
// FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, {ifp->field->field_name.str});
|
// FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, {ifp->field->field_name.str});
|
||||||
//TODO use FQCN as a key type
|
//TODO use FQCN as a key type
|
||||||
gwi.columnStatisticsMap[ifp->field->field_name.str] = ifp->field->read_stats->histogram->get_histogram();
|
std::cout << "Adding column statistics for " << ifp->field->field_name.str << std::endl;
|
||||||
|
auto* histogram = dynamic_cast<Histogram_json_hb*>(ifp->field->read_stats->histogram);
|
||||||
|
if (histogram)
|
||||||
|
{
|
||||||
|
std::cout << "Type of histogram object: " << typeid(*histogram).name() << std::endl;
|
||||||
|
std::vector<Histogram_bucket> histogramBuckets = histogram->get_histogram();
|
||||||
|
std::cout << "gwi.columnStatisticsMap[ifp->field->field_name.str].size() " << histogramBuckets.size() << std::endl;
|
||||||
|
gwi.columnStatisticsMap[ifp->field->field_name.str] = histogramBuckets;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -6412,7 +6421,7 @@ int processSelect(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, vector
|
|||||||
{
|
{
|
||||||
Item_field* ifp = (Item_field*)item;
|
Item_field* ifp = (Item_field*)item;
|
||||||
extractColumnStatistics(ifp, gwi);
|
extractColumnStatistics(ifp, gwi);
|
||||||
|
std::cout << "gwi.columnStatisticsMap 1 size " << gwi.columnStatisticsMap.size() << std::endl;
|
||||||
if (ifp->field_name.length && string(ifp->field_name.str) == "*")
|
if (ifp->field_name.length && string(ifp->field_name.str) == "*")
|
||||||
{
|
{
|
||||||
collectAllCols(gwi, ifp);
|
collectAllCols(gwi, ifp);
|
||||||
@@ -7464,6 +7473,7 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
|
|||||||
|
|
||||||
int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit);
|
int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit);
|
||||||
|
|
||||||
|
std::cout << "cs_get_select_plan columnStatisticsMap size " << gwi.columnStatisticsMap.size() << std::endl;
|
||||||
if (status > 0)
|
if (status > 0)
|
||||||
return ER_INTERNAL_ERROR;
|
return ER_INTERNAL_ERROR;
|
||||||
else if (status < 0)
|
else if (status < 0)
|
||||||
@@ -7481,7 +7491,8 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
|
|||||||
|
|
||||||
if (get_unstable_optimizer(thd))
|
if (get_unstable_optimizer(thd))
|
||||||
{
|
{
|
||||||
bool csepWasOptimized = optimizer::optimizeCSEP(*csep);
|
optimizer::RBOptimizerContext ctx(gwi);
|
||||||
|
bool csepWasOptimized = optimizer::optimizeCSEP(*csep, ctx);
|
||||||
if (csep->traceOn() && csepWasOptimized)
|
if (csep->traceOn() && csepWasOptimized)
|
||||||
{
|
{
|
||||||
cerr << "---------------- cs_get_select_plan optimized EXECUTION PLAN ----------------" << endl;
|
cerr << "---------------- cs_get_select_plan optimized EXECUTION PLAN ----------------" << endl;
|
||||||
|
@@ -135,6 +135,11 @@ using namespace funcexp;
|
|||||||
namespace cal_impl_if
|
namespace cal_impl_if
|
||||||
{
|
{
|
||||||
extern bool nonConstFunc(Item_func* ifp);
|
extern bool nonConstFunc(Item_func* ifp);
|
||||||
|
|
||||||
|
void gp_walk_info::mergeColumnStatisticsMap(const ColumnStatisticsMap& aColumnStatisticsMap)
|
||||||
|
{
|
||||||
|
columnStatisticsMap.insert(aColumnStatisticsMap.begin(), aColumnStatisticsMap.end());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
|
@@ -102,6 +102,7 @@ typedef dmlpackage::TableValuesMap TableValuesMap;
|
|||||||
typedef std::map<execplan::CalpontSystemCatalog::TableAliasName, std::pair<int, TABLE_LIST*>> TableMap;
|
typedef std::map<execplan::CalpontSystemCatalog::TableAliasName, std::pair<int, TABLE_LIST*>> TableMap;
|
||||||
typedef std::tr1::unordered_map<TABLE_LIST*, std::vector<COND*>> TableOnExprList;
|
typedef std::tr1::unordered_map<TABLE_LIST*, std::vector<COND*>> TableOnExprList;
|
||||||
typedef std::tr1::unordered_map<TABLE_LIST*, uint> TableOuterJoinMap;
|
typedef std::tr1::unordered_map<TABLE_LIST*, uint> TableOuterJoinMap;
|
||||||
|
using ColumnStatisticsMap = std::unordered_map<std::string, std::vector<Histogram_bucket>>;
|
||||||
|
|
||||||
struct gp_walk_info
|
struct gp_walk_info
|
||||||
{
|
{
|
||||||
@@ -232,6 +233,8 @@ struct gp_walk_info
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
~gp_walk_info();
|
~gp_walk_info();
|
||||||
|
|
||||||
|
void mergeColumnStatisticsMap(const std::unordered_map<std::string, std::vector<Histogram_bucket>>& columnStatisticsMap);
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SubQueryChainHolder;
|
struct SubQueryChainHolder;
|
||||||
|
@@ -96,6 +96,12 @@ SCSEP SelectSubQuery::transform()
|
|||||||
return csep;
|
return csep;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Insert column statistics
|
||||||
|
fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap);
|
||||||
|
// std::cout << "fGwip.columnStatisticsMap 2 size " << fGwip.columnStatisticsMap.size() << std::endl;
|
||||||
|
// std::cout << "gwi.columnStatisticsMap 2 size " << gwi.columnStatisticsMap.size() << std::endl;
|
||||||
|
|
||||||
|
// Insert subselect CSEP
|
||||||
fGwip.subselectList.push_back(csep);
|
fGwip.subselectList.push_back(csep);
|
||||||
|
|
||||||
// remove outer query tables
|
// remove outer query tables
|
||||||
|
@@ -74,6 +74,7 @@
|
|||||||
#include "rpl_rli.h"
|
#include "rpl_rli.h"
|
||||||
#include "my_dbug.h"
|
#include "my_dbug.h"
|
||||||
#include "sql_show.h"
|
#include "sql_show.h"
|
||||||
|
#include "opt_histogram_json.h"
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
||||||
// Now clean up the pollution as best we can...
|
// Now clean up the pollution as best we can...
|
||||||
|
@@ -15,6 +15,13 @@
|
|||||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||||
MA 02110-1301, USA. */
|
MA 02110-1301, USA. */
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <limits>
|
||||||
|
|
||||||
|
#include "rulebased_optimizer.h"
|
||||||
|
|
||||||
#include "constantcolumn.h"
|
#include "constantcolumn.h"
|
||||||
#include "execplan/calpontselectexecutionplan.h"
|
#include "execplan/calpontselectexecutionplan.h"
|
||||||
#include "execplan/simplecolumn.h"
|
#include "execplan/simplecolumn.h"
|
||||||
@@ -23,44 +30,45 @@
|
|||||||
#include "operator.h"
|
#include "operator.h"
|
||||||
#include "predicateoperator.h"
|
#include "predicateoperator.h"
|
||||||
#include "simplefilter.h"
|
#include "simplefilter.h"
|
||||||
#include "rulebased_optimizer.h"
|
|
||||||
#include <cstdint>
|
|
||||||
#include <limits>
|
|
||||||
|
|
||||||
namespace optimizer
|
namespace optimizer
|
||||||
{
|
{
|
||||||
|
|
||||||
|
void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const size_t id);
|
||||||
|
|
||||||
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
|
static const std::string RewrittenSubTableAliasPrefix = "$added_sub_";
|
||||||
|
|
||||||
// Apply a list of rules to a CSEP
|
// Apply a list of rules to a CSEP
|
||||||
bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector<Rule>& rules)
|
bool optimizeCSEPWithRules(execplan::CalpontSelectExecutionPlan& root, const std::vector<Rule>& rules,
|
||||||
|
optimizer::RBOptimizerContext& ctx)
|
||||||
{
|
{
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
for (const auto& rule : rules)
|
for (const auto& rule : rules)
|
||||||
{
|
{
|
||||||
changed |= rule.apply(root);
|
changed |= rule.apply(root, ctx);
|
||||||
}
|
}
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
// high level API call for optimizer
|
// high level API call for optimizer
|
||||||
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root)
|
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx)
|
||||||
{
|
{
|
||||||
optimizer::Rule parallelCES{"parallelCES", optimizer::matchParallelCES, optimizer::applyParallelCES};
|
optimizer::Rule parallelCES{"parallelCES", optimizer::matchParallelCES, optimizer::applyParallelCES};
|
||||||
|
|
||||||
std::vector<Rule> rules = {parallelCES};
|
std::vector<optimizer::Rule> rules = {parallelCES};
|
||||||
|
|
||||||
return optimizeCSEPWithRules(root, rules);
|
return optimizeCSEPWithRules(root, rules, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply iteratively until CSEP is converged by rule
|
// Apply iteratively until CSEP is converged by rule
|
||||||
bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const
|
bool Rule::apply(execplan::CalpontSelectExecutionPlan& root, optimizer::RBOptimizerContext& ctx) const
|
||||||
{
|
{
|
||||||
bool changedThisRound = false;
|
bool changedThisRound = false;
|
||||||
bool hasBeenApplied = false;
|
bool hasBeenApplied = false;
|
||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
changedThisRound = walk(root);
|
changedThisRound = walk(root, ctx);
|
||||||
hasBeenApplied |= changedThisRound;
|
hasBeenApplied |= changedThisRound;
|
||||||
} while (changedThisRound && !applyOnlyOnce);
|
} while (changedThisRound && !applyOnlyOnce);
|
||||||
|
|
||||||
@@ -68,69 +76,42 @@ bool Rule::apply(execplan::CalpontSelectExecutionPlan& root) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DFS walk to match CSEP and apply rules if match
|
// DFS walk to match CSEP and apply rules if match
|
||||||
bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep) const
|
bool Rule::walk(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) const
|
||||||
{
|
{
|
||||||
bool rewrite = false;
|
bool rewrite = false;
|
||||||
|
|
||||||
for (auto& table : csep.derivedTableList())
|
std::stack<execplan::CalpontSelectExecutionPlan*> planStack;
|
||||||
|
planStack.push(&csep);
|
||||||
|
|
||||||
|
while (!planStack.empty())
|
||||||
{
|
{
|
||||||
auto* csepPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(table.get());
|
execplan::CalpontSelectExecutionPlan* current = planStack.top();
|
||||||
if (!csepPtr)
|
planStack.pop();
|
||||||
|
|
||||||
|
for (auto& table : current->derivedTableList())
|
||||||
{
|
{
|
||||||
continue;
|
auto* csepPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(table.get());
|
||||||
}
|
if (csepPtr)
|
||||||
|
|
||||||
auto& csepLocal = *csepPtr;
|
|
||||||
rewrite |= walk(csepLocal);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto& unionUnit : csep.unionVec())
|
|
||||||
{
|
|
||||||
auto* unionUnitPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(unionUnit.get());
|
|
||||||
if (!unionUnitPtr)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto& unionUnitLocal = *unionUnitPtr;
|
|
||||||
rewrite |= walk(unionUnitLocal);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (csep.filters() != nullptr)
|
|
||||||
{
|
|
||||||
bool rewriteLocal = false;
|
|
||||||
std::vector<execplan::ParseTree*> stack;
|
|
||||||
stack.push_back(csep.filters());
|
|
||||||
while (!stack.empty())
|
|
||||||
{
|
|
||||||
execplan::ParseTree* node = stack.back();
|
|
||||||
stack.pop_back();
|
|
||||||
if (node == nullptr)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
auto* existsFilter = dynamic_cast<execplan::ExistsFilter*>(node->data());
|
|
||||||
if (existsFilter)
|
|
||||||
{
|
{
|
||||||
if (matchRule(*existsFilter->sub()))
|
planStack.push(csepPtr);
|
||||||
{
|
|
||||||
applyRule(*existsFilter->sub());
|
|
||||||
rewriteLocal = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (node->right())
|
|
||||||
stack.push_back(node->right());
|
|
||||||
if (node->left())
|
|
||||||
stack.push_back(node->left());
|
|
||||||
}
|
}
|
||||||
if (rewriteLocal)
|
|
||||||
rewrite |= rewriteLocal;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (matchRule(csep))
|
for (auto& unionUnit : current->unionVec())
|
||||||
{
|
{
|
||||||
applyRule(csep);
|
auto* unionUnitPtr = dynamic_cast<execplan::CalpontSelectExecutionPlan*>(unionUnit.get());
|
||||||
rewrite = true;
|
if (unionUnitPtr)
|
||||||
|
{
|
||||||
|
planStack.push(unionUnitPtr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (matchRule(*current))
|
||||||
|
{
|
||||||
|
applyRule(*current, ctx);
|
||||||
|
++ctx.uniqueId;
|
||||||
|
rewrite = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rewrite;
|
return rewrite;
|
||||||
@@ -177,7 +158,7 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
|||||||
auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0);
|
auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0);
|
||||||
// set TZ
|
// set TZ
|
||||||
// There is a question with ownership of the const column
|
// There is a question with ownership of the const column
|
||||||
// WIP here we lost upper bound value if predicate is not changed to weak lt
|
// WIP here we lost upper bound value if predicate is not changed to weak lt
|
||||||
execplan::SOP ltOp = boost::make_shared<execplan::Operator>(execplan::PredicateOperator("<"));
|
execplan::SOP ltOp = boost::make_shared<execplan::Operator>(execplan::PredicateOperator("<"));
|
||||||
ltOp->setOpType(filterColLeftOp->resultType(), tableKeyColumnLeftOp->resultType());
|
ltOp->setOpType(filterColLeftOp->resultType(), tableKeyColumnLeftOp->resultType());
|
||||||
ltOp->resultType(ltOp->operationType());
|
ltOp->resultType(ltOp->operationType());
|
||||||
@@ -210,14 +191,42 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep,
|
|||||||
return ptp;
|
return ptp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep)
|
||||||
|
{
|
||||||
|
return dynamic_cast<execplan::SimpleColumn*>(csep.returnedCols().front().get());
|
||||||
|
}
|
||||||
|
|
||||||
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
||||||
const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx)
|
||||||
{
|
{
|
||||||
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||||
unionVec.reserve(numberOfLegs);
|
// unionVec.reserve(numberOfLegs);
|
||||||
std::vector<std::pair<uint64_t, uint64_t>> bounds({{0, 3000961},
|
execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep);
|
||||||
{3000961, std::numeric_limits<uint64_t>::max()}});
|
std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.columnStatisticsMap " << " with size " << ctx.gwi.columnStatisticsMap.size() << std::endl;
|
||||||
for (auto bound : bounds)
|
for (auto& [k, v] : ctx.gwi.columnStatisticsMap)
|
||||||
|
{
|
||||||
|
std::cout << "key " << k << std::endl;
|
||||||
|
}
|
||||||
|
if (!keyColumn ||
|
||||||
|
ctx.gwi.columnStatisticsMap.find(keyColumn->columnName()) == ctx.gwi.columnStatisticsMap.end())
|
||||||
|
{
|
||||||
|
return unionVec;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto columnStatistics = ctx.gwi.columnStatisticsMap[keyColumn->columnName()];
|
||||||
|
std::cout << "columnStatistics.size() " << columnStatistics.size() << std::endl;
|
||||||
|
// TODO char and other numerical types support
|
||||||
|
std::vector<std::pair<uint64_t, uint64_t>> bounds;
|
||||||
|
std::transform(columnStatistics.begin(), columnStatistics.end(), std::back_inserter(bounds),
|
||||||
|
[](const auto& bucket)
|
||||||
|
{
|
||||||
|
uint64_t lowerBound = std::stoul(bucket.start_value);
|
||||||
|
uint64_t upperBound = lowerBound + bucket.ndv;
|
||||||
|
return std::make_pair(lowerBound, upperBound);
|
||||||
|
});
|
||||||
|
// std::vector<std::pair<uint64_t, uint64_t>> bounds({{0, 3000961},
|
||||||
|
// // {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||||
|
for (auto& bound : bounds)
|
||||||
{
|
{
|
||||||
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||||
// Add BETWEEN based on key column range
|
// Add BETWEEN based on key column range
|
||||||
@@ -227,8 +236,7 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable(
|
|||||||
|
|
||||||
return unionVec;
|
return unionVec;
|
||||||
}
|
}
|
||||||
|
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||||
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
|
||||||
{
|
{
|
||||||
auto tables = csep.tableList();
|
auto tables = csep.tableList();
|
||||||
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||||
@@ -242,16 +250,17 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
|||||||
{
|
{
|
||||||
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||||
// need to add a level here
|
// need to add a level here
|
||||||
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table;
|
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||||
|
std::to_string(ctx.uniqueId);
|
||||||
|
|
||||||
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||||
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||||
derivedSCEP->derivedTbAlias(tableAlias);
|
derivedSCEP->derivedTbAlias(tableAlias);
|
||||||
|
|
||||||
// TODO: hardcoded for now
|
// TODO: hardcoded for now
|
||||||
size_t parallelFactor = 2;
|
// size_t parallelFactor = 2;
|
||||||
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||||
auto additionalUnionVec = makeUnionFromTable(parallelFactor, csep);
|
auto additionalUnionVec = makeUnionFromTable(csep, ctx);
|
||||||
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||||
additionalUnionVec.end());
|
additionalUnionVec.end());
|
||||||
|
|
||||||
@@ -275,11 +284,95 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep)
|
|||||||
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||||
newTableList.push_back(tn);
|
newTableList.push_back(tn);
|
||||||
// Remove the filters as they were pushed down to union units
|
// Remove the filters as they were pushed down to union units
|
||||||
|
// This is inappropriate for EXISTS filter and join conditions
|
||||||
derivedSCEP->filters(nullptr);
|
derivedSCEP->filters(nullptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Remove the filters as they were pushed down to union units
|
// Remove the filters as they were pushed down to union units
|
||||||
csep.filters(nullptr);
|
// This is inappropriate for EXISTS filter and join conditions
|
||||||
|
// csep.filters(nullptr);
|
||||||
|
// There must be no derived at this point.
|
||||||
|
csep.derivedTableList(newDerivedTableList);
|
||||||
|
// Replace table list with new table list populated with union units
|
||||||
|
csep.tableList(newTableList);
|
||||||
|
csep.returnedCols(newReturnedColumns);
|
||||||
|
}
|
||||||
|
|
||||||
|
execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists(
|
||||||
|
const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep)
|
||||||
|
{
|
||||||
|
execplan::CalpontSelectExecutionPlan::SelectList unionVec;
|
||||||
|
unionVec.reserve(numberOfLegs);
|
||||||
|
std::vector<std::pair<uint64_t, uint64_t>> bounds(
|
||||||
|
{{0, 3000961}, {3000961, std::numeric_limits<uint64_t>::max()}});
|
||||||
|
for (auto bound : bounds)
|
||||||
|
{
|
||||||
|
auto clonedCSEP = csep.cloneWORecursiveSelects();
|
||||||
|
clonedCSEP->filters(nullptr);
|
||||||
|
// Add BETWEEN based on key column range
|
||||||
|
clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound));
|
||||||
|
unionVec.push_back(clonedCSEP);
|
||||||
|
}
|
||||||
|
|
||||||
|
return unionVec;
|
||||||
|
}
|
||||||
|
|
||||||
|
void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)
|
||||||
|
{
|
||||||
|
auto tables = csep.tableList();
|
||||||
|
execplan::CalpontSelectExecutionPlan::TableList newTableList;
|
||||||
|
execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList;
|
||||||
|
execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns;
|
||||||
|
|
||||||
|
// ATM Must be only 1 table
|
||||||
|
for (auto& table : tables)
|
||||||
|
{
|
||||||
|
if (!table.isColumnstore())
|
||||||
|
{
|
||||||
|
auto derivedSCEP = csep.cloneWORecursiveSelects();
|
||||||
|
// need to add a level here
|
||||||
|
std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" +
|
||||||
|
std::to_string(ctx.uniqueId);
|
||||||
|
|
||||||
|
derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM);
|
||||||
|
derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS);
|
||||||
|
derivedSCEP->derivedTbAlias(tableAlias);
|
||||||
|
|
||||||
|
// TODO: hardcoded for now
|
||||||
|
size_t parallelFactor = 2;
|
||||||
|
// Create a copy of the current leaf CSEP with additional filters to partition the key column
|
||||||
|
auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep);
|
||||||
|
derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),
|
||||||
|
additionalUnionVec.end());
|
||||||
|
|
||||||
|
size_t colPosition = 0;
|
||||||
|
// change parent to derived table columns
|
||||||
|
for (auto& rc : csep.returnedCols())
|
||||||
|
{
|
||||||
|
auto rcCloned = boost::make_shared<execplan::SimpleColumn>(*rc);
|
||||||
|
// TODO timezone and result type are not copied
|
||||||
|
// TODO add specific ctor for this functionality
|
||||||
|
rcCloned->tableName("");
|
||||||
|
rcCloned->schemaName("");
|
||||||
|
rcCloned->tableAlias(tableAlias);
|
||||||
|
rcCloned->colPosition(colPosition++);
|
||||||
|
rcCloned->resultType(rc->resultType());
|
||||||
|
|
||||||
|
newReturnedColumns.push_back(rcCloned);
|
||||||
|
}
|
||||||
|
|
||||||
|
newDerivedTableList.push_back(derivedSCEP);
|
||||||
|
execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, "");
|
||||||
|
newTableList.push_back(tn);
|
||||||
|
// Remove the filters as they were pushed down to union units
|
||||||
|
// This is inappropriate for EXISTS filter and join conditions
|
||||||
|
// TODO if needed
|
||||||
|
derivedSCEP->filters(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Remove the filters as they were pushed down to union units
|
||||||
|
// This is inappropriate for EXISTS filter and join conditions
|
||||||
|
// csep.filters(nullptr);
|
||||||
// There must be no derived at this point.
|
// There must be no derived at this point.
|
||||||
csep.derivedTableList(newDerivedTableList);
|
csep.derivedTableList(newDerivedTableList);
|
||||||
// Replace table list with new table list populated with union units
|
// Replace table list with new table list populated with union units
|
||||||
|
@@ -18,14 +18,31 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
#define PREFER_MY_CONFIG_H
|
||||||
|
#include <my_config.h>
|
||||||
|
#include "idb_mysql.h"
|
||||||
|
|
||||||
|
#include "ha_mcs_impl_if.h"
|
||||||
|
|
||||||
#include "execplan/calpontselectexecutionplan.h"
|
#include "execplan/calpontselectexecutionplan.h"
|
||||||
|
|
||||||
namespace optimizer {
|
namespace optimizer {
|
||||||
|
|
||||||
|
class RBOptimizerContext {
|
||||||
|
public:
|
||||||
|
RBOptimizerContext() = delete;
|
||||||
|
RBOptimizerContext(cal_impl_if::gp_walk_info& walk_info) : gwi(walk_info) {}
|
||||||
|
// gwi lifetime should be longer than optimizer context.
|
||||||
|
// In plugin runtime this is always true.
|
||||||
|
cal_impl_if::gp_walk_info& gwi;
|
||||||
|
uint64_t uniqueId {0};
|
||||||
|
};
|
||||||
|
|
||||||
struct Rule
|
struct Rule
|
||||||
{
|
{
|
||||||
using RuleMatcher = bool (*)(execplan::CalpontSelectExecutionPlan&);
|
using RuleMatcher = bool (*)(execplan::CalpontSelectExecutionPlan&);
|
||||||
using RuleApplier = void (*)(execplan::CalpontSelectExecutionPlan&);
|
using RuleApplier = void (*)(execplan::CalpontSelectExecutionPlan&, RBOptimizerContext&);
|
||||||
|
|
||||||
Rule(std::string&& name, RuleMatcher matchRule, RuleApplier applyRule)
|
Rule(std::string&& name, RuleMatcher matchRule, RuleApplier applyRule)
|
||||||
: name(name), matchRule(matchRule), applyRule(applyRule) {};
|
: name(name), matchRule(matchRule), applyRule(applyRule) {};
|
||||||
@@ -39,15 +56,21 @@ struct Rule
|
|||||||
Rule() = default;
|
Rule() = default;
|
||||||
Rule(const Rule&) = default;
|
Rule(const Rule&) = default;
|
||||||
Rule(Rule&&) = default;
|
Rule(Rule&&) = default;
|
||||||
|
|
||||||
|
std::string getName() const
|
||||||
|
{
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
Rule& operator=(const Rule&) = default;
|
Rule& operator=(const Rule&) = default;
|
||||||
Rule& operator=(Rule&&) = default;
|
Rule& operator=(Rule&&) = default;
|
||||||
|
|
||||||
bool apply(execplan::CalpontSelectExecutionPlan& csep) const;
|
bool apply(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) const;
|
||||||
bool walk(execplan::CalpontSelectExecutionPlan& csep) const;
|
bool walk(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep);
|
bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep);
|
||||||
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep);
|
void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx);
|
||||||
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root);
|
bool optimizeCSEP(execplan::CalpontSelectExecutionPlan& root, RBOptimizerContext& ctx);
|
||||||
|
|
||||||
}
|
}
|
Reference in New Issue
Block a user