From c030ff4224e051447a20ba8dcfb0b0af7e9485cb Mon Sep 17 00:00:00 2001 From: drrtuy Date: Tue, 5 Aug 2025 14:33:39 +0000 Subject: [PATCH] feat(rbo,rules,QA): index column type is now derived from the corresponding Field --- dbcon/execplan/simplecolumn.cpp | 2 + dbcon/mysql/ha_mcs_execplan.cpp | 109 ++++++++++++++++++++++--- dbcon/mysql/ha_mcs_impl_if.h | 2 +- dbcon/mysql/rbo_apply_parallel_ces.cpp | 105 +++++++++++++++--------- 4 files changed, 168 insertions(+), 50 deletions(-) diff --git a/dbcon/execplan/simplecolumn.cpp b/dbcon/execplan/simplecolumn.cpp index c37e9fe14..d0c6458a9 100644 --- a/dbcon/execplan/simplecolumn.cpp +++ b/dbcon/execplan/simplecolumn.cpp @@ -207,6 +207,7 @@ SimpleColumn::SimpleColumn(const SimpleColumn& rhs, const uint32_t sessionID) , fTimeZone(rhs.timeZone()) , fisColumnStore(rhs.isColumnStore()) { + fResultType = rhs.resultType(); } SimpleColumn::SimpleColumn(const ReturnedColumn& rhs, const uint32_t sessionID) @@ -250,6 +251,7 @@ SimpleColumn& SimpleColumn::operator=(const SimpleColumn& rhs) fDistinct = rhs.distinct(); fisColumnStore = rhs.isColumnStore(); fPartitions = rhs.fPartitions; + fResultType = rhs.resultType(); } return *this; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index f508b89a8..66e365d1e 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -2651,6 +2651,39 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB(const Item* item) return ct; } +// Simplified version to support QA-specific RBO re-write rule. +// TBD turn into template to merge with colType_MysqlToIDB for Item +CalpontSystemCatalog::ColType colType_MysqlToIDB(const Field* field) +{ + CalpontSystemCatalog::ColType ct; + ct.precision = 4; + + switch (field->result_type()) + { + case INT_RESULT: + if (field->is_unsigned()) + { + ct.colDataType = CalpontSystemCatalog::UBIGINT; + } + else + { + ct.colDataType = CalpontSystemCatalog::BIGINT; + } + + ct.colWidth = 8; + break; + + case STRING_RESULT: + ct.colDataType = CalpontSystemCatalog::VARCHAR; + + default: + IDEBUG(cerr << "colType_MysqlToIDB:: Unknown result type of MySQL " << item->result_type() << endl); + break; + } + ct.charsetNumber = field->charset()->number; + return ct; +} + bool itemDisablesWrapping(Item* item, gp_walk_info& gwi) { if (gwi.select_lex == nullptr) @@ -4184,6 +4217,62 @@ SimpleColumn* buildSimpleColumn(Item_field* ifp, gp_walk_info& gwi) return sc; } +SimpleColumn* buildSimpleColumnFromFieldForStatistics(Field* field, gp_walk_info& gwi) +{ + if (!gwi.csc) + { + gwi.csc = CalpontSystemCatalog::makeCalpontSystemCatalog(gwi.sessionid); + gwi.csc->identity(CalpontSystemCatalog::FE); + } + + CalpontSystemCatalog::ColType ct; + datatypes::SimpleColumnParam prm(gwi.sessionid, true); + + try + { + // check foreign engine + if (field->table) + prm.columnStore(ha_mcs_common::isMCSTable(field->table)); + + if (prm.columnStore()) + { + ct = gwi.csc->colType(gwi.csc->lookupOID( + make_tcn(field->table->s->db.str, field->table->s->table_name.str, field->field_name.str))); + } + else + { + ct = colType_MysqlToIDB(field); + } + } + catch (std::exception& ex) + { + gwi.fatalParseError = true; + gwi.parseErrorText = ex.what(); + return NULL; + } + + const datatypes::DatabaseQualifiedColumnName name(field->table->s->db.str, field->table->s->table_name.str, + field->field_name.str); + const datatypes::TypeHandler* h = ct.typeHandler(); + SimpleColumn* sc = h->newSimpleColumn(name, ct, prm); + + sc->resultType(ct); + sc->charsetNumber(field->charset()->number); + string tbname(field->table->s->table_name.str); + + // Note: differs with the original buildSimpleColumn + sc->tableAlias(field->table->alias.c_ptr(), lower_case_table_names); + + sc->alias(field->field_name.str); + sc->isColumnStore(prm.columnStore()); + sc->timeZone(gwi.timeZone); + + sc->oid(field->field_index + 1); // ExeMgr requires offset started from 1 + // TODO add partitions support here + + return sc; +} + ParseTree* buildParseTree(Item* item, gp_walk_info& gwi, bool& /*nonSupport*/) { ParseTree* pt = 0; @@ -5215,7 +5304,6 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& { for (uint j = 0; j < table_ptr->table->s->keys; j++) { - // for (uint i = 0; i < table_ptr->table->s->key_info[j].usable_key_parts; i++) { Field* field = table_ptr->table->key_info[j].key_part[0].field; std::cout << "j index " << j << " i column " << 0 << " fieldnr " @@ -5225,28 +5313,29 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& auto* histogram = dynamic_cast(field->read_stats->histogram); if (histogram) { - std::cout << " has stats "; - SchemaAndTableName tableName = {field->table->s->db.str, - field->table->s->table_name.str}; - execplan::SimpleColumn simpleColumn = {field->table->s->db.str, - field->table->s->table_name.str, - field->field_name.str}; + std::cout << " has stats with " << histogram->buckets.size() << " buckets"; + SchemaAndTableName tableName = {field->table->s->db.str, field->table->s->table_name.str}; + auto* sc = buildSimpleColumnFromFieldForStatistics(field, gwi); + std::cout << "sc with stats !!!!! " << sc->toString() << std::endl; + // execplan::SimpleColumn simpleColumn = { + // field->table->s->db.str, field->table->s->table_name.str, field->field_name.str, false}; + auto tableStatisticsMapIt = gwi.tableStatisticsMap.find(tableName); if (tableStatisticsMapIt == gwi.tableStatisticsMap.end()) { - gwi.tableStatisticsMap[tableName][field->field_name.str] = {simpleColumn, {*histogram}}; + gwi.tableStatisticsMap[tableName][field->field_name.str] = {*sc, {histogram}}; } else { auto columnStatisticsMapIt = tableStatisticsMapIt->second.find(field->field_name.str); if (columnStatisticsMapIt == tableStatisticsMapIt->second.end()) { - tableStatisticsMapIt->second[field->field_name.str] = {simpleColumn, {*histogram}}; + tableStatisticsMapIt->second[field->field_name.str] = {*sc, {histogram}}; } else { auto columnStatisticsVec = columnStatisticsMapIt->second.second; - columnStatisticsVec.push_back(*histogram); + columnStatisticsVec.push_back(histogram); } } } diff --git a/dbcon/mysql/ha_mcs_impl_if.h b/dbcon/mysql/ha_mcs_impl_if.h index 0548e021a..13d901ce4 100644 --- a/dbcon/mysql/ha_mcs_impl_if.h +++ b/dbcon/mysql/ha_mcs_impl_if.h @@ -116,7 +116,7 @@ typedef std::map> TableOnExprList; typedef std::tr1::unordered_map TableOuterJoinMap; using ColumnName = std::string; -using ColumnStatisticsMap = std::unordered_map>>; +using ColumnStatisticsMap = std::unordered_map>>; using TableStatisticsMap = std::unordered_map; // This structure is used to store MDB AST -> CSEP translation context. diff --git a/dbcon/mysql/rbo_apply_parallel_ces.cpp b/dbcon/mysql/rbo_apply_parallel_ces.cpp index 02454e401..f44931b20 100644 --- a/dbcon/mysql/rbo_apply_parallel_ces.cpp +++ b/dbcon/mysql/rbo_apply_parallel_ces.cpp @@ -64,7 +64,8 @@ bool someAreForeignTables(execplan::CalpontSelectExecutionPlan& csep) [](const auto& table) { return !table.isColumnstore(); }); } -bool someForeignTablesHasStatisticsAndMbIndex(execplan::CalpontSelectExecutionPlan& csep, optimizer::RBOptimizerContext& ctx) +bool someForeignTablesHasStatisticsAndMbIndex(execplan::CalpontSelectExecutionPlan& csep, + optimizer::RBOptimizerContext& ctx) { return std::any_of( csep.tableList().begin(), csep.tableList().end(), @@ -100,12 +101,13 @@ execplan::ParseTree* filtersWithNewRange(execplan::SCSEP& csep, execplan::Simple // There is a question with ownership of the const column // TODO here we lost upper bound value if predicate is not changed to weak lt execplan::SOP ltOp = (isLast) ? boost::make_shared(execplan::PredicateOperator("<=")) - : boost::make_shared(execplan::PredicateOperator("<")); + : boost::make_shared(execplan::PredicateOperator("<")); ltOp->setOpType(filterColLeftOp->resultType(), tableKeyColumnLeftOp->resultType()); ltOp->resultType(ltOp->operationType()); auto* sfr = new execplan::SimpleFilter(ltOp, tableKeyColumnLeftOp, filterColLeftOp); - // TODO new + // TODO new + // TODO remove new and re-use tableKeyColumnLeftOp auto tableKeyColumnRightOp = new execplan::SimpleColumn(column); tableKeyColumnRightOp->resultType(column.resultType()); // TODO hardcoded column type and value @@ -173,63 +175,74 @@ execplan::SimpleColumn* findSuitableKeyColumn(execplan::CalpontSelectExecutionPl } // TBD -Histogram_json_hb& chooseStatisticsToUse(std::vector& columnStatisticsVec) +Histogram_json_hb* chooseStatisticsToUse(std::vector& columnStatisticsVec) { return columnStatisticsVec.front(); } +// Looking for a projected column that comes first in an available index and has EI statistics +// INV nullptr signifies that no suitable column was found +std::optional> chooseKeyColumnAndStatistics( + execplan::CalpontSystemCatalog::TableAliasName& targetTable, optimizer::RBOptimizerContext& ctx) +{ + cal_impl_if::SchemaAndTableName schemaAndTableName = {targetTable.schema, targetTable.table}; + + auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); + if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end() || + tableColumnsStatisticsIt->second.empty()) + { + return std::nullopt; + } + + // TODO take some column and some stats for it!!! + for (auto& [columnName, scAndStatisticsVec] : tableColumnsStatisticsIt->second) + { + auto& [sc, columnStatisticsVec] = scAndStatisticsVec; + auto* columnStatistics = chooseStatisticsToUse(columnStatisticsVec); + return {{sc, columnStatistics}}; + } + + return std::nullopt; +} + // Populates range bounds based on column statistics // Returns optional with bounds if successful, nullopt otherwise template -std::optional> populateRangeBounds(execplan::SimpleColumn* keyColumn, - optimizer::RBOptimizerContext& ctx) +std::optional> populateRangeBounds(Histogram_json_hb* columnStatistics) { - cal_impl_if::SchemaAndTableName schemaAndTableName = {keyColumn->schemaName(), keyColumn->tableName()}; - auto tableColumnsStatisticsIt = ctx.gwi.tableStatisticsMap.find(schemaAndTableName); - if (tableColumnsStatisticsIt == ctx.gwi.tableStatisticsMap.end()) - { - return std::nullopt; - } - - auto columnStatisticsIt = tableColumnsStatisticsIt->second.find(keyColumn->columnName()); - if (columnStatisticsIt == tableColumnsStatisticsIt->second.end()) - { - return std::nullopt; - } - - auto& [simpleColumn, columnStatisticsVec] = columnStatisticsIt->second; - auto& columnStatistics = chooseStatisticsToUse(columnStatisticsVec); + FilterRangeBounds bounds; // TODO configurable parallel factor via session variable // NB now histogram size is the way to control parallel factor with 16 being the maximum - size_t numberOfUnionUnits = std::min(columnStatistics.get_json_histogram().size(), MaxParallelFactor); - size_t numberOfBucketsPerUnionUnit = columnStatistics.get_json_histogram().size() / numberOfUnionUnits; + std::cout << "populateRangeBounds() columnStatistics->buckets.size() " << columnStatistics->get_json_histogram().size() + << std::endl; + size_t numberOfUnionUnits = std::min(columnStatistics->get_json_histogram().size(), MaxParallelFactor); + size_t numberOfBucketsPerUnionUnit = columnStatistics->get_json_histogram().size() / numberOfUnionUnits; std::cout << "Number of union units: " << numberOfUnionUnits << std::endl; std::cout << "Number of buckets per union unit: " << numberOfBucketsPerUnionUnit << std::endl; - FilterRangeBounds bounds; - // Loop over buckets to produce filter ranges // NB Currently Histogram_json_hb has the last bucket that has end as its start for (size_t i = 0; i < numberOfUnionUnits - 1; ++i) { - auto bucket = columnStatistics.get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; - auto endBucket = columnStatistics.get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; + auto bucket = columnStatistics->get_json_histogram().begin() + i * numberOfBucketsPerUnionUnit; + auto endBucket = columnStatistics->get_json_histogram().begin() + (i + 1) * numberOfBucketsPerUnionUnit; T currentLowerBound = *(uint32_t*)bucket->start_value.data(); T currentUpperBound = *(uint32_t*)endBucket->start_value.data(); bounds.push_back({currentLowerBound, currentUpperBound}); } - for (auto& bucket : columnStatistics.get_json_histogram()) + for (auto& bucket : columnStatistics->get_json_histogram()) { T currentLowerBound = *(uint32_t*)bucket.start_value.data(); std::cout << "Bucket: " << currentLowerBound << std::endl; } - // auto penultimateBucket = columnStatistics.get_json_histogram().begin() + numberOfUnionUnits * numberOfBucketsPerUnionUnit; - // T currentLowerBound = *(uint32_t*)penultimateBucket->start_value.data(); - // T currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); + // auto penultimateBucket = columnStatistics.get_json_histogram().begin() + numberOfUnionUnits * + // numberOfBucketsPerUnionUnit; T currentLowerBound = *(uint32_t*)penultimateBucket->start_value.data(); T + // currentUpperBound = *(uint32_t*)columnStatistics.get_last_bucket_end_endp().data(); // bounds.push_back({currentLowerBound, currentUpperBound}); + for (auto& bound : bounds) { std::cout << "Bound: " << bound.first << " " << bound.second << std::endl; @@ -247,14 +260,19 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( // SC type controls an integral type used to produce suitable filters. The continuation of this function // should become a template function based on SC type. - execplan::SimpleColumn* keyColumn = findSuitableKeyColumn(csep, table, ctx); - if (!keyColumn) + auto keyColumnAndStatistics = chooseKeyColumnAndStatistics(table, ctx); + if (!keyColumnAndStatistics) { return unionVec; } + auto& [keyColumn, columnStatistics] = keyColumnAndStatistics.value(); + + std::cout << "makeUnionFromTable keyColumn " << keyColumn.toString() << std::endl; + std::cout << "makeUnionFromTable RC front " << csep.returnedCols().front()->toString() << std::endl; + // TODO char and other numerical types support - auto boundsOpt = populateRangeBounds(keyColumn, ctx); + auto boundsOpt = populateRangeBounds(columnStatistics); if (!boundsOpt.has_value()) { return unionVec; @@ -269,15 +287,24 @@ execplan::CalpontSelectExecutionPlan::SelectList makeUnionFromTable( { auto clonedCSEP = csep.cloneForTableWORecursiveSelects(table); // Add BETWEEN based on key column range - clonedCSEP->filters(filtersWithNewRange(clonedCSEP, *keyColumn, bounds[i], false)); + auto filter = filtersWithNewRange(clonedCSEP, keyColumn, bounds[i], false); + clonedCSEP->filters(filter); + // To create CES filter we need to have a column in the column map + clonedCSEP->columnMap().insert({keyColumn.columnName(), execplan::SRCP(keyColumn.clone())}); unionVec.push_back(clonedCSEP); } } // This last bound produces low <= col <= high - auto clonedCSEP = csep.cloneForTableWORecursiveSelects(table); - clonedCSEP->filters(filtersWithNewRange(clonedCSEP, *keyColumn, bounds.back(), true)); - unionVec.push_back(clonedCSEP); - + // TODO add NULLs into filter of the last step + if (!bounds.empty()) + { + auto clonedCSEP = csep.cloneForTableWORecursiveSelects(table); + auto filter = filtersWithNewRange(clonedCSEP, keyColumn, bounds.back(), true); + clonedCSEP->columnMap().insert({keyColumn.columnName(), execplan::SRCP(keyColumn.clone())}); + clonedCSEP->filters(filter); + unionVec.push_back(clonedCSEP); + } + return unionVec; } bool applyParallelCES(execplan::CalpontSelectExecutionPlan& csep, RBOptimizerContext& ctx)