diff --git a/dbcon/mysql/ha_from_sub.cpp b/dbcon/mysql/ha_from_sub.cpp index 66e4d8648..fd35de219 100644 --- a/dbcon/mysql/ha_from_sub.cpp +++ b/dbcon/mysql/ha_from_sub.cpp @@ -445,7 +445,7 @@ SCSEP FromSubQuery::transform() } // Insert column statistics - fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap); + fGwip.mergeTableStatistics(gwi.tableStatisticsMap); fGwip.subselectList.push_back(csep); return csep; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 642606901..ab4eefe46 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -6314,16 +6314,20 @@ void extractColumnStatistics(Item_field* ifp, gp_walk_info& gwi) assert(ifp->field->table->s); // assert(ifp->field->table->s->db); // assert(ifp->field->table->s->table_name); - // FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, {ifp->field->field_name.str}); - //TODO use FQCN as a key type + // FQCN fqcn({ifp->field->table->s->db.str}, {ifp->field->table->s->table_name.str}, + // {ifp->field->field_name.str}); + // TODO use FQCN as a key type std::cout << "Adding column statistics for " << ifp->field->field_name.str << std::endl; auto* histogram = dynamic_cast(ifp->field->read_stats->histogram); if (histogram) { std::cout << "Type of histogram object: " << typeid(*histogram).name() << std::endl; - std::vector histogramBuckets = histogram->get_json_histogram(); - std::cout << "gwi.columnStatisticsMap[ifp->field->field_name.str].size() " << histogramBuckets.size() << std::endl; - gwi.columnStatisticsMap[ifp->field->field_name.str] = histogramBuckets; + // std::vector histogramBuckets = histogram->get_json_histogram(); + // std::cout << "gwi.tableStatisticsMap[{ifp->field->table->s->db.str, " + // "ifp->field->table->s->table_name.str}][ifp->field->field_name.str].size() " + // << histogramBuckets.size() << std::endl; + SchemaAndTableName tableName = {ifp->field->table->s->db.str, ifp->field->table->s->table_name.str}; + gwi.tableStatisticsMap[tableName][ifp->field->field_name.str] = *histogram; } } } @@ -6421,7 +6425,7 @@ int processSelect(SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep, vector { Item_field* ifp = (Item_field*)item; extractColumnStatistics(ifp, gwi); - std::cout << "gwi.columnStatisticsMap 1 size " << gwi.columnStatisticsMap.size() << std::endl; + // Handle * case if (ifp->field_name.length && string(ifp->field_name.str) == "*") { collectAllCols(gwi, ifp); @@ -7473,7 +7477,6 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit); - std::cout << "cs_get_select_plan columnStatisticsMap size " << gwi.columnStatisticsMap.size() << std::endl; if (status > 0) return ER_INTERNAL_ERROR; else if (status < 0) diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index d1fae3006..c385be180 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -136,10 +136,36 @@ namespace cal_impl_if { extern bool nonConstFunc(Item_func* ifp); -void gp_walk_info::mergeColumnStatisticsMap(const ColumnStatisticsMap& aColumnStatisticsMap) +void gp_walk_info::mergeTableStatistics(const TableStatisticsMap& aTableStatisticsMap) { - columnStatisticsMap.insert(aColumnStatisticsMap.begin(), aColumnStatisticsMap.end()); + for (auto& [schemaAndTableName, aColumnStatisticsMap]: aTableStatisticsMap) + { + auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName); + if (tableStatisticsMapIt == tableStatisticsMap.end()) + { + tableStatisticsMap[schemaAndTableName] = aColumnStatisticsMap; + } + else + { + for (auto& [columnName, histogram]: aColumnStatisticsMap) + { + tableStatisticsMapIt->second[columnName] = histogram; + } + } + } } + +std::optional gp_walk_info::findStatisticsForATable(SchemaAndTableName& schemaAndTableName) +{ + auto tableStatisticsMapIt = tableStatisticsMap.find(schemaAndTableName); + if (tableStatisticsMapIt == tableStatisticsMap.end()) + { + return std::nullopt; + } + + return {tableStatisticsMapIt->second}; +} + } namespace diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 95335b7eb..df53800c9 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -96,13 +96,29 @@ enum ClauseType ORDER_BY }; +struct SchemaAndTableName { + std::string schema; + std::string table; + bool operator==(const SchemaAndTableName& other) const { + return schema == other.schema && table == other.table; + } +}; + +struct SchemaAndTableNameHash { + std::size_t operator()(const SchemaAndTableName& k) const { + return std::hash()(k.schema + k.table); + } +}; + typedef std::vector JoinInfoVec; typedef dmlpackage::ColValuesList ColValuesList; typedef dmlpackage::TableValuesMap TableValuesMap; typedef std::map> TableMap; typedef std::tr1::unordered_map> TableOnExprList; typedef std::tr1::unordered_map TableOuterJoinMap; -using ColumnStatisticsMap = std::unordered_map>; +using ColumnName = std::string; +using ColumnStatisticsMap = std::unordered_map; +using TableStatisticsMap = std::unordered_map; struct gp_walk_info { @@ -112,7 +128,7 @@ struct gp_walk_info execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols; std::vector extSelAggColsItems; execplan::CalpontSelectExecutionPlan::ColumnMap columnMap; - std::unordered_map> columnStatisticsMap; + TableStatisticsMap tableStatisticsMap; // This vector temporarily hold the projection columns to be added // to the returnedCols vector for subquery processing. It will be appended // to the end of returnedCols when the processing is finished. @@ -203,7 +219,8 @@ struct gp_walk_info SubQuery** subQueriesChain; gp_walk_info(long timeZone_, SubQuery** subQueriesChain_) - : sessionid(0) + : tableStatisticsMap({}) + , sessionid(0) , fatalParseError(false) , condPush(false) , dropCond(false) @@ -234,7 +251,8 @@ struct gp_walk_info } ~gp_walk_info(); - void mergeColumnStatisticsMap(const std::unordered_map>& columnStatisticsMap); + void mergeTableStatistics(const TableStatisticsMap& tableStatisticsMap); + std::optional findStatisticsForATable(SchemaAndTableName& schemaAndTableName); }; struct SubQueryChainHolder; diff --git a/dbcon/mysql/ha_select_sub.cpp b/dbcon/mysql/ha_select_sub.cpp index b2f39aa27..caddce6af 100644 --- a/dbcon/mysql/ha_select_sub.cpp +++ b/dbcon/mysql/ha_select_sub.cpp @@ -97,7 +97,7 @@ SCSEP SelectSubQuery::transform() } // Insert column statistics - fGwip.mergeColumnStatisticsMap(gwi.columnStatisticsMap); + fGwip.mergeTableStatistics(gwi.tableStatisticsMap); // std::cout << "fGwip.columnStatisticsMap 2 size " << fGwip.columnStatisticsMap.size() << std::endl; // std::cout << "gwi.columnStatisticsMap 2 size " << gwi.columnStatisticsMap.size() << std::endl; diff --git a/dbcon/mysql/rulebased_optimizer.cpp b/dbcon/mysql/rulebased_optimizer.cpp index 53f280e68..f14635bbc 100644 --- a/dbcon/mysql/rulebased_optimizer.cpp +++ b/dbcon/mysql/rulebased_optimizer.cpp @@ -191,9 +191,24 @@ execplan::ParseTree* filtersWithNewRangeAddedIfNeeded(execplan::SCSEP& csep, return ptp; } -execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep) +// Looking for a projected column that comes first in an available index and has EI statistics +// INV nullptr signifies that no suitable column was found +execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) { - return dynamic_cast(csep.returnedCols().front().get()); + for (auto& rc : csep.returnedCols()) + { + auto* simpleColumn = dynamic_cast(rc.get()); + if (simpleColumn) + { + std::cout << "Found simple column " << simpleColumn->columnName() << std::endl; + cal_impl_if::SchemaAndTableName schemaAndTableNam = {simpleColumn->tableName(), simpleColumn->columnName()}; + auto columnStatistics = ctx.gwi.findStatisticsForATable(schemaAndTableNam); + + return simpleColumn; + } + } + + return nullptr; } execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( @@ -201,49 +216,65 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( { execplan::CalpontSelectExecutionPlan::SelectList unionVec; // unionVec.reserve(numberOfLegs); - execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep); - std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.columnStatisticsMap " - << " with size " << ctx.gwi.columnStatisticsMap.size() << std::endl; - for (auto& [k, v] : ctx.gwi.columnStatisticsMap) - { - std::cout << "key " << k << " vector size " << v.size() << std::endl; - } - if (!keyColumn || - ctx.gwi.columnStatisticsMap.find(keyColumn->columnName()) == ctx.gwi.columnStatisticsMap.end()) + execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, ctx); + if (!keyColumn) { return unionVec; } - auto columnStatistics = ctx.gwi.columnStatisticsMap[keyColumn->columnName()]; - std::cout << "columnStatistics.size() " << columnStatistics.size() << std::endl; + std::cout << "looking for " << keyColumn->columnName() << " in ctx.gwi.tableStatisticsMap " + << " with size " << ctx.gwi.tableStatisticsMap.size() << std::endl; + for (auto& [k, v] : ctx.gwi.tableStatisticsMap) + { + std::cout << "SchemaAndTableName " << k.schema << "." << k.table << " column map size " << v.size() << std::endl; + } + + + + cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; + auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); + if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) + { + return unionVec; + } + + auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); + if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) + { + return unionVec; + } + + auto columnStatistics = columnStatisticsIt->second; + std::cout << "Histogram_json_hb histogram size " << columnStatistics.get_json_histogram().size() << std::endl; // TODO char and other numerical types support - size_t numberOfUnionUnits = 2; - size_t numberOfBucketsPerUnionUnit = columnStatistics.size() / numberOfUnionUnits; + size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), 16UL); + size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; std::vector> bounds; // TODO need to process tail if number of buckets is not divisible by number of union units // TODO non-overlapping buckets if it is a problem at all - for (size_t i = 0; i < numberOfUnionUnits; ++i) + for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) { - auto bucket = columnStatistics.begin() + i * numberOfBucketsPerUnionUnit; - auto endBucket = columnStatistics.begin() + (i + 1) * numberOfBucketsPerUnionUnit; - // TODO find a median b/w the current bucket start and the previous bucket end - uint64_t currentLowerBound = - (bounds.empty() ? *(uint32_t*)bucket->start_value.data() - : std::min((uint64_t)*(uint32_t*)bucket->start_value.data(), bounds.back().second)); - uint64_t currentUpperBound = currentLowerBound; - for (; bucket != endBucket; ++bucket) - { - uint64_t bucketLowerBound = *(uint32_t*)bucket->start_value.data(); - std::cout << "bucket.start_value " << bucketLowerBound << std::endl; - currentUpperBound = bucketLowerBound + bucket->ndv; - } + auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; + auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; + uint64_t currentLowerBound = *(uint32_t*)bucket->start_value.data(); + uint64_t currentUpperBound = *(uint32_t*)endBucket->start_value.data(); + std::cout << "currentLowerBound " << currentLowerBound << " currentUpperBound " << currentUpperBound << std::endl; bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound)); } + // Add last range + auto lastBucket = columnStatistics.get_json_histogram().begin() + (numberOfUnionUnits - 1) * numberOfBucketsPerUnionUnit; + uint64_t currentLowerBound = *(uint32_t*)lastBucket->start_value.data(); + uint64_t currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); + + std::cout << "last currentLowerBound " << currentLowerBound << " last currentUpperBound " << currentUpperBound + << std::endl; + bounds.push_back(std::make_pair(currentLowerBound, currentUpperBound)); + for (auto& bound : bounds) { auto clonedCSEP = csep.cloneWORecursiveSelects(); @@ -275,8 +306,6 @@ void applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerCon derivedSCEP->subType(execplan::CalpontSelectExecutionPlan::FROM_SUBS); derivedSCEP->derivedTbAlias(tableAlias); - // TODO: hardcoded for now - // size_t parallelFactor = 2; // Create a copy of the current leaf CSEP with additional filters to partition the key column auto additionalUnionVec = makeUnionFromTable(csep, ctx); derivedSCEP->unionVec().insert(derivedSCEP->unionVec().end(), additionalUnionVec.begin(),