1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1601 GROUP BY supports subqueries in HAVING(derived tables processed by the server.)

This commit is contained in:
Roman Nozdrin
2018-09-17 16:15:10 +03:00
parent 764090ba0c
commit 1d0488df33
4 changed files with 114 additions and 29 deletions

View File

@ -5045,6 +5045,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type)
{
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, infinidb_autoswitch_warning.c_str());
}
ci->queryState = 0;
}
else // vtable mode
{
@ -5212,9 +5213,12 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
ci->warningMsg = msg;
}
// if the previous query has error, re-establish the connection
// If the previous query has error and
// this is not a subquery run by the server(MCOL-1601)
// re-establish the connection
if (ci->queryState != 0)
{
if( ci->cal_conn_hndl_st.size() == 0 )
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@ -5237,6 +5241,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
hndl = ci->cal_conn_hndl;
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
@ -5439,11 +5444,15 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
idbassert(hndl != 0);
hndl->csc = csc;
// The next section is useless
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
ti.conn_hndl = hndl;
else
{
ci->cal_conn_hndl = hndl;
ci->cal_conn_hndl_st.pop();
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
}
try
{
hndl->connect();
@ -5476,11 +5485,11 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
{
if (ti.tpl_ctx == 0)
{
// MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_ctx_st.push(ti.tpl_ctx);
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
@ -5560,6 +5569,7 @@ error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@ -5571,6 +5581,7 @@ internal_error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@ -5802,6 +5813,12 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
}
return 0;
@ -5811,6 +5828,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
cal_table_info ti = ci->tableMap[table];
sm::cpsm_conhdl_t* hndl;
bool clearScanCtx = false;
hndl = ci->cal_conn_hndl;
@ -5818,6 +5836,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
{
if (ti.tpl_scan_ctx.get())
{
clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) &&
ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() );
try
{
sm::tpl_scan_close(ti.tpl_scan_ctx);
@ -5829,11 +5849,31 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
}
ti.tpl_scan_ctx.reset();
if ( ti.tpl_scan_ctx_st.size() )
{
ti.tpl_scan_ctx_st.pop();
if ( ti.tpl_scan_ctx_st.size() )
ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top();
}
try
{
if(hndl)
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
{
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, clearScanCtx);
// Normaly stats variables are set in external_lock method but we set it here
// since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
// We sum the stats up here since server could run a number of
// queries e.g. each for a subquery in a filter.
if(hndl)
{
if (hndl->queryStats.length())
ci->queryStats += hndl->queryStats;
if (hndl->extendedStats.length())
ci->extendedStats += hndl->extendedStats;
if (hndl->miniStats.length())
ci->miniStats += hndl->miniStats;
}
}
ci->cal_conn_hndl = hndl;
@ -5866,6 +5906,20 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ti.tpl_ctx = 0;
if ( ti.tpl_ctx_st.size() )
{
ti.tpl_ctx_st.pop();
if ( ti.tpl_ctx_st.size() )
ti.tpl_ctx = ti.tpl_ctx_st.top();
}
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
ci->tableMap[table] = ti;
// push warnings from CREATE phase

View File

@ -187,7 +187,9 @@ struct cal_table_info
{ }
~cal_table_info() {}
sm::cpsm_tplh_t* tpl_ctx;
std::stack<sm::cpsm_tplh_t*> tpl_ctx_st;
sm::sp_cpsm_tplsch_t tpl_scan_ctx;
std::stack<sm::sp_cpsm_tplsch_t> tpl_scan_ctx_st;
unsigned c; // for debug purpose
TABLE* msTablePtr; // no ownership
sm::cpsm_conhdl_t* conn_hndl;
@ -273,6 +275,7 @@ struct cal_connection_info
}
sm::cpsm_conhdl_t* cal_conn_hndl;
std::stack<sm::cpsm_conhdl_t*> cal_conn_hndl_st;
int queryState;
CalTableMap tableMap;
sm::tableid_t currentTable;

View File

@ -280,7 +280,7 @@ tpl_open ( tableid_t tableid,
cpsm_tplh_t* ntplh,
cpsm_conhdl_t* conn_hdl)
{
SMDEBUGLOG << "tpl_open: " << conn_hdl << " tableid: " << tableid << endl;
SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl;
// if first time enter this function for a statement, set
// queryState to QUERY_IN_PRCOESS and get execution plan.
@ -319,7 +319,9 @@ tpl_scan_open ( tableid_t tableid,
sp_cpsm_tplsch_t& ntplsch,
cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl;
#endif
// @bug 649. No initialization here. take passed in reference
ntplsch->tableid = tableid;
@ -354,8 +356,8 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
SMDEBUGLOG << "tpl_scan_close: ";
if (ntplsch)
SMDEBUGLOG << " tableid: " << ntplsch->tableid << endl;
SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch;
SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl;
#endif
ntplsch.reset();
@ -365,11 +367,12 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
status_t
tpl_close ( cpsm_tplh_t* ntplh,
cpsm_conhdl_t** conn_hdl,
QueryStats& stats )
QueryStats& stats,
bool clear_scan_ctx)
{
cpsm_conhdl_t* hndl = *conn_hdl;
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close: " << hndl;
SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh;
if (ntplh)
SMDEBUGLOG << " tableid: " << ntplh->tableid;
@ -384,9 +387,21 @@ tpl_close ( cpsm_tplh_t* ntplh,
// Get the query stats
ByteStream bs;
ByteStream::quadbyte qb = 3;
//string tmpQueryStats;
//string tmpExtendedStats;
//string tmpMiniStats;
bs << qb;
hndl->write(bs);
// MCOL-1601 Dispose of unused empty RowGroup
if (clear_scan_ctx)
{
bs = hndl->exeMgr->read();
}
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl;
#endif
//keep reading until we get a string
//TODO: really need to fix this! Why is ExeMgr sending other stuff?
for (int tries = 0; tries < 10; tries++)
@ -396,10 +411,21 @@ tpl_close ( cpsm_tplh_t* ntplh,
if (bs.length() == 0) break;
try
{
// MCOL-1601 Server could run a number of subqueries separetely.
// If so there will be a number of statistics returned.
/*if(hndl->queryStats.size())
{
bs >> tmpQueryStats, hndl->queryStats += tmpQueryStats;
bs >> tmpExtendedStats, hndl->extendedStats += tmpExtendedStats;
bs >> hndl->miniStats, hndl->miniStats += tmpMiniStats;
}
else*/
{
bs >> hndl->queryStats;
bs >> hndl->extendedStats;
bs >> hndl->miniStats;
}
stats.unserialize(bs);
stats.setEndTime();
stats.insert();
@ -415,6 +441,9 @@ tpl_close ( cpsm_tplh_t* ntplh,
{
// querystats messed up. close connection.
// no need to throw for querystats protocol error, like for tablemode.
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
#endif
end_query(hndl);
sm_cleanup(hndl);
*conn_hdl = 0;
@ -436,9 +465,9 @@ sm_init ( uint32_t sid,
{
// clear file content
#if IDB_SM_DEBUG
smlog.close();
smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << dboptions << endl;
//smlog.close();
//smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << endl;
#endif
// @bug5660 Connection changes related to the local pm setting
@ -474,7 +503,6 @@ sm_cleanup ( cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl;
SMDEBUGLOG.close();
#endif
delete conn_hdl;

View File

@ -60,12 +60,12 @@ const int SQL_NOT_FOUND = -1000;
const int SQL_KILLED = -1001;
const int CALPONT_INTERNAL_ERROR = -1007;
#if IDB_SM_DEBUG
extern std::ofstream smlog;
#define SMDEBUGLOG smlog
#else
#define SMDEBUGLOG if (false) std::cerr
#endif
//#if IDB_SM_DEBUG
//extern std::ofstream smlog;
//#define SMDEBUGLOG smlog
//#else
#define SMDEBUGLOG if (true) std::cerr
//#endif
extern const std::string DEFAULT_SAVE_PATH;
typedef uint64_t tableid_t;
@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*);
extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*);
extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0);
extern status_t tpl_scan_close(sp_cpsm_tplsch_t&);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false);
}