diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 59a3df714..5a71d6d48 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -5045,6 +5045,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type) { push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, infinidb_autoswitch_warning.c_str()); } + ci->queryState = 0; } else // vtable mode { @@ -5212,10 +5213,13 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE ci->warningMsg = msg; } - // if the previous query has error, re-establish the connection + // If the previous query has error and + // this is not a subquery run by the server(MCOL-1601) + // re-establish the connection if (ci->queryState != 0) { - sm::sm_cleanup(ci->cal_conn_hndl); + if( ci->cal_conn_hndl_st.size() == 0 ) + sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5237,6 +5241,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE hndl = ci->cal_conn_hndl; + ci->cal_conn_hndl_st.push(ci->cal_conn_hndl); if (!csep) csep.reset(new CalpontSelectExecutionPlan()); @@ -5439,11 +5444,15 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE idbassert(hndl != 0); hndl->csc = csc; + // The next section is useless if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ti.conn_hndl = hndl; else + { ci->cal_conn_hndl = hndl; - + ci->cal_conn_hndl_st.pop(); + ci->cal_conn_hndl_st.push(ci->cal_conn_hndl); + } try { hndl->connect(); @@ -5476,11 +5485,11 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) || (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY)) { - if (ti.tpl_ctx == 0) - { - ti.tpl_ctx = new sm::cpsm_tplh_t(); - ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); - } + // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts. + ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx_st.push(ti.tpl_ctx); + ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); + ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx); // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql // call rnd_init for a table more than once. @@ -5560,6 +5569,7 @@ error: if (ci->cal_conn_hndl) { + // end_query() should be called here. sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5571,6 +5581,7 @@ internal_error: if (ci->cal_conn_hndl) { + // end_query() should be called here. sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5802,6 +5813,12 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* ci->cal_conn_hndl = 0; // clear querystats because no query stats available for cancelled query ci->queryStats = ""; + if ( ci->cal_conn_hndl_st.size() ) + { + ci->cal_conn_hndl_st.pop(); + if ( ci->cal_conn_hndl_st.size() ) + ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); + } } return 0; @@ -5811,6 +5828,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* cal_table_info ti = ci->tableMap[table]; sm::cpsm_conhdl_t* hndl; + bool clearScanCtx = false; hndl = ci->cal_conn_hndl; @@ -5818,6 +5836,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* { if (ti.tpl_scan_ctx.get()) { + clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) && + ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() ); try { sm::tpl_scan_close(ti.tpl_scan_ctx); @@ -5829,11 +5849,31 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* } ti.tpl_scan_ctx.reset(); - + if ( ti.tpl_scan_ctx_st.size() ) + { + ti.tpl_scan_ctx_st.pop(); + if ( ti.tpl_scan_ctx_st.size() ) + ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top(); + } try { if(hndl) - sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats); + { + sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, clearScanCtx); +// Normaly stats variables are set in external_lock method but we set it here +// since they we pretend we are in vtable_disabled mode and the stats vars won't be set. +// We sum the stats up here since server could run a number of +// queries e.g. each for a subquery in a filter. + if(hndl) + { + if (hndl->queryStats.length()) + ci->queryStats += hndl->queryStats; + if (hndl->extendedStats.length()) + ci->extendedStats += hndl->extendedStats; + if (hndl->miniStats.length()) + ci->miniStats += hndl->miniStats; + } + } ci->cal_conn_hndl = hndl; @@ -5866,6 +5906,20 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* ti.tpl_ctx = 0; + if ( ti.tpl_ctx_st.size() ) + { + ti.tpl_ctx_st.pop(); + if ( ti.tpl_ctx_st.size() ) + ti.tpl_ctx = ti.tpl_ctx_st.top(); + } + + if ( ci->cal_conn_hndl_st.size() ) + { + ci->cal_conn_hndl_st.pop(); + if ( ci->cal_conn_hndl_st.size() ) + ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); + } + ci->tableMap[table] = ti; // push warnings from CREATE phase diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 4ebc7adb2..72579111b 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -187,7 +187,9 @@ struct cal_table_info { } ~cal_table_info() {} sm::cpsm_tplh_t* tpl_ctx; + std::stack tpl_ctx_st; sm::sp_cpsm_tplsch_t tpl_scan_ctx; + std::stack tpl_scan_ctx_st; unsigned c; // for debug purpose TABLE* msTablePtr; // no ownership sm::cpsm_conhdl_t* conn_hndl; @@ -273,6 +275,7 @@ struct cal_connection_info } sm::cpsm_conhdl_t* cal_conn_hndl; + std::stack cal_conn_hndl_st; int queryState; CalTableMap tableMap; sm::tableid_t currentTable; diff --git a/dbcon/mysql/sm.cpp b/dbcon/mysql/sm.cpp index 565a65ad0..1d8be5f05 100644 --- a/dbcon/mysql/sm.cpp +++ b/dbcon/mysql/sm.cpp @@ -280,7 +280,7 @@ tpl_open ( tableid_t tableid, cpsm_tplh_t* ntplh, cpsm_conhdl_t* conn_hdl) { - SMDEBUGLOG << "tpl_open: " << conn_hdl << " tableid: " << tableid << endl; + SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; // if first time enter this function for a statement, set // queryState to QUERY_IN_PRCOESS and get execution plan. @@ -319,7 +319,9 @@ tpl_scan_open ( tableid_t tableid, sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* conn_hdl ) { +#if IDB_SM_DEBUG SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl; +#endif // @bug 649. No initialization here. take passed in reference ntplsch->tableid = tableid; @@ -354,8 +356,8 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch ) SMDEBUGLOG << "tpl_scan_close: "; if (ntplsch) - SMDEBUGLOG << " tableid: " << ntplsch->tableid << endl; - + SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch; + SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl; #endif ntplsch.reset(); @@ -365,11 +367,12 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch ) status_t tpl_close ( cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, - QueryStats& stats ) + QueryStats& stats, + bool clear_scan_ctx) { cpsm_conhdl_t* hndl = *conn_hdl; #if IDB_SM_DEBUG - SMDEBUGLOG << "tpl_close: " << hndl; + SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh; if (ntplh) SMDEBUGLOG << " tableid: " << ntplh->tableid; @@ -384,9 +387,21 @@ tpl_close ( cpsm_tplh_t* ntplh, // Get the query stats ByteStream bs; ByteStream::quadbyte qb = 3; + //string tmpQueryStats; + //string tmpExtendedStats; + //string tmpMiniStats; bs << qb; hndl->write(bs); + + // MCOL-1601 Dispose of unused empty RowGroup + if (clear_scan_ctx) + { + bs = hndl->exeMgr->read(); + } +#if IDB_SM_DEBUG + SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl; +#endif //keep reading until we get a string //TODO: really need to fix this! Why is ExeMgr sending other stuff? for (int tries = 0; tries < 10; tries++) @@ -397,9 +412,20 @@ tpl_close ( cpsm_tplh_t* ntplh, try { - bs >> hndl->queryStats; - bs >> hndl->extendedStats; - bs >> hndl->miniStats; + // MCOL-1601 Server could run a number of subqueries separetely. + // If so there will be a number of statistics returned. + /*if(hndl->queryStats.size()) + { + bs >> tmpQueryStats, hndl->queryStats += tmpQueryStats; + bs >> tmpExtendedStats, hndl->extendedStats += tmpExtendedStats; + bs >> hndl->miniStats, hndl->miniStats += tmpMiniStats; + } + else*/ + { + bs >> hndl->queryStats; + bs >> hndl->extendedStats; + bs >> hndl->miniStats; + } stats.unserialize(bs); stats.setEndTime(); stats.insert(); @@ -415,6 +441,9 @@ tpl_close ( cpsm_tplh_t* ntplh, { // querystats messed up. close connection. // no need to throw for querystats protocol error, like for tablemode. +#if IDB_SM_DEBUG + SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl; +#endif end_query(hndl); sm_cleanup(hndl); *conn_hdl = 0; @@ -436,9 +465,9 @@ sm_init ( uint32_t sid, { // clear file content #if IDB_SM_DEBUG - smlog.close(); - smlog.open("/tmp/sm.log"); - SMDEBUGLOG << "sm_init: " << dboptions << endl; + //smlog.close(); + //smlog.open("/tmp/sm.log"); + SMDEBUGLOG << "sm_init: " << endl; #endif // @bug5660 Connection changes related to the local pm setting @@ -474,7 +503,6 @@ sm_cleanup ( cpsm_conhdl_t* conn_hdl ) { #if IDB_SM_DEBUG SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl; - SMDEBUGLOG.close(); #endif delete conn_hdl; diff --git a/dbcon/mysql/sm.h b/dbcon/mysql/sm.h index a2c8defaa..65cf35123 100644 --- a/dbcon/mysql/sm.h +++ b/dbcon/mysql/sm.h @@ -60,12 +60,12 @@ const int SQL_NOT_FOUND = -1000; const int SQL_KILLED = -1001; const int CALPONT_INTERNAL_ERROR = -1007; -#if IDB_SM_DEBUG -extern std::ofstream smlog; -#define SMDEBUGLOG smlog -#else -#define SMDEBUGLOG if (false) std::cerr -#endif +//#if IDB_SM_DEBUG +//extern std::ofstream smlog; +//#define SMDEBUGLOG smlog +//#else +#define SMDEBUGLOG if (true) std::cerr +//#endif extern const std::string DEFAULT_SAVE_PATH; typedef uint64_t tableid_t; @@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); -extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats); +extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false); }