diff --git a/dbcon/mysql/rbo_apply_parallel_ces.cpp b/dbcon/mysql/rbo_apply_parallel_ces.cpp index 86094a3ac..363ef263b 100644 --- a/dbcon/mysql/rbo_apply_parallel_ces.cpp +++ b/dbcon/mysql/rbo_apply_parallel_ces.cpp @@ -38,6 +38,13 @@ void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, const s static const std::string RewrittenSubTableAliasPrefix = "$added_sub_"; static const size_t MaxParallelFactor = 16; +bool tableAliasEqual(const execplan::CalpontSystemCatalog::TableAliasName& lhs, + const execplan::CalpontSystemCatalog::TableAliasName& rhs) +{ +return (lhs.schema == rhs.schema && lhs.table == rhs.table && lhs.alias == rhs.alias && +lhs.fisColumnStore == rhs.fisColumnStore); +} + bool tableIsInUnion(const execplan::CalpontSystemCatalog::TableAliasName& table, execplan::CalpontSelectExecutionPlan& csep) { @@ -116,16 +123,25 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, exe // Looking for a projected column that comes first in an available index and has EI statistics // INV nullptr signifies that no suitable column was found execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, + execplan::CalpontSystemCatalog::TableAliasName& targetTable, optimizer::RBOptimizerContext& ctx) { + // TODO supply a list of suitable columns from a higher level for (auto& rc : csep.returnedCols()) { // TODO extract SC from RC auto* simpleColumn = dynamic_cast(rc.get()); if (simpleColumn) { + execplan::CalpontSystemCatalog::TableAliasName rcTable( + simpleColumn->schemaName(), simpleColumn->tableName(), simpleColumn->tableAlias(), "", false); + if (!tableAliasEqual(targetTable, rcTable)) + { + continue; + } cal_impl_if::SchemaAndTableName schemaAndTableName = {simpleColumn->schemaName(), simpleColumn->tableName()}; + auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableName); if (!columnStatistics) { @@ -144,59 +160,61 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl // TODO char and other numerical types support execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( - execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) + execplan::CalpontSelectExecutionPlan& csep, execplan::CalpontSystemCatalog::TableAliasName& table, + optimizer::RBOptimizerContext& ctx) { execplan::CalpontSelectExecutionPlan::SelectList unionVec; // SC type controls an integral type used to produce suitable filters. The continuation of this function // should become a template function based on SC type. - execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, ctx); + execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, table, ctx); if (!keyColumn) { return unionVec; } - cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; - auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); - if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) - { - return unionVec; - } - - auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); - if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) - { - return unionVec; - } - - auto columnStatistics = columnStatisticsIt->second; - - // TODO configurable parallel factor via session variable - // NB now histogram size is the way to control parallel factor with 16 being the maximum - size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); - size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; - // TODO char and other numerical types support std::vector> bounds; - - // Loop over buckets to produce filter ranges - for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) { - auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; - auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; - uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data(); - uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data(); + cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; + auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); + if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) + { + return unionVec; + } + + auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); + if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) + { + return unionVec; + } + + auto columnStatistics = columnStatisticsIt->second; + + // TODO configurable parallel factor via session variable + // NB now histogram size is the way to control parallel factor with 16 being the maximum + size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); + size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; + + // Loop over buckets to produce filter ranges + for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) + { + auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; + auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; + uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data(); + uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data(); + bounds.push_back({currentLowerBound, currentUpperBound}); + } + + // Add last range + // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start + auto lastBucket = columnStatistics.get_json_histogram().begin() + + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; + uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); + uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); bounds.push_back({currentLowerBound, currentUpperBound}); } - // Add last range - // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start - auto lastBucket = - columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; - uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); - uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); - bounds.push_back({currentLowerBound, currentUpperBound}); - for (auto& bound : bounds) { auto clonedCSEP = csep.cloneWORecursiveSelects(); @@ -229,13 +247,13 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" + std::to_string(ctx.uniqueId); // TODO add original alias to support multiple same name tables - tableAliasMap.insert({table, tableAlias}); + tableAliasMap.insert({table, {tableAlias, 0}}); derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM); derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); derivedSCEP->derivedTbAlias(tableAlias); // Create a copy of the current leaf CSEP with additional filters to partition the key column - auto additionalUnionVec = makeUnionFromTable(csep, ctx); + auto additionalUnionVec = makeUnionFromTable(csep, table, ctx); derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(), additionalUnionVec.end()); @@ -250,7 +268,7 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon } execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; - [[maybe_unused]] size_t colPosition = 0; + // [[maybe_unused]] size_t colPosition = 0; // replace parent CSEP RCs with derived table RCs using ScheamAndTableName -> tableAlias map for (auto& rc : csep.returnedCols()) { @@ -268,17 +286,19 @@ bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon // auto sc = scs[0]; std::cout << "Processing RC schema " << sc->schemaName() << " table " << sc->tableName() << " alias " << sc->tableAlias() << std::endl; - auto newTableAlias = tableAliasMap.find( - {sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false}); - if (newTableAlias == tableAliasMap.end()) + auto newTableAliasAndColPositionCounter = + tableAliasMap.find({sc->schemaName(), sc->tableName(), sc->tableAlias(), "", false}); + if (newTableAliasAndColPositionCounter == tableAliasMap.end()) { std::cout << "The RC doesn't belong to any of the derived tables, so leave it intact" << std::endl; continue; } sc->tableName(""); sc->schemaName(""); - sc->tableAlias(newTableAlias->second); - sc->isColumnStore(true); + auto& [newTableAlias, colPosition] = newTableAliasAndColPositionCounter->second; + sc->tableAlias(newTableAlias); + // WIP Not needed according with CSEP output + // sc->isColumnStore(true); sc->colPosition(colPosition++); // rcCloned->colPosition(colPosition++); // rcCloned->resultType(rc->resultType()); diff --git a/dbcon/mysql/rbo_apply_parallel_ces.h b/dbcon/mysql/rbo_apply_parallel_ces.h index 644ca764a..cf08b84c0 100644 --- a/dbcon/mysql/rbo_apply_parallel_ces.h +++ b/dbcon/mysql/rbo_apply_parallel_ces.h @@ -24,39 +24,40 @@ #include "execplan/calpontselectexecutionplan.h" #include "rulebased_optimizer.h" -namespace optimizer { - struct LessThan +namespace optimizer +{ +struct TableAliasLessThan +{ + bool operator()(const execplan::CalpontSystemCatalog::TableAliasName& lhs, + const execplan::CalpontSystemCatalog::TableAliasName& rhs) const { - bool operator()(const execplan::CalpontSystemCatalog::TableAliasName& lhs, - const execplan::CalpontSystemCatalog::TableAliasName& rhs) const + if (lhs.schema < rhs.schema) { - if (lhs.schema < rhs.schema) + return true; + } + else if (lhs.schema == rhs.schema) + { + if (lhs.table < rhs.table) { return true; } - else if (lhs.schema == rhs.schema) + else if (lhs.table == rhs.table) { - if (lhs.table < rhs.table) + if (lhs.alias < rhs.alias) { return true; } - else if (lhs.table == rhs.table) - { - if (lhs.alias < rhs.alias) - { - return true; - } - } } - - return false; } - }; - using TableAliasMap = std::map; - bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep); - void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx); - bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep); - bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx); + return false; + } +}; + +using NewTableAliasAndColumnPosCounter = std::pair; +using TableAliasMap = std::map; + +bool parallelCESFilter(execplan::CalpontSelectExecutionPlan& csep); +bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx); } \ No newline at end of file