diff --git a/dbcon/mysql/ha_calpont_execplan.cpp b/dbcon/mysql/ha_calpont_execplan.cpp index 0af74c433..789d85ad3 100644 --- a/dbcon/mysql/ha_calpont_execplan.cpp +++ b/dbcon/mysql/ha_calpont_execplan.cpp @@ -9704,7 +9704,6 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro { gwi.fatalParseError = false; execplan::CalpontSelectExecutionPlan::ReturnedColumnList::iterator iter = gwi.returnedCols.begin(); - AggregateColumn* ac = NULL; for ( ; iter != gwi.returnedCols.end(); iter++ ) { diff --git a/dbcon/mysql/ha_calpont_impl.cpp b/dbcon/mysql/ha_calpont_impl.cpp index 59a3df714..ad83f48db 100644 --- a/dbcon/mysql/ha_calpont_impl.cpp +++ b/dbcon/mysql/ha_calpont_impl.cpp @@ -276,104 +276,6 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType& } } -void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct) -{ - // unset null bit first - if ((*f)->null_ptr) - *(*f)->null_ptr &= ~(*f)->null_bit; - - // For unsigned, use the ColType returned in the row rather than the - // unsigned_flag set by mysql. This is because mysql gets it wrong for SUM() - // Hopefully, in all other cases we get it right. - switch ((*f)->type()) - { - case MYSQL_TYPE_NEWDECIMAL: - { - Field_new_decimal* f2 = (Field_new_decimal*)*f; - - // @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due - // to create vtable limitation. - if (f2->dec < ct.scale) - f2->dec = ct.scale; - - char buf[256]; - dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType); - f2->store(buf, strlen(buf), f2->charset()); - break; - } - - case MYSQL_TYPE_TINY: //TINYINT type - { - Field_tiny* f2 = (Field_tiny*)*f; - longlong int_val = (longlong)value; - f2->store(int_val, f2->unsigned_flag); - break; - } - - case MYSQL_TYPE_SHORT: //SMALLINT type - { - Field_short* f2 = (Field_short*)*f; - longlong int_val = (longlong)value; - f2->store(int_val, f2->unsigned_flag); - break; - } - - case MYSQL_TYPE_LONG: //INT type - { - Field_long* f2 = (Field_long*)*f; - longlong int_val = (longlong)value; - f2->store(int_val, f2->unsigned_flag); - break; - } - - case MYSQL_TYPE_LONGLONG: //BIGINT type - { - Field_longlong* f2 = (Field_longlong*)*f; - longlong int_val = (longlong)value; - f2->store(int_val, f2->unsigned_flag); - break; - } - - case MYSQL_TYPE_FLOAT: // FLOAT type - { - Field_float* f2 = (Field_float*)*f; - float float_val = *(float*)(&value); - f2->store(float_val); - break; - } - - case MYSQL_TYPE_DOUBLE: // DOUBLE type - { - Field_double* f2 = (Field_double*)*f; - double double_val = *(double*)(&value); - f2->store(double_val); - break; - } - - case MYSQL_TYPE_VARCHAR: - { - Field_varstring* f2 = (Field_varstring*)*f; - char tmp[25]; - - if (ct.colDataType == CalpontSystemCatalog::DECIMAL) - dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType); - else - snprintf(tmp, 25, "%ld", value); - - f2->store(tmp, strlen(tmp), f2->charset()); - break; - } - - default: - { - Field_longlong* f2 = (Field_longlong*)*f; - longlong int_val = (longlong)value; - f2->store(int_val, f2->unsigned_flag); - break; - } - } -} - // // @bug 2244. Log exception related to lost connection to ExeMgr. // Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow() @@ -5045,6 +4947,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type) { push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, infinidb_autoswitch_warning.c_str()); } + ci->queryState = 0; } else // vtable mode { @@ -5212,10 +5115,13 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE ci->warningMsg = msg; } - // if the previous query has error, re-establish the connection + // If the previous query has error and + // this is not a subquery run by the server(MCOL-1601) + // re-establish the connection if (ci->queryState != 0) { - sm::sm_cleanup(ci->cal_conn_hndl); + if( ci->cal_conn_hndl_st.size() == 0 ) + sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5237,6 +5143,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE hndl = ci->cal_conn_hndl; + ci->cal_conn_hndl_st.push(ci->cal_conn_hndl); if (!csep) csep.reset(new CalpontSelectExecutionPlan()); @@ -5308,8 +5215,12 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE return 0; string query; - query.assign(thd->infinidb_vtable.original_query.ptr(), - thd->infinidb_vtable.original_query.length()); + // Set the query text only once if the server executes + // subqueries separately. + if(ci->queryState) + query.assign(""); + else + query.assign(thd->query_string.str(), thd->query_string.length()); csep->data(query); try @@ -5439,11 +5350,15 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE idbassert(hndl != 0); hndl->csc = csc; + // The next section is useless if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ti.conn_hndl = hndl; else + { ci->cal_conn_hndl = hndl; - + ci->cal_conn_hndl_st.pop(); + ci->cal_conn_hndl_st.push(ci->cal_conn_hndl); + } try { hndl->connect(); @@ -5476,11 +5391,11 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) || (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY)) { - if (ti.tpl_ctx == 0) - { - ti.tpl_ctx = new sm::cpsm_tplh_t(); - ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); - } + // MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts. + ti.tpl_ctx = new sm::cpsm_tplh_t(); + ti.tpl_ctx_st.push(ti.tpl_ctx); + ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t()); + ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx); // make sure rowgroup is null so the new meta data can be taken. This is for some case mysql // call rnd_init for a table more than once. @@ -5560,6 +5475,7 @@ error: if (ci->cal_conn_hndl) { + // end_query() should be called here. sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5571,6 +5487,7 @@ internal_error: if (ci->cal_conn_hndl) { + // end_query() should be called here. sm::sm_cleanup(ci->cal_conn_hndl); ci->cal_conn_hndl = 0; } @@ -5802,6 +5719,12 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* ci->cal_conn_hndl = 0; // clear querystats because no query stats available for cancelled query ci->queryStats = ""; + if ( ci->cal_conn_hndl_st.size() ) + { + ci->cal_conn_hndl_st.pop(); + if ( ci->cal_conn_hndl_st.size() ) + ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); + } } return 0; @@ -5811,6 +5734,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* cal_table_info ti = ci->tableMap[table]; sm::cpsm_conhdl_t* hndl; + bool clearScanCtx = false; hndl = ci->cal_conn_hndl; @@ -5818,6 +5742,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* { if (ti.tpl_scan_ctx.get()) { + clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) && + ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() ); try { sm::tpl_scan_close(ti.tpl_scan_ctx); @@ -5829,11 +5755,31 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* } ti.tpl_scan_ctx.reset(); - + if ( ti.tpl_scan_ctx_st.size() ) + { + ti.tpl_scan_ctx_st.pop(); + if ( ti.tpl_scan_ctx_st.size() ) + ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top(); + } try { if(hndl) - sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats); + { + sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, clearScanCtx); +// Normaly stats variables are set in external_lock method but we set it here +// since they we pretend we are in vtable_disabled mode and the stats vars won't be set. +// We sum the stats up here since server could run a number of +// queries e.g. each for a subquery in a filter. + if(hndl) + { + if (hndl->queryStats.length()) + ci->queryStats += hndl->queryStats; + if (hndl->extendedStats.length()) + ci->extendedStats += hndl->extendedStats; + if (hndl->miniStats.length()) + ci->miniStats += hndl->miniStats; + } + } ci->cal_conn_hndl = hndl; @@ -5866,6 +5812,20 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* ti.tpl_ctx = 0; + if ( ti.tpl_ctx_st.size() ) + { + ti.tpl_ctx_st.pop(); + if ( ti.tpl_ctx_st.size() ) + ti.tpl_ctx = ti.tpl_ctx_st.top(); + } + + if ( ci->cal_conn_hndl_st.size() ) + { + ci->cal_conn_hndl_st.pop(); + if ( ci->cal_conn_hndl_st.size() ) + ci->cal_conn_hndl = ci->cal_conn_hndl_st.top(); + } + ci->tableMap[table] = ti; // push warnings from CREATE phase diff --git a/dbcon/mysql/ha_calpont_impl_if.h b/dbcon/mysql/ha_calpont_impl_if.h index 4ebc7adb2..72579111b 100644 --- a/dbcon/mysql/ha_calpont_impl_if.h +++ b/dbcon/mysql/ha_calpont_impl_if.h @@ -187,7 +187,9 @@ struct cal_table_info { } ~cal_table_info() {} sm::cpsm_tplh_t* tpl_ctx; + std::stack tpl_ctx_st; sm::sp_cpsm_tplsch_t tpl_scan_ctx; + std::stack tpl_scan_ctx_st; unsigned c; // for debug purpose TABLE* msTablePtr; // no ownership sm::cpsm_conhdl_t* conn_hndl; @@ -273,6 +275,7 @@ struct cal_connection_info } sm::cpsm_conhdl_t* cal_conn_hndl; + std::stack cal_conn_hndl_st; int queryState; CalTableMap tableMap; sm::tableid_t currentTable; diff --git a/dbcon/mysql/sm.cpp b/dbcon/mysql/sm.cpp index 565a65ad0..9cbfc73e6 100644 --- a/dbcon/mysql/sm.cpp +++ b/dbcon/mysql/sm.cpp @@ -280,7 +280,7 @@ tpl_open ( tableid_t tableid, cpsm_tplh_t* ntplh, cpsm_conhdl_t* conn_hdl) { - SMDEBUGLOG << "tpl_open: " << conn_hdl << " tableid: " << tableid << endl; + SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl; // if first time enter this function for a statement, set // queryState to QUERY_IN_PRCOESS and get execution plan. @@ -319,7 +319,9 @@ tpl_scan_open ( tableid_t tableid, sp_cpsm_tplsch_t& ntplsch, cpsm_conhdl_t* conn_hdl ) { +#if IDB_SM_DEBUG SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl; +#endif // @bug 649. No initialization here. take passed in reference ntplsch->tableid = tableid; @@ -354,8 +356,8 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch ) SMDEBUGLOG << "tpl_scan_close: "; if (ntplsch) - SMDEBUGLOG << " tableid: " << ntplsch->tableid << endl; - + SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch; + SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl; #endif ntplsch.reset(); @@ -365,11 +367,12 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch ) status_t tpl_close ( cpsm_tplh_t* ntplh, cpsm_conhdl_t** conn_hdl, - QueryStats& stats ) + QueryStats& stats, + bool clear_scan_ctx) { cpsm_conhdl_t* hndl = *conn_hdl; #if IDB_SM_DEBUG - SMDEBUGLOG << "tpl_close: " << hndl; + SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh; if (ntplh) SMDEBUGLOG << " tableid: " << ntplh->tableid; @@ -386,7 +389,16 @@ tpl_close ( cpsm_tplh_t* ntplh, ByteStream::quadbyte qb = 3; bs << qb; hndl->write(bs); + + // MCOL-1601 Dispose of unused empty RowGroup + if (clear_scan_ctx) + { + bs = hndl->exeMgr->read(); + } +#if IDB_SM_DEBUG + SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl; +#endif //keep reading until we get a string //TODO: really need to fix this! Why is ExeMgr sending other stuff? for (int tries = 0; tries < 10; tries++) @@ -415,6 +427,9 @@ tpl_close ( cpsm_tplh_t* ntplh, { // querystats messed up. close connection. // no need to throw for querystats protocol error, like for tablemode. +#if IDB_SM_DEBUG + SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl; +#endif end_query(hndl); sm_cleanup(hndl); *conn_hdl = 0; @@ -436,9 +451,9 @@ sm_init ( uint32_t sid, { // clear file content #if IDB_SM_DEBUG - smlog.close(); - smlog.open("/tmp/sm.log"); - SMDEBUGLOG << "sm_init: " << dboptions << endl; + //smlog.close(); + //smlog.open("/tmp/sm.log"); + SMDEBUGLOG << "sm_init: " << endl; #endif // @bug5660 Connection changes related to the local pm setting @@ -474,7 +489,6 @@ sm_cleanup ( cpsm_conhdl_t* conn_hdl ) { #if IDB_SM_DEBUG SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl; - SMDEBUGLOG.close(); #endif delete conn_hdl; diff --git a/dbcon/mysql/sm.h b/dbcon/mysql/sm.h index a2c8defaa..65cf35123 100644 --- a/dbcon/mysql/sm.h +++ b/dbcon/mysql/sm.h @@ -60,12 +60,12 @@ const int SQL_NOT_FOUND = -1000; const int SQL_KILLED = -1001; const int CALPONT_INTERNAL_ERROR = -1007; -#if IDB_SM_DEBUG -extern std::ofstream smlog; -#define SMDEBUGLOG smlog -#else -#define SMDEBUGLOG if (false) std::cerr -#endif +//#if IDB_SM_DEBUG +//extern std::ofstream smlog; +//#define SMDEBUGLOG smlog +//#else +#define SMDEBUGLOG if (true) std::cerr +//#endif extern const std::string DEFAULT_SAVE_PATH; typedef uint64_t tableid_t; @@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*); extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*); extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0); extern status_t tpl_scan_close(sp_cpsm_tplsch_t&); -extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats); +extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false); }