diff --git a/dbcon/mysql/rbo_apply_parallel_ces.cpp b/dbcon/mysql/rbo_apply_parallel_ces.cpp index 7529308b9..e9465dcfd 100644 --- a/dbcon/mysql/rbo_apply_parallel_ces.cpp +++ b/dbcon/mysql/rbo_apply_parallel_ces.cpp @@ -65,16 +65,16 @@ bool matchParallelCES(execplan::CalpontSelectExecutionPlan& csep) // This routine produces a new ParseTree that is AND(lowerBand <= column, column <= upperBand) // TODO add engine-independent statistics-derived ranges -execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, +execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, execplan::SimpleColumn& column, std::pair& bound) { // INV this is SimpleColumn we supply as an argument // TODO find the suitable column using EI statistics. - auto* column = dynamic_cast(csep->returnedCols().front().get()); - assert(column); + // auto* column = dynamic_cast(csep->returnedCols().front().get()); + // assert(column); - auto tableKeyColumnLeftOp = new execplan::SimpleColumn(*column); - tableKeyColumnLeftOp->resultType(column->resultType()); + auto tableKeyColumnLeftOp = new execplan::SimpleColumn(column); + tableKeyColumnLeftOp->resultType(column.resultType()); // TODO Nobody owns this allocation and cleanup only depends on delete in ParseTree nodes' dtors. auto* filterColLeftOp = new execplan::ConstantColumnUInt(bound.second, 0, 0); @@ -87,8 +87,8 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, auto* sfr = new execplan::SimpleFilter(ltOp, tableKeyColumnLeftOp, filterColLeftOp); // auto tableKeyColumn = derivedSCEP->returnedCols().front(); - auto tableKeyColumnRightOp = new execplan::SimpleColumn(*column); - tableKeyColumnRightOp->resultType(column->resultType()); + auto tableKeyColumnRightOp = new execplan::SimpleColumn(column); + tableKeyColumnRightOp->resultType(column.resultType()); // TODO hardcoded column type and value auto* filterColRightOp = new execplan::ConstantColumnUInt(bound.first, 0, 0); @@ -117,16 +117,24 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, // INV nullptr signifies that no suitable column was found execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) { + std::cout << "findSuitableKeyColumn " << csep.returnedCols().size() << std::endl; for (auto& rc : csep.returnedCols()) { auto* simpleColumn = dynamic_cast(rc.get()); if (simpleColumn) { std::cout << "Found simple column " << simpleColumn->columnName() << std::endl; - cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->tableName(), simpleColumn->columnName()}; + cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->schemaName(), simpleColumn->tableName()}; auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam); - - return simpleColumn; + if (!columnStatistics) + { + continue; + } + auto columnStatisticsIt = columnStatistics->find(simpleColumn->columnName()); + if (columnStatisticsIt != columnStatistics->end()) + { + return simpleColumn; + } } } @@ -167,9 +175,8 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( } auto columnStatistics = columnStatisticsIt->second; - std::cout << "Histogram_json_hb histogram size " << columnStatistics.get_json_histogram().size() << std::endl; - // TODO configurable parallel factor + // TODO configurable parallel factor via session variable // NB now histogram size is the way to control parallel factor with 16 being the maximum size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; @@ -188,23 +195,26 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( std::cout << "currentLowerBound " << currentLowerBound << " currentUpperBound " << currentUpperBound << std::endl; - bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound)); + bounds.push_back({currentLowerBound, currentUpperBound}); } // Add last range + // NB despite the fact that currently Histogram_json_hb has the last bucket that has end as its start auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); + std::cout << "lastBucket start_value " << currentLowerBound << std::endl; uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); + std::cout << "Histogram end_value " << currentUpperBound << std::endl; std::cout << "last currentLowerBound " << currentLowerBound << " last currentUpperBound " << currentUpperBound << std::endl; - bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound)); + bounds.push_back({currentLowerBound, currentUpperBound}); for (auto& bound : bounds) { auto clonedCSEP = csep.cloneWORecursiveSelects(); // Add BETWEEN based on key column range - clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound)); + clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, *keyColumn, bound)); unionVec.push_back(clonedCSEP); } @@ -270,87 +280,87 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon csep.returnedCols(newReturnedColumns); } -execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists( - const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep) -{ - execplan::CalpontSelectExecutionPlan::SelectList unionVec; - unionVec.reserve(numberOfLegs); - std::vector> bounds( - {{0, 3000961}, {3000961, std::numeric_limits::max()}}); - for (auto bound : bounds) - { - auto clonedCSEP = csep.cloneWORecursiveSelects(); - clonedCSEP->filters(nullptr); - // Add BETWEEN based on key column range - clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound)); - unionVec.push_back(clonedCSEP); - } +// execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable_exists( +// const size_t numberOfLegs, execplan::CalpontSelectExecutionPlan& csep) +// { +// execplan::CalpontSelectExecutionPlan::SelectList unionVec; +// unionVec.reserve(numberOfLegs); +// std::vector> bounds( +// {{0, 3000961}, {3000961, std::numeric_limits::max()}}); +// for (auto bound : bounds) +// { +// auto clonedCSEP = csep.cloneWORecursiveSelects(); +// clonedCSEP->filters(nullptr); +// // Add BETWEEN based on key column range +// clonedCSEP->filters(filtersWithNewRangeAddedIfNeeded(clonedCSEP, bound)); +// unionVec.push_back(clonedCSEP); +// } - return unionVec; -} +// return unionVec; +// } -// TODO: remove applyParallelCES_exists -void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) -{ - auto tables = csep.tableList(); - execplan::CalpontSelectExecutionPlan::TableList newTableList; - execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList; - execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; +// // TODO: remove applyParallelCES_exists +// void applyParallelCES_exists(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx) +// { +// auto tables = csep.tableList(); +// execplan::CalpontSelectExecutionPlan::TableList newTableList; +// execplan::CalpontSelectExecutionPlan::SelectList newDerivedTableList; +// execplan::CalpontSelectExecutionPlan::ReturnedColumnList newReturnedColumns; - // ATM Must be only 1 table - for (auto& table : tables) - { - if (!table.isColumnstore()) - { - auto derivedSCEP = csep.cloneWORecursiveSelects(); - // need to add a level here - std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" + - std::to_string(ctx.uniqueId); +// // ATM Must be only 1 table +// for (auto& table : tables) +// { +// if (!table.isColumnstore()) +// { +// auto derivedSCEP = csep.cloneWORecursiveSelects(); +// // need to add a level here +// std::string tableAlias = RewrittenSubTableAliasPrefix + table.schema + "_" + table.table + "_" + +// std::to_string(ctx.uniqueId); - derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM); - derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); - derivedSCEP->derivedTbAlias(tableAlias); +// derivedSCEP->location(execplan::CalpontSelectExecutionPlan::FROM); +// derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); +// derivedSCEP->derivedTbAlias(tableAlias); - // TODO: hardcoded for now - size_t parallelFactor = 2; - // Create a copy of the current leaf CSEP with additional filters to partition the key column - auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep); - derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(), - additionalUnionVec.end()); +// // TODO: hardcoded for now +// size_t parallelFactor = 2; +// // Create a copy of the current leaf CSEP with additional filters to partition the key column +// auto additionalUnionVec = makeUnionFromTable_exists(parallelFactor, csep); +// derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(), +// additionalUnionVec.end()); - size_t colPosition = 0; - // change parent to derived table columns - for (auto& rc : csep.returnedCols()) - { - auto rcCloned = boost::make_shared(*rc); - // TODO timezone and result type are not copied - // TODO add specific ctor for this functionality - rcCloned->tableName(""); - rcCloned->schemaName(""); - rcCloned->tableAlias(tableAlias); - rcCloned->colPosition(colPosition++); - rcCloned->resultType(rc->resultType()); +// size_t colPosition = 0; +// // change parent to derived table columns +// for (auto& rc : csep.returnedCols()) +// { +// auto rcCloned = boost::make_shared(*rc); +// // TODO timezone and result type are not copied +// // TODO add specific ctor for this functionality +// rcCloned->tableName(""); +// rcCloned->schemaName(""); +// rcCloned->tableAlias(tableAlias); +// rcCloned->colPosition(colPosition++); +// rcCloned->resultType(rc->resultType()); - newReturnedColumns.push_back(rcCloned); - } +// newReturnedColumns.push_back(rcCloned); +// } - newDerivedTableList.push_back(derivedSCEP); - execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, ""); - newTableList.push_back(tn); - // Remove the filters as they were pushed down to union units - // This is inappropriate for EXISTS filter and join conditions - // TODO if needed - derivedSCEP->filters(nullptr); - } - } - // Remove the filters as they were pushed down to union units - // This is inappropriate for EXISTS filter and join conditions - // csep.filters(nullptr); - // There must be no derived at this point. - csep.derivedTableList(newDerivedTableList); - // Replace table list with new table list populated with union units - csep.tableList(newTableList); - csep.returnedCols(newReturnedColumns); -} +// newDerivedTableList.push_back(derivedSCEP); +// execplan::CalpontSystemCatalog::TableAliasName tn = execplan::make_aliasview("", "", tableAlias, ""); +// newTableList.push_back(tn); +// // Remove the filters as they were pushed down to union units +// // This is inappropriate for EXISTS filter and join conditions +// // TODO if needed +// derivedSCEP->filters(nullptr); +// } +// } +// // Remove the filters as they were pushed down to union units +// // This is inappropriate for EXISTS filter and join conditions +// // csep.filters(nullptr); +// // There must be no derived at this point. +// csep.derivedTableList(newDerivedTableList); +// // Replace table list with new table list populated with union units +// csep.tableList(newTableList); +// csep.returnedCols(newReturnedColumns); +// } } // namespace optimizer