1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00

chore(connector): remove unused and disabled group by handler (#3481)

This commit is contained in:
drrtuy
2025-04-04 21:27:07 +01:00
committed by GitHub
parent 72480e512c
commit 6b8adb822b
10 changed files with 2 additions and 2763 deletions

View File

@@ -4005,703 +4005,6 @@ int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos)
return ER_INTERNAL_ERROR;
}
/*@brief ha_mcs_impl_group_by_init - Get data for MariaDB group_by
pushdown handler */
/***********************************************************
* DESCRIPTION:
* Prepares data for group_by_handler::next_row() calls.
* PARAMETERS:
* group_hand - group by handler, that preserves initial table and items lists. .
* table - TABLE pointer The table to save the result set into.
* RETURN:
* 0 if success
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table)
{
ha_mcs_group_by_handler* group_hand = reinterpret_cast<ha_mcs_group_by_handler*>(handler_info->hndl_ptr);
string tableName = group_hand->table_list->table->s->table_name.str;
IDEBUG(cout << "group_by_init for table " << tableName << endl);
THD* thd = current_thd;
// check whether the system is ready to process statement.
static DBRM dbrm(true);
int bSystemQueryReady = dbrm.getSystemQueryReady();
if (bSystemQueryReady == 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
return ER_INTERNAL_ERROR;
}
else if (bSystemQueryReady < 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
return ER_INTERNAL_ERROR;
}
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (get_fe_conn_info_ptr() == nullptr) {
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
idbassert(ci != 0);
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
force_close_fep_conn(thd, ci);
return 0;
}
sm::tableid_t tableid = 0;
cal_table_info ti;
cal_group_info gi;
sm::cpsm_conhdl_t* hndl;
SCSEP csep;
bool localQuery = get_local_query(thd);
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
if (thd->main_security_ctx.user)
{
ci->stats.fUser = thd->main_security_ctx.user;
}
else
{
ci->stats.fUser = "";
}
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
else if (thd->main_security_ctx.host_or_ip)
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
else
ci->stats.fHost = "unknown";
try
{
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
ci->warningMsg = msg;
}
// If the previous query has error and
// this is not a subquery run by the server(MCOL-1601)
// re-establish the connection
if (ci->queryState != 0)
{
if (ci->cal_conn_hndl_st.size() == 0)
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
idbassert(ci->cal_conn_hndl != 0);
ci->cal_conn_hndl->csc = csc;
idbassert(ci->cal_conn_hndl->exeMgr != 0);
try
{
ci->cal_conn_hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
hndl = ci->cal_conn_hndl;
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
SessionManager sm;
BRM::TxnID txnID;
txnID = sm.getTxnID(sessionID);
if (!txnID.valid)
{
txnID.id = 0;
txnID.valid = true;
}
QueryContext verID;
verID = sm.verID();
csep->txnID(txnID.id);
csep->verID(verID);
csep->sessionID(sessionID);
if (group_hand->table_list->db.length)
csep->schemaName(group_hand->table_list->db.str, lower_case_table_names);
csep->traceFlags(ci->traceFlags);
// MCOL-1052 Send Items lists down to the optimizer.
gi.groupByTables = group_hand->table_list;
gi.groupByFields = group_hand->select;
gi.groupByWhere = group_hand->where;
gi.groupByGroup = group_hand->group_by;
gi.groupByOrder = group_hand->order_by;
gi.groupByHaving = group_hand->having;
gi.groupByDistinct = group_hand->distinct;
// MCOL-1052 Send pushed conditions here, since server could omit GROUP BY
// items in case of = or IN functions used on GROUP BY columns.
{
CalTableMap::iterator mapiter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
execplan::ParseTree* ptIt;
for (TABLE_LIST* tl = gi.groupByTables; tl; tl = tl->next_local)
{
mapiter = ci->tableMap.find(tl->table);
if (mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL &&
mapiter->second.condInfo->gwi.condPush)
{
while (!mapiter->second.condInfo->gwi.ptWorkStack.empty())
{
ptIt = mapiter->second.condInfo->gwi.ptWorkStack.top();
mapiter->second.condInfo->gwi.ptWorkStack.pop();
gi.pushedPts.push_back(ptIt);
}
}
}
}
// send plan whenever group_init is called
int status = cp_get_group_plan(thd, csep, gi);
// Never proceed if status != 0 to avoid empty DA
// crashes on later stages
if (status != 0)
goto internal_error;
// @bug 2547. don't need to send the plan if it's impossible where for all unions.
// MCOL-2178 commenting the below out since cp_get_group_plan does not modify this variable
// which has a default value of false
// if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
// return 0;
string query;
// Set the query text only once if the server executes
// subqueries separately.
if (ci->queryState)
query.assign("<subquery of the previous>");
else
query.assign(thd->query_string.str(), thd->query_string.length());
csep->data(query);
try
{
csep->priority(ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
#ifdef PLAN_HEX_FILE
// plan serialization
string tmpDir = aTmpDir + "/li1-plan.hex";
ifstream ifs(tmpDir);
ByteStream bs1;
ifs >> bs1;
ifs.close();
csep->unserialize(bs1);
#endif
if (ci->traceFlags & 1)
{
cerr << "---------------- EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
}
else
{
IDEBUG(cout << "---------------- EXECUTION PLAN ----------------" << endl);
IDEBUG(cerr << *csep << endl);
IDEBUG(cout << "-------------- EXECUTION PLAN END --------------\n" << endl);
}
} // end of execution plan generation
{
ByteStream msg;
ByteStream emsgBs;
int ntries = 10;
// XXX: MCOL-5396: unable to reach this code.
while (true)
{
string emsg;
if (ntries < 0)
{
emsg = "Lost connection to ExeMgr. Please contact your administrator";
setError(thd, ER_INTERNAL_ERROR, emsg);
return ER_INTERNAL_ERROR;
}
try
{
ByteStream::quadbyte qb = 4;
msg << qb;
hndl->exeMgr->write(msg);
msg.restart();
csep->rmParms(ci->rmParms);
// send plan
csep->serialize(msg);
hndl->exeMgr->write(msg);
// get ExeMgr status back to indicate a vtable joblist success or not
msg.restart();
emsgBs.restart();
msg = hndl->exeMgr->read();
emsgBs = hndl->exeMgr->read();
if (msg.length() == 0 || emsgBs.length() == 0)
{
emsg = "Lost connection to ExeMgr. Please contact your administrator";
setError(thd, ER_INTERNAL_ERROR, emsg);
return ER_INTERNAL_ERROR;
}
string emsgStr;
emsgBs >> emsgStr;
bool err = false;
if (msg.length() == 4)
{
msg >> qb;
if (qb != 0)
{
err = true;
// for makejoblist error, stats contains only error code and insert from here
// because table fetch is not started
ci->stats.setEndTime();
ci->stats.fQuery = csep->data();
ci->stats.fQueryType = csep->queryType();
ci->stats.fErrorNo = qb;
try
{
ci->stats.insert();
}
catch (std::exception& e)
{
string msg = string("Columnstore Query Stats - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
}
}
else
{
err = true;
}
if (err)
{
setError(thd, ER_INTERNAL_ERROR, emsgStr);
return ER_INTERNAL_ERROR;
}
ci->rmParms.clear();
ci->queryState = 1;
break;
}
catch (...)
{
sm::sm_cleanup(hndl);
hndl = 0;
sm::sm_init(sessionID, &hndl, localQuery);
idbassert(hndl != 0);
hndl->csc = csc;
ci->cal_conn_hndl = hndl;
ci->cal_conn_hndl_st.pop();
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
using namespace std::chrono_literals;
std::this_thread::sleep_for(100ms);
ntries --;
try
{
hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
msg.restart();
}
}
}
// set query state to be in_process. Sometimes mysql calls rnd_init multiple
// times, this makes sure plan only being generated and sent once. It will be
// reset when query finishes in sm::end_query
// common path for both vtable select phase and table mode -- open scan handle
ti = ci->tableMap[table];
ti.msTablePtr = table;
{
// MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
ti.tpl_ctx.reset(new sm::cpsm_tplh_t());
ti.tpl_ctx_st.push(ti.tpl_ctx);
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
ti.tpl_scan_ctx->rowGroup = nullptr;
try
{
tableid = execplan::IDB_VTABLE_ID;
}
catch (...)
{
string emsg = "No table ID found for table " + string(table->s->table_name.str);
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
try
{
sm::tpl_open(tableid, ti.tpl_ctx, hndl);
sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
}
catch (std::exception& e)
{
string emsg = "table can not be opened: " + string(e.what());
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
catch (...)
{
string emsg = "table can not be opened";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
if ((ti.tpl_scan_ctx->ctp).size() == 0)
{
uint32_t num_attr = table->s->fields;
for (uint32_t i = 0; i < num_attr; i++)
{
CalpontSystemCatalog::ColType ctype;
ti.tpl_scan_ctx->ctp.push_back(ctype);
}
}
}
ci->tableMap[table] = ti;
return 0;
error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
// do we need to close all connection handle of the table map?
return ER_INTERNAL_ERROR;
internal_error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return ER_INTERNAL_ERROR;
}
/*@brief ha_mcs_impl_group_by_next - Return result set for MariaDB group_by
pushdown handler
*/
/***********************************************************
* DESCRIPTION:
* Return a result record for each group_by_handler::next_row() call.
* PARAMETERS:
* group_hand - group by handler, that preserves initial table and items lists. .
* table - TABLE pointer The table to save the result set in.
* RETURN:
* 0 if success
* HA_ERR_END_OF_FILE if the record set has come to an end
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_mcs_impl_group_by_next(TABLE* table, long timeZone)
{
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
return HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr) {
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
force_close_fep_conn(thd, ci);
return 0;
}
if (ci->alterTableState > 0)
return HA_ERR_END_OF_FILE;
cal_table_info ti;
ti = ci->tableMap[table];
int rc = HA_ERR_END_OF_FILE;
if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
{
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
idbassert(ti.msTablePtr == table);
try
{
// fetchNextRow interface forces to use buf.
unsigned char buf;
rc = fetchNextRow(&buf, ti, ci, timeZone, true);
}
catch (std::exception& e)
{
string emsg = string("Error while fetching from ExeMgr: ") + e.what();
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
ci->tableMap[table] = ti;
if (rc != 0 && rc != HA_ERR_END_OF_FILE)
{
string emsg;
// remove this check when all error handling migrated to the new framework.
if (rc >= 1000)
emsg = ti.tpl_scan_ctx->errMsg;
else
{
logging::ErrorCodes errorcodes;
emsg = errorcodes.errorString(rc);
}
setError(thd, ER_INTERNAL_ERROR, emsg);
ci->stats.fErrorNo = rc;
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
rc = ER_INTERNAL_ERROR;
}
return rc;
}
int ha_mcs_impl_group_by_end(TABLE* table)
{
int rc = 0;
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
return 0;
cal_connection_info* ci = nullptr;
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (!ci)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
thd_set_ha_data(thd, mcs_hton, ci);
}
if (((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT))
{
force_close_fep_conn(thd, ci, true); // with checking prev command rc
return rc;
}
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
force_close_fep_conn(thd, ci);
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
// Poping next ExeMgr connection out of the stack
if (ci->cal_conn_hndl_st.size())
{
ci->cal_conn_hndl_st.pop();
if (ci->cal_conn_hndl_st.size())
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
return 0;
}
IDEBUG(cerr << "group_by_end for table " << table->s->table_name.str << endl);
cal_table_info ti = ci->tableMap[table];
sm::cpsm_conhdl_t* hndl;
bool clearScanCtx = false;
hndl = ci->cal_conn_hndl;
if (ti.tpl_ctx)
{
if (ti.tpl_scan_ctx.get())
{
clearScanCtx = ((ti.tpl_scan_ctx.get()->rowsreturned) &&
ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount());
try
{
sm::tpl_scan_close(ti.tpl_scan_ctx);
}
catch (...)
{
rc = ER_INTERNAL_ERROR;
}
}
ti.tpl_scan_ctx.reset();
if (ti.tpl_scan_ctx_st.size())
{
ti.tpl_scan_ctx_st.pop();
if (ti.tpl_scan_ctx_st.size())
ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top();
}
try
{
if (hndl)
{
{
bool ask_4_stats = (ci->traceFlags) ? true : false;
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, ask_4_stats, clearScanCtx);
ti.tpl_ctx = 0;
}
// Normaly stats variables are set in external_lock method but we set it here
// since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
// We sum the stats up here since server could run a number of
// queries e.g. each for a subquery in a filter.
if (hndl)
{
if (hndl->queryStats.length())
ci->queryStats += hndl->queryStats;
if (hndl->extendedStats.length())
ci->extendedStats += hndl->extendedStats;
if (hndl->miniStats.length())
ci->miniStats += hndl->miniStats;
}
}
else
{
ti.tpl_ctx.reset();
}
ci->cal_conn_hndl = hndl;
}
catch (IDBExcept& e)
{
if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
{
string msg = string("Columnstore Query Stats - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
else
{
setError(thd, ER_INTERNAL_ERROR, e.what());
rc = ER_INTERNAL_ERROR;
}
}
catch (std::exception& e)
{
setError(thd, ER_INTERNAL_ERROR, e.what());
rc = ER_INTERNAL_ERROR;
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
rc = ER_INTERNAL_ERROR;
}
}
ti.tpl_ctx = 0;
if (ti.tpl_ctx_st.size())
{
ti.tpl_ctx_st.pop();
if (ti.tpl_ctx_st.size())
ti.tpl_ctx = ti.tpl_ctx_st.top();
}
if (ci->cal_conn_hndl_st.size())
{
ci->cal_conn_hndl_st.pop();
if (ci->cal_conn_hndl_st.size())
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
ci->tableMap[table] = ti;
// push warnings from CREATE phase
if (!ci->warningMsg.empty())
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
ci->warningMsg.clear();
// reset expressionId just in case
ci->expressionId = 0;
return rc;
}
/*@brief Initiate the query for derived_handler */
/***********************************************************
* DESCRIPTION: