1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-2121 New derived_handler(MDEV-17096) infrastructure.

Renamed isInfiniDB() into isMCSTable

    Changed getSelectPlan() to reuse it with derived and
        other handler types.

    Separate pushdown handlers methods and functions.

    Removed vcxproj files from the source.

    Added fix for MCOL-2166.

    Merged with MCOL-2121
This commit is contained in:
Roman Nozdrin
2019-01-30 18:59:44 +03:00
committed by Gagan Goel
parent d30745d37c
commit 2071716ebd
11 changed files with 1643 additions and 340 deletions

View File

@ -80,6 +80,7 @@ using namespace execplan;
using namespace dataconvert;
#include "sm.h"
#include "ha_mcs_pushdown.h"
#include "bytestream.h"
#include "messagequeue.h"
@ -3019,7 +3020,7 @@ int ha_calpont_impl_rnd_next(uchar* buf, TABLE* table)
return rc;
}
int ha_calpont_impl_rnd_end(TABLE* table)
int ha_calpont_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
{
int rc = 0;
THD* thd = current_thd;
@ -3047,6 +3048,12 @@ int ha_calpont_impl_rnd_end(TABLE* table)
thd->infinidb_vtable.isNewQuery = true;
// Workaround because CS doesn't reset isUnion in a normal way.
if (is_pushdown_hand)
{
thd->infinidb_vtable.isUnion = false;
}
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
@ -5275,4 +5282,552 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
return rc;
}
/*@brief Initiate the query for derived_handler */
/***********************************************************
* DESCRIPTION:
* Execute the query and saves derived table query.
* There is an extra handler argument so I ended up with a
* new init function. The code is a copy of
* ha_calpont_impl_rnd_init() mostly. We should come up with
* a semi-universal structure that allows to save any
* extra data.
* PARAMETERS:
* void* handler either select_ or derived_handler
* TABLE* table - table where to save the results
* RETURN:
* rc as int
***********************************************************/
int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
{
#ifdef DEBUG_SETENV
string home(getenv("HOME"));
if (!getenv("CALPONT_HOME"))
{
string calpontHome(home + "/Calpont/etc/");
setenv("CALPONT_HOME", calpontHome.c_str(), 1);
}
if (!getenv("CALPONT_CONFIG_FILE"))
{
string calpontConfigFile(home + "/Calpont/etc/Columnstore.xml");
setenv("CALPONT_CONFIG_FILE", calpontConfigFile.c_str(), 1);
}
if (!getenv("CALPONT_CSC_IDENT"))
setenv("CALPONT_CSC_IDENT", "dm", 1);
#endif
IDEBUG( cout << "pushdown_init for table " << endl );
THD* thd = current_thd;
//check whether the system is ready to process statement.
#ifndef _MSC_VER
static DBRM dbrm(true);
bool bSystemQueryReady = dbrm.getSystemQueryReady();
if (bSystemQueryReady == 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
else if (bSystemQueryReady < 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
#endif
// prevent "create table as select" from running on slave
thd->infinidb_vtable.hasInfiniDBTable = true;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
// return error is error status is already set
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
// @bug 2232. Basic SP support. Error out non support sp cases.
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
{
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
// mysql reads table twice for order by
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1 ||
thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY)
return 0;
if ( (thd->lex)->sql_command == SQLCOM_ALTER_TABLE )
return 0;
//Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd);
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
idbassert(ci != 0);
// MySQL sometimes calls rnd_init multiple times, plan should only be
// generated and sent once.
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE &&
!thd->infinidb_vtable.isNewQuery)
return 0;
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
if (ci->cal_conn_hndl)
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// canceling query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0;
}
sm::tableid_t tableid = 0;
cal_table_info ti;
sm::cpsm_conhdl_t* hndl;
SCSEP csep;
// Declare handlers ptrs in this scope for future use.
select_handler* sh = NULL;
derived_handler* dh = NULL;
// update traceFlags according to the autoswitch state. replication query
// on slave are in table mode (create table as...)
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE ||
(thd->slave_thread && thd->infinidb_vtable.vtable_state == THD::INFINIDB_INIT))
{
ci->traceFlags |= CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;
thd->infinidb_vtable.vtable_state = THD::INFINIDB_DISABLE_VTABLE;
}
else
{
ci->traceFlags = (ci->traceFlags | CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)^
CalpontSelectExecutionPlan::TRACE_TUPLE_OFF;
}
bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false);
{
//if (!ci->cal_conn_hndl || thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
else if (thd->main_security_ctx.host_or_ip)
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
else
ci->stats.fHost = "unknown";
try
{
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
ci->warningMsg = msg;
}
// if the previous query has error, re-establish the connection
if (ci->queryState != 0)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
}
sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
idbassert(ci->cal_conn_hndl != 0);
ci->cal_conn_hndl->csc = csc;
idbassert(ci->cal_conn_hndl->exeMgr != 0);
try
{
ci->cal_conn_hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
hndl = ci->cal_conn_hndl;
if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE)
{
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
SessionManager sm;
BRM::TxnID txnID;
txnID = sm.getTxnID(sessionID);
if (!txnID.valid)
{
txnID.id = 0;
txnID.valid = true;
}
QueryContext verID;
verID = sm.verID();
csep->txnID(txnID.id);
csep->verID(verID);
csep->sessionID(sessionID);
if (thd->db.length)
csep->schemaName(thd->db.str);
csep->traceFlags(ci->traceFlags);
if (thd->infinidb_vtable.isInsertSelect)
csep->queryType(CalpontSelectExecutionPlan::INSERT_SELECT);
// cast the handler and get a plan.
int status = 42;
if (handler_info->hndl_type == mcs_handler_types_t::SELECT)
{
sh = reinterpret_cast<select_handler*>(handler_info->hndl_ptr);
status = cs_get_select_plan(sh, thd, csep);
}
else if (handler_info->hndl_type == DERIVED)
{
dh = reinterpret_cast<derived_handler*>(handler_info->hndl_ptr);
status = cs_get_derived_plan(dh, thd, csep);
}
// WIP MCOL-2121 Find a way to return an actual error
// It either ends up with 42 or other error status
if (status > 0)
goto internal_error;
else if (status < 0)
return 0;
// @bug 2547. don't need to send the plan if it's impossible where for all unions.
if (thd->infinidb_vtable.impossibleWhereOnUnion)
return 0;
string query;
query.assign(idb_mysql_query_str(thd));
//query.assign(thd->infinidb_vtable.original_query.ptr(),
// thd->infinidb_vtable.original_query.length());
csep->data(query);
try
{
csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
#ifdef PLAN_HEX_FILE
// plan serialization
ifstream ifs("/tmp/li1-plan.hex");
ByteStream bs1;
ifs >> bs1;
ifs.close();
csep->unserialize(bs1);
#endif
if (ci->traceFlags & 1)
{
cerr << "---------------- EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl ;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
}
else
{
IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
IDEBUG( cerr << *csep << endl );
IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
}
}
}// end of execution plan generation
if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE)
{
ByteStream msg;
ByteStream emsgBs;
while (true)
{
try
{
ByteStream::quadbyte qb = 4;
msg << qb;
hndl->exeMgr->write(msg);
msg.restart();
csep->rmParms(rmParms);
//send plan
csep->serialize(msg);
hndl->exeMgr->write(msg);
//get ExeMgr status back to indicate a vtable joblist success or not
msg.restart();
emsgBs.restart();
msg = hndl->exeMgr->read();
emsgBs = hndl->exeMgr->read();
string emsg;
if (msg.length() == 0 || emsgBs.length() == 0)
{
emsg = "Lost connection to ExeMgr. Please contact your administrator";
setError(thd, ER_INTERNAL_ERROR, emsg);
return ER_INTERNAL_ERROR;
}
string emsgStr;
emsgBs >> emsgStr;
bool err = false;
if (msg.length() == 4)
{
msg >> qb;
if (qb != 0)
{
err = true;
// for makejoblist error, stats contains only error code and insert from here
// because table fetch is not started
ci->stats.setEndTime();
ci->stats.fQuery = csep->data();
ci->stats.fQueryType = csep->queryType();
ci->stats.fErrorNo = qb;
try
{
ci->stats.insert();
}
catch (std::exception& e)
{
string msg = string("Columnstore Query Stats - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
}
}
else
{
err = true;
}
if (err)
{
setError(thd, ER_INTERNAL_ERROR, emsgStr);
return ER_INTERNAL_ERROR;
}
rmParms.clear();
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{
ci->tableMap[table] = ti;
}
else
{
ci->queryState = 1;
}
break;
}
catch (...)
{
sm::sm_cleanup(hndl);
hndl = 0;
sm::sm_init(sessionID, &hndl, localQuery);
idbassert(hndl != 0);
hndl->csc = csc;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
ti.conn_hndl = hndl;
else
ci->cal_conn_hndl = hndl;
try
{
hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
msg.restart();
}
}
}
// set query state to be in_process. Sometimes mysql calls rnd_init multiple
// times, this makes sure plan only being generated and sent once. It will be
// reset when query finishes in sm::end_query
thd->infinidb_vtable.isNewQuery = false;
// common path for both vtable select phase and table mode -- open scan handle
ti = ci->tableMap[table];
// This is the server's temp table for the result.
if(sh)
{
ti.msTablePtr = sh->table;
}
else
{
ti.msTablePtr = dh->table;
}
{
if (ti.tpl_ctx == 0)
{
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
ti.tpl_scan_ctx->rowGroup = NULL;
try
{
tableid = execplan::IDB_VTABLE_ID;
}
catch (...)
{
string emsg = "No table ID found for table " + string(table->s->table_name.str);
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
try
{
sm::tpl_open(tableid, ti.tpl_ctx, hndl);
sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
}
catch (std::exception& e)
{
string emsg = "table can not be opened: " + string(e.what());
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
catch (...)
{
string emsg = "table can not be opened";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
if ((ti.tpl_scan_ctx->ctp).size() == 0)
{
uint32_t num_attr = table->s->fields;
for (uint32_t i = 0; i < num_attr; i++)
{
CalpontSystemCatalog::ColType ctype;
ti.tpl_scan_ctx->ctp.push_back(ctype);
}
// populate coltypes here for table mode because tableband gives treeoid for dictionary column
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{
CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str), true);
if (oidlist.size() != num_attr)
{
string emsg = "Size mismatch probably caused by front end out of sync";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
for (unsigned int j = 0; j < oidlist.size(); j++)
{
CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum);
ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype;
ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1;
}
}
}
}
ci->tableMap[table] = ti;
return 0;
error:
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
// do we need to close all connection handle of the table map?
return ER_INTERNAL_ERROR;
internal_error:
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return ER_INTERNAL_ERROR;
}
// vim:sw=4 ts=4: