1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #565 from drrtuy/MCOL-1601

MCOL-1601 GROUP BY now supports subqueries in HAVING.
This commit is contained in:
Andrew Hutchings
2018-09-18 13:57:17 +01:00
committed by GitHub
5 changed files with 103 additions and 127 deletions

View File

@@ -9704,7 +9704,6 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
{
gwi.fatalParseError = false;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList::iterator iter = gwi.returnedCols.begin();
AggregateColumn* ac = NULL;
for ( ; iter != gwi.returnedCols.end(); iter++ )
{

View File

@@ -276,104 +276,6 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
}
}
void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
{
// unset null bit first
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
// For unsigned, use the ColType returned in the row rather than the
// unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
// Hopefully, in all other cases we get it right.
switch ((*f)->type())
{
case MYSQL_TYPE_NEWDECIMAL:
{
Field_new_decimal* f2 = (Field_new_decimal*)*f;
// @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
// to create vtable limitation.
if (f2->dec < ct.scale)
f2->dec = ct.scale;
char buf[256];
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
f2->store(buf, strlen(buf), f2->charset());
break;
}
case MYSQL_TYPE_TINY: //TINYINT type
{
Field_tiny* f2 = (Field_tiny*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_SHORT: //SMALLINT type
{
Field_short* f2 = (Field_short*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONG: //INT type
{
Field_long* f2 = (Field_long*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONGLONG: //BIGINT type
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_FLOAT: // FLOAT type
{
Field_float* f2 = (Field_float*)*f;
float float_val = *(float*)(&value);
f2->store(float_val);
break;
}
case MYSQL_TYPE_DOUBLE: // DOUBLE type
{
Field_double* f2 = (Field_double*)*f;
double double_val = *(double*)(&value);
f2->store(double_val);
break;
}
case MYSQL_TYPE_VARCHAR:
{
Field_varstring* f2 = (Field_varstring*)*f;
char tmp[25];
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
else
snprintf(tmp, 25, "%ld", value);
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
default:
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
}
}
//
// @bug 2244. Log exception related to lost connection to ExeMgr.
// Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
@@ -5045,6 +4947,7 @@ int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type)
{
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, infinidb_autoswitch_warning.c_str());
}
ci->queryState = 0;
}
else // vtable mode
{
@@ -5212,10 +5115,13 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
ci->warningMsg = msg;
}
// if the previous query has error, re-establish the connection
// If the previous query has error and
// this is not a subquery run by the server(MCOL-1601)
// re-establish the connection
if (ci->queryState != 0)
{
sm::sm_cleanup(ci->cal_conn_hndl);
if( ci->cal_conn_hndl_st.size() == 0 )
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5237,6 +5143,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
hndl = ci->cal_conn_hndl;
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
@@ -5308,8 +5215,12 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
return 0;
string query;
query.assign(thd->infinidb_vtable.original_query.ptr(),
thd->infinidb_vtable.original_query.length());
// Set the query text only once if the server executes
// subqueries separately.
if(ci->queryState)
query.assign("<subquery of the previous>");
else
query.assign(thd->query_string.str(), thd->query_string.length());
csep->data(query);
try
@@ -5439,11 +5350,15 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
idbassert(hndl != 0);
hndl->csc = csc;
// The next section is useless
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
ti.conn_hndl = hndl;
else
{
ci->cal_conn_hndl = hndl;
ci->cal_conn_hndl_st.pop();
ci->cal_conn_hndl_st.push(ci->cal_conn_hndl);
}
try
{
hndl->connect();
@@ -5476,11 +5391,11 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
{
if (ti.tpl_ctx == 0)
{
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
// MCOL-1601 Using stacks of ExeMgr conn hndls, table and scan contexts.
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_ctx_st.push(ti.tpl_ctx);
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
ti.tpl_scan_ctx_st.push(ti.tpl_scan_ctx);
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
@@ -5560,6 +5475,7 @@ error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5571,6 +5487,7 @@ internal_error:
if (ci->cal_conn_hndl)
{
// end_query() should be called here.
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
@@ -5802,6 +5719,12 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
}
return 0;
@@ -5811,6 +5734,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
cal_table_info ti = ci->tableMap[table];
sm::cpsm_conhdl_t* hndl;
bool clearScanCtx = false;
hndl = ci->cal_conn_hndl;
@@ -5818,6 +5742,8 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
{
if (ti.tpl_scan_ctx.get())
{
clearScanCtx = ( (ti.tpl_scan_ctx.get()->rowsreturned) &&
ti.tpl_scan_ctx.get()->rowsreturned == ti.tpl_scan_ctx.get()->getRowCount() );
try
{
sm::tpl_scan_close(ti.tpl_scan_ctx);
@@ -5829,11 +5755,31 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
}
ti.tpl_scan_ctx.reset();
if ( ti.tpl_scan_ctx_st.size() )
{
ti.tpl_scan_ctx_st.pop();
if ( ti.tpl_scan_ctx_st.size() )
ti.tpl_scan_ctx = ti.tpl_scan_ctx_st.top();
}
try
{
if(hndl)
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
{
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats, clearScanCtx);
// Normaly stats variables are set in external_lock method but we set it here
// since they we pretend we are in vtable_disabled mode and the stats vars won't be set.
// We sum the stats up here since server could run a number of
// queries e.g. each for a subquery in a filter.
if(hndl)
{
if (hndl->queryStats.length())
ci->queryStats += hndl->queryStats;
if (hndl->extendedStats.length())
ci->extendedStats += hndl->extendedStats;
if (hndl->miniStats.length())
ci->miniStats += hndl->miniStats;
}
}
ci->cal_conn_hndl = hndl;
@@ -5866,6 +5812,20 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
ti.tpl_ctx = 0;
if ( ti.tpl_ctx_st.size() )
{
ti.tpl_ctx_st.pop();
if ( ti.tpl_ctx_st.size() )
ti.tpl_ctx = ti.tpl_ctx_st.top();
}
if ( ci->cal_conn_hndl_st.size() )
{
ci->cal_conn_hndl_st.pop();
if ( ci->cal_conn_hndl_st.size() )
ci->cal_conn_hndl = ci->cal_conn_hndl_st.top();
}
ci->tableMap[table] = ti;
// push warnings from CREATE phase

View File

@@ -187,7 +187,9 @@ struct cal_table_info
{ }
~cal_table_info() {}
sm::cpsm_tplh_t* tpl_ctx;
std::stack<sm::cpsm_tplh_t*> tpl_ctx_st;
sm::sp_cpsm_tplsch_t tpl_scan_ctx;
std::stack<sm::sp_cpsm_tplsch_t> tpl_scan_ctx_st;
unsigned c; // for debug purpose
TABLE* msTablePtr; // no ownership
sm::cpsm_conhdl_t* conn_hndl;
@@ -273,6 +275,7 @@ struct cal_connection_info
}
sm::cpsm_conhdl_t* cal_conn_hndl;
std::stack<sm::cpsm_conhdl_t*> cal_conn_hndl_st;
int queryState;
CalTableMap tableMap;
sm::tableid_t currentTable;

View File

@@ -280,7 +280,7 @@ tpl_open ( tableid_t tableid,
cpsm_tplh_t* ntplh,
cpsm_conhdl_t* conn_hdl)
{
SMDEBUGLOG << "tpl_open: " << conn_hdl << " tableid: " << tableid << endl;
SMDEBUGLOG << "tpl_open: ntplh: " << ntplh << " conn_hdl: " << conn_hdl << " tableid: " << tableid << endl;
// if first time enter this function for a statement, set
// queryState to QUERY_IN_PRCOESS and get execution plan.
@@ -319,7 +319,9 @@ tpl_scan_open ( tableid_t tableid,
sp_cpsm_tplsch_t& ntplsch,
cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_scan_open: " << conn_hdl << " tableid: " << tableid << endl;
#endif
// @bug 649. No initialization here. take passed in reference
ntplsch->tableid = tableid;
@@ -354,8 +356,8 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
SMDEBUGLOG << "tpl_scan_close: ";
if (ntplsch)
SMDEBUGLOG << " tableid: " << ntplsch->tableid << endl;
SMDEBUGLOG << "tpl_scan_close: ntplsch " << ntplsch;
SMDEBUGLOG << "tpl_scan_close: tableid: " << ntplsch->tableid << endl;
#endif
ntplsch.reset();
@@ -365,11 +367,12 @@ tpl_scan_close ( sp_cpsm_tplsch_t& ntplsch )
status_t
tpl_close ( cpsm_tplh_t* ntplh,
cpsm_conhdl_t** conn_hdl,
QueryStats& stats )
QueryStats& stats,
bool clear_scan_ctx)
{
cpsm_conhdl_t* hndl = *conn_hdl;
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close: " << hndl;
SMDEBUGLOG << "tpl_close: hndl" << hndl << " ntplh " << ntplh;
if (ntplh)
SMDEBUGLOG << " tableid: " << ntplh->tableid;
@@ -386,7 +389,16 @@ tpl_close ( cpsm_tplh_t* ntplh,
ByteStream::quadbyte qb = 3;
bs << qb;
hndl->write(bs);
// MCOL-1601 Dispose of unused empty RowGroup
if (clear_scan_ctx)
{
bs = hndl->exeMgr->read();
}
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close hndl->exeMgr: " << hndl->exeMgr << endl;
#endif
//keep reading until we get a string
//TODO: really need to fix this! Why is ExeMgr sending other stuff?
for (int tries = 0; tries < 10; tries++)
@@ -415,6 +427,9 @@ tpl_close ( cpsm_tplh_t* ntplh,
{
// querystats messed up. close connection.
// no need to throw for querystats protocol error, like for tablemode.
#if IDB_SM_DEBUG
SMDEBUGLOG << "tpl_close() exception whilst getting stats" << endl;
#endif
end_query(hndl);
sm_cleanup(hndl);
*conn_hdl = 0;
@@ -436,9 +451,9 @@ sm_init ( uint32_t sid,
{
// clear file content
#if IDB_SM_DEBUG
smlog.close();
smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << dboptions << endl;
//smlog.close();
//smlog.open("/tmp/sm.log");
SMDEBUGLOG << "sm_init: " << endl;
#endif
// @bug5660 Connection changes related to the local pm setting
@@ -474,7 +489,6 @@ sm_cleanup ( cpsm_conhdl_t* conn_hdl )
{
#if IDB_SM_DEBUG
SMDEBUGLOG << "sm_cleanup: " << conn_hdl << endl;
SMDEBUGLOG.close();
#endif
delete conn_hdl;

View File

@@ -60,12 +60,12 @@ const int SQL_NOT_FOUND = -1000;
const int SQL_KILLED = -1001;
const int CALPONT_INTERNAL_ERROR = -1007;
#if IDB_SM_DEBUG
extern std::ofstream smlog;
#define SMDEBUGLOG smlog
#else
#define SMDEBUGLOG if (false) std::cerr
#endif
//#if IDB_SM_DEBUG
//extern std::ofstream smlog;
//#define SMDEBUGLOG smlog
//#else
#define SMDEBUGLOG if (true) std::cerr
//#endif
extern const std::string DEFAULT_SAVE_PATH;
typedef uint64_t tableid_t;
@@ -282,7 +282,7 @@ extern status_t tpl_open(tableid_t, cpsm_tplh_t*, cpsm_conhdl_t*);
extern status_t tpl_scan_open(tableid_t, sp_cpsm_tplsch_t&, cpsm_conhdl_t*);
extern status_t tpl_scan_fetch(sp_cpsm_tplsch_t&, cpsm_conhdl_t*, int* k = 0);
extern status_t tpl_scan_close(sp_cpsm_tplsch_t&);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats);
extern status_t tpl_close(cpsm_tplh_t*, cpsm_conhdl_t**, querystats::QueryStats& stats, bool clear_scan_ctx = false);
}