From a7d5783b1b51adc99c5acc99783d0e61a8ea7664 Mon Sep 17 00:00:00 2001 From: Serguey Zefirov Date: Fri, 24 Oct 2025 09:46:14 +0000 Subject: [PATCH] Keep progress --- dbcon/ddlpackage/ddl.y | 9 +- dbcon/mysql/ha_mcs_impl.cpp | 218 ++++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 1 deletion(-) diff --git a/dbcon/ddlpackage/ddl.y b/dbcon/ddlpackage/ddl.y index 46d31bad8..933f7959a 100644 --- a/dbcon/ddlpackage/ddl.y +++ b/dbcon/ddlpackage/ddl.y @@ -56,6 +56,13 @@ int ddllex(YYSTYPE* ddllval, void* yyscanner); void ddlerror(struct pass_to_bison* x, char const *s); char* copy_string(const char *str); +static const char reserved_prefix[] = "__mcs_reserved_"; +bool has_reserved_prefix(const char* name) { + int i; + for(i=0;name[i] && reserved_prefix[i] && name[i] == reserved_prefix[i]; i++) { } + return !reserved_prefix[i]; // EOL is zero. +} /* has_reserved_prefix */ + void fix_column_length_and_charset(SchemaObject* elem, const CHARSET_INFO* def_cs, myf utf8_flag) { auto* column = dynamic_cast(elem); @@ -738,7 +745,7 @@ ata_add_column: column_name: TIME |DATE - |ident + |ident { if (has_reserved_prefix($$)) { YYERROR("column name starts with reserved prefix"); } } ; constraint_name: diff --git a/dbcon/mysql/ha_mcs_impl.cpp b/dbcon/mysql/ha_mcs_impl.cpp index 73a5a7513..6f97756a5 100644 --- a/dbcon/mysql/ha_mcs_impl.cpp +++ b/dbcon/mysql/ha_mcs_impl.cpp @@ -4633,4 +4633,222 @@ internal_error: return ER_INTERNAL_ERROR; } +extern "C" +{ + /** + * MCS_AUX_COUNT([schema,] table_name [, partition_code]) + * Returns SELECT COUNT(aux_column) FROM table WHERE table.partition = partition_code; + */ + + my_bool mcs_aux_count_init(UDF_INIT* /*initid*/, UDF_ARGS* args, char* message) + { + bool invalid = args->arg_count < 1 || args->arg_count > 3; + for(int i=0; !invalid && i < args->arg_count; i++) + { + invalid = invalid || args->arg_type[i] != STRING_RESULT; + } + if (invalid) + { + strcpy(message, "usage: mcs_aux_count ([schema,] table [, partition_code])"); + return 1; + } + return 0; + } + + void mcs_aux_count_deinit(UDF_INIT* initid) + { + delete[] initid->ptr; + } + +//bool sendExecutionPlanToExeMgr(sm::cpsm_conhdl_t* hndl, ByteStream::quadbyte qb, +// std::shared_ptr caep, +// cal_connection_info* ci, THD* thd) + long long mcs_aux_count(UDF_INIT* initid, UDF_ARGS* args, char* result, unsigned long* length, + char* /*is_null*/, char* /*error*/) + { + std::string schema, table, part; + if (args->arg_count == 1) // simplest case of single table. + { + table = args->args[0]; + } + else if (args->arg_count == 3) // next simplest one - full set of args. + { + schema = args->args[0]; + table = args->args[1]; + part = args->args[2]; + } + else + { + std::string temp = args->args[1]; + bool is_part = true; + for(int i=0;is_part && i= '0' && c <= '9'); + } + if (is_part) + { + table = args->args[0]; + part = args->args[1]; + } + else + { + schema = args->args[0]; + table = args->args[1]; + } + } + THD* thd = current_thd; + + if (get_fe_conn_info_ptr() == NULL) + { + set_fe_conn_info_ptr((void*)new cal_connection_info()); + thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); + } + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + execplan::CalpontSystemCatalog::TableName tableName; + + tableName.schema = schema; + tableName.table = table; + + if (lower_case_table_names) { + boost::algorithm::to_lower(tableName.schema); + boost::algorithm::to_lower(tableName.table); + } + uint32_t sessionID = execplan::CalpontSystemCatalog::idb_tid2sid(thd->thread_id); + boost::shared_ptr csc = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); + + csc->identity(execplan::CalpontSystemCatalog::FE); + + auto table_name = execplan::make_table(schema, table, lower_case_table_names); + + // TODO: Check if table is columnstore's own. + + execplan::MCSAnalyzeTableExecutionPlan::ReturnedColumnList returnedColumnList; + execplan::MCSAnalyzeTableExecutionPlan::ColumnMap columnMap; + + const char* timeZone = thd->variables.time_zone->get_name()->ptr(); + long timeZoneOffset; + dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &timeZoneOffset); + + // Create simple column for our AUX column. + { + execplan::SRCP returnedColumn; + const auto objNum = ZZZ; + auto tableColName = csc->colName(objNum); + auto colType = csc->colType(objNum); + + if (!isSupportedToAnalyze(colType)) + continue; + + execplan::SimpleColumn* simpleColumn = new execplan::SimpleColumn(); + simpleColumn->columnName(tableColName.column); + simpleColumn->tableName(tableColName.table, lower_case_table_names); + simpleColumn->schemaName(tableColName.schema, lower_case_table_names); + simpleColumn->oid(objNum); + simpleColumn->alias(tableColName.column); + simpleColumn->resultType(colType); + simpleColumn->timeZone(timeZoneOffset); + + returnedColumn.reset(simpleColumn); + returnedColumnList.push_back(returnedColumn); + columnMap.insert(execplan::MCSAnalyzeTableExecutionPlan::ColumnMap::value_type(simpleColumn->columnName(), + returnedColumn)); + } + + // Create execution plan and initialize it with `returned columns` and `column map`. + std::shared_ptr caep( + new execplan::MCSAnalyzeTableExecutionPlan(returnedColumnList, columnMap)); + + caep->schemaName(table->s->db.str, lower_case_table_names); + caep->tableName(table->s->table_name.str, lower_case_table_names); + caep->timeZone(timeZoneOffset); + + SessionManager sm; + BRM::TxnID txnID; + txnID = sm.getTxnID(sessionID); + + if (!txnID.valid) + { + txnID.id = 0; + txnID.valid = true; + } + + QueryContext verID; + verID = sm.verID(); + + caep->txnID(txnID.id); + caep->verID(verID); + caep->sessionID(sessionID); + + string query; + query.assign(idb_mysql_query_str(thd)); + caep->data(query); + + if (!get_fe_conn_info_ptr()) + { + set_fe_conn_info_ptr(reinterpret_cast(new cal_connection_info(), thd)); + thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr()); + } + + cal_connection_info* ci = reinterpret_cast(get_fe_conn_info_ptr()); + idbassert(ci != 0); + + try + { + caep->priority(ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser)); + } + catch (std::exception& e) + { + string msg = string("Columnstore User Priority - ") + e.what(); + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str()); + } + + if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD) + { + force_close_fep_conn(thd, ci); + return 0; + } + + caep->traceFlags(ci->traceFlags); + + cal_table_info ti; + sm::cpsm_conhdl_t* hndl; + + bool localQuery = (get_local_query(thd) > 0 ? true : false); + caep->localQuery(localQuery); + + // Try to initialize connection. + if (!initializeCalConnectionInfo(ci, thd, csc, sessionID, localQuery)) + goto error; + + hndl = ci->cal_conn_hndl; + + if (caep->traceOn()) + std::cout << caep->toString() << std::endl; + { + ByteStream::quadbyte qb = ANALYZE_TABLE_EXECUTE; + // Serialize and the send the `anlyze table` execution plan. + if (!sendExecutionPlanToExeMgr(hndl, qb, caep, ci, thd)) + goto error; + } + + ci->rmParms.clear(); + ci->tableMap[table] = ti; + + return 0; + +error: + + if (ci->cal_conn_hndl) + { + sm::sm_cleanup(ci->cal_conn_hndl); + ci->cal_conn_hndl = 0; + } + + return -1; + } +} // extern "C" + // vim:sw=4 ts=4: