1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-2178 SH now allows to fallback to other pushdown handlers.

SH query execution migrated from SH::init() into create_SH().
There is a session variable columnstore_processing_handlers_fallback
that allows to fallback to DH, GBH if SH fails. DH now uses semantic
tree check for unsupported features to allow to fallback to GBH or
storage API.

Fixes GBH related bug when create_GBH() returns a handler for
queries with impossible WHERE/HAVING.

Fixed bug in FromSubquery::transform() where isUnion is set to true.

Enabled RTTI b/c server team enabled it for MDB.

Removed unused code supposed to be used with vtable.
This commit is contained in:
Roman Nozdrin
2019-08-25 04:05:59 +03:00
parent 4ef1133d9f
commit 2c63258537
10 changed files with 469 additions and 167 deletions

View File

@ -2940,7 +2940,6 @@ int ha_calpont_impl_delete_table(const char* name)
int ha_calpont_impl_write_row(const uchar* buf, TABLE* table)
{
THD* thd = current_thd;
//sleep(100);
// Error out INSERT on VIEW. It's currently not supported.
// @note INSERT on VIEW works natually (for simple cases at least), but we choose to turn it off
// for now - ZZ.
@ -4846,6 +4845,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
gp_walk_info gwi;
gwi.thd = thd;
bool err = false;
//check whether the system is ready to process statement.
#ifndef _MSC_VER
@ -5024,12 +5024,6 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
if (status != 0)
goto internal_error;
// @bug 2547. don't need to send the plan if it's impossible where for all unions.
if (gwi.cs_vtable_impossible_where_on_union)
{
return 0;
}
string query;
query.assign(idb_mysql_query_str(thd));
csep->data(query);
@ -5103,7 +5097,6 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
string emsgStr;
emsgBs >> emsgStr;
bool err = false;
if (msg.length() == 4)
{
@ -5137,6 +5130,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
if (err)
{
// CS resets error in create_SH() if fallback is enabled
setError(thd, ER_INTERNAL_ERROR, emsgStr);
return ER_INTERNAL_ERROR;
}
@ -5190,6 +5184,9 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
ti.msTablePtr = dh->table;
}
// For SH CS creates SM environment inside select_next().
// This allows us to try and fail with SH.
if (!sh)
{
if (ti.tpl_ctx == 0)
{
@ -5245,9 +5242,9 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
ti.tpl_scan_ctx->ctp.push_back(ctype);
}
}
ci->tableMap[table] = ti;
}
ci->tableMap[table] = ti;
return 0;
error:
@ -5271,4 +5268,158 @@ internal_error:
return ER_INTERNAL_ERROR;
}
int ha_cs_impl_select_next(uchar* buf, TABLE* table)
{
int rc = HA_ERR_END_OF_FILE;
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->slave_thread && !ci->replicationEnabled && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return rc;
// @bug 2547
// MCOL-2178 This variable can never be true in the scope of this function
// if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
// return HA_ERR_END_OF_FILE;
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
force_close_fep_conn(thd, ci);
return 0;
}
if (ci->alterTableState > 0) return rc;
cal_table_info ti;
ti= ci->tableMap[table];
// This is the server's temp table for the result.
ti.msTablePtr= table;
sm::tableid_t tableid= execplan::IDB_VTABLE_ID;
sm::cpsm_conhdl_t* hndl= ci->cal_conn_hndl;
if (!ti.tpl_ctx || !ti.tpl_scan_ctx || (hndl && hndl->queryState == sm::NO_QUERY))
{
if (ti.tpl_ctx == 0)
{
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
ti.tpl_scan_ctx->rowGroup = NULL;
try
{
sm::tpl_open(tableid, ti.tpl_ctx, hndl);
sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
}
catch (std::exception& e)
{
uint32_t sessionID = tid2sid(thd->thread_id);
string emsg = "table can not be opened: " + string(e.what());
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
catch (...)
{
uint32_t sessionID = tid2sid(thd->thread_id);
string emsg = "table can not be opened";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
if ((ti.tpl_scan_ctx->ctp).size() == 0)
{
uint32_t num_attr = table->s->fields;
for (uint32_t i = 0; i < num_attr; i++)
{
CalpontSystemCatalog::ColType ctype;
ti.tpl_scan_ctx->ctp.push_back(ctype);
}
}
ci->tableMap[table] = ti;
}
if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
{
uint32_t sessionID = tid2sid(thd->thread_id);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
return ER_INTERNAL_ERROR;
}
idbassert(ti.msTablePtr == table);
try
{
rc = fetchNextRow(buf, ti, ci);
}
catch (std::exception& e)
{
uint32_t sessionID = tid2sid(thd->thread_id);
string emsg = string("Error while fetching from ExeMgr: ") + e.what();
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
return ER_INTERNAL_ERROR;
}
ci->tableMap[table]= ti;
if (rc != 0 && rc != HA_ERR_END_OF_FILE)
{
string emsg;
// remove this check when all error handling migrated to the new framework.
if (rc >= 1000)
emsg = ti.tpl_scan_ctx->errMsg;
else
{
logging::ErrorCodes errorcodes;
emsg = errorcodes.errorString(rc);
}
uint32_t sessionID = tid2sid(thd->thread_id);
setError(thd, ER_INTERNAL_ERROR, emsg);
ci->stats.fErrorNo = rc;
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
rc = ER_INTERNAL_ERROR;
}
return rc;
internal_error:
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return ER_INTERNAL_ERROR;
}
// vim:sw=4 ts=4: