1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-20 01:42:27 +03:00

Merge pull request #462 from drrtuy/MCOL-1052

MCOL-1052 GROUP BY pushdown handler
This commit is contained in:
Andrew Hutchings
2018-05-05 06:29:30 +01:00
committed by GitHub
13 changed files with 3248 additions and 161 deletions

View File

@@ -756,8 +756,8 @@ int doFromSubquery(CalpontExecutionPlan* ep, const string& alias, const string&
void addOrderByAndLimit(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
{
// make sure there is a LIMIT
if (csep->orderByCols().size() > 0 && csep->limitNum() == (uint64_t) - 1)
return;
// if (csep->orderByCols().size() > 0 csep->limitNum() == (uint64_t) - 1)
// return;
jobInfo.limitStart = csep->limitStart();
jobInfo.limitCount = csep->limitNum();

View File

@@ -483,28 +483,28 @@ void adjustLastStep(JobStepVector& querySteps, DeliveredTableMap& deliverySteps,
deliverySteps[CNX_VTABLE_ID] = ws;
}
if ((jobInfo.limitCount != (uint64_t) - 1) ||
(jobInfo.constantCol == CONST_COL_EXIST) ||
(jobInfo.hasDistinct))
{
// if ((jobInfo.limitCount != (uint64_t) - 1) ||
// (jobInfo.constantCol == CONST_COL_EXIST) ||
// (jobInfo.hasDistinct))
// {
if (jobInfo.annexStep.get() == NULL)
jobInfo.annexStep.reset(new TupleAnnexStep(jobInfo));
TupleAnnexStep* tas = dynamic_cast<TupleAnnexStep*>(jobInfo.annexStep.get());
tas->setLimit(jobInfo.limitStart, jobInfo.limitCount);
if (jobInfo.limitCount != (uint64_t) - 1)
{
// if (jobInfo.limitCount != (uint64_t) - 1)
// {
if (jobInfo.orderByColVec.size() > 0)
tas->addOrderBy(new LimitedOrderBy());
}
// }
if (jobInfo.constantCol == CONST_COL_EXIST)
tas->addConstant(new TupleConstantStep(jobInfo));
if (jobInfo.hasDistinct)
tas->setDistinct();
}
// }
if (jobInfo.annexStep)
{

View File

@@ -1622,7 +1622,7 @@ void makeVtableModeSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo,
JobStepVector& querySteps, JobStepVector& projectSteps, DeliveredTableMap& deliverySteps)
{
// @bug4848, enhance and unify limit handling.
if (csep->limitNum() != (uint64_t) - 1)
// if (csep->limitNum() != (uint64_t) - 1)
{
// special case for outer query order by limit -- return all
if (jobInfo.subId == 0 && csep->hasOrderBy())

View File

@@ -76,7 +76,13 @@ void LimitedOrderBy::initialize(const RowGroup& rg, const JobInfo& jobInfo)
{
map<uint32_t, uint32_t>::iterator j = keyToIndexMap.find(i->first);
idbassert(j != keyToIndexMap.end());
fOrderByCond.push_back(IdbSortSpec(j->second, i->second));
// MCOL-1052 Ordering direction in CSEP differs from
// internal direction representation.
if (i->second)
fOrderByCond.push_back(IdbSortSpec(j->second, false));
else
fOrderByCond.push_back(IdbSortSpec(j->second, true));
//fOrderByCond.push_back(IdbSortSpec(j->second, i->second));
}
// limit row count info
@@ -174,7 +180,9 @@ void LimitedOrderBy::finalize()
if (fRowGroup.getRowCount() > 0)
fDataQueue.push(fData);
if (fStart != 0)
// MCOL-1052 The removed check effectivly disables sorting to happen,
// since fStart = 0;
if (true)
{
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
fMemSize += newSize;

View File

@@ -123,6 +123,9 @@ static int calpont_rollback(handlerton* hton, THD* thd, bool all);
static int calpont_close_connection ( handlerton* hton, THD* thd );
handlerton* calpont_hton;
static group_by_handler *
create_calpont_group_by_handler(THD *thd, Query *query);
/* Variables for example share methods */
/*
@@ -218,6 +221,7 @@ static int columnstore_init_func(void* p)
calpont_hton->commit = calpont_commit;
calpont_hton->rollback = calpont_rollback;
calpont_hton->close_connection = calpont_close_connection;
calpont_hton->create_group_by = create_calpont_group_by_handler;
DBUG_RETURN(0);
}
@@ -1135,6 +1139,86 @@ static MYSQL_SYSVAR_ULONG(
0);
#endif
/*@brief create_calpont_group_by_handler- Creates handler*/
/***********************************************************
* DESCRIPTION:
* Creates a group_by pushdown handler.
* Details are in server/sql/group_by_handler.h
* PARAMETERS:
* thd - THD pointer.
* query - Query structure, that describes the pushdowned query.
* RETURN:
* group_by_handler if success
* NULL in other case
***********************************************************/
static group_by_handler *
create_calpont_group_by_handler(THD *thd, Query *query)
{
ha_calpont_group_by_handler *handler = NULL;
if ( thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE )
{
handler = new ha_calpont_group_by_handler(thd, query);
// Notify the server, that CS handles GROUP BY, ORDER BY and HAVING clauses.
query->group_by = NULL;
query->order_by = NULL;
query->having = NULL;
}
return handler;
}
/***********************************************************
* DESCRIPTION:
* Makes the plan and prepares the data
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::init_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::init_scan");
// Save vtable_state to restore the after we inited.
THD::infinidb_state oldState = thd->infinidb_vtable.vtable_state;
// MCOL-1052 Should be removed after cleaning the code up.
thd->infinidb_vtable.vtable_state = THD::INFINIDB_CREATE_VTABLE;
int rc = ha_calpont_impl_group_by_init(this, table);
thd->infinidb_vtable.vtable_state = oldState;
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Fetches a row and saves it to a temporary table.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::next_row()
{
DBUG_ENTER("ha_calpont_group_by_handler::next_row");
int rc = ha_calpont_impl_group_by_next(this, table);
DBUG_RETURN(rc);
}
/***********************************************************
* DESCRIPTION:
* Shuts the scan down.
* RETURN:
* int rc
***********************************************************/
int ha_calpont_group_by_handler::end_scan()
{
DBUG_ENTER("ha_calpont_group_by_handler::end_scan");
int rc = ha_calpont_impl_group_by_end(this, table);
DBUG_RETURN(rc);
}
static struct st_mysql_sys_var* calpont_system_variables[] =
{
// MYSQL_SYSVAR(enum_var),

View File

@@ -40,6 +40,8 @@
#include <my_config.h>
#include "idb_mysql.h"
extern handlerton* calpont_hton;
/** @brief
EXAMPLE_SHARE is a structure that will be shared among all open handlers.
This example implements the minimum of what you will probably need.
@@ -245,5 +247,56 @@ public:
}
};
/*@brief group_by_handler class*/
/***********************************************************
* DESCRIPTION:
* Provides server with group_by_handler API methods.
* One should read comments in server/sql/group_by_handler.h
* Attributes:
* select - attribute contains all GROUP BY, HAVING, ORDER items and calls it
* an extended SELECT list accordin to comments in
* server/sql/group_handler.cc.
* So the temporary table for
* select count(*) from b group by a having a > 3 order by a
* will have 4 columns not 1.
* However server ignores all NULLs used in GROUP BY, HAVING, ORDER.
* table_list - contains all tables involved. Must be CS tables only.
* distinct - looks like a useless thing for now. Couldn't get it set by server.
* where - where items.
* group_by - group by ORDER items.
* order_by - order by ORDER items.
* having - having Item.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
***********************************************************/
class ha_calpont_group_by_handler: public group_by_handler
{
public:
ha_calpont_group_by_handler(THD *thd_arg, Query *query)
: group_by_handler(thd_arg, calpont_hton),
select(query->select),
table_list(query->from),
distinct(query->distinct),
where(query->where),
group_by(query->group_by),
order_by(query->order_by),
having(query->having)
{ }
~ha_calpont_group_by_handler() { }
int init_scan();
int next_row();
int end_scan();
List<Item> *select;
TABLE_LIST *table_list;
bool distinct;
Item *where;
ORDER *group_by;
ORDER *order_by;
Item *having;
};
#endif //HA_CALPONT_H__

File diff suppressed because it is too large Load Diff

View File

@@ -171,6 +171,7 @@ const unsigned NONSUPPORTED_ERR_THRESH = 2000;
vector<RMParam> rmParms;
ResourceManager* rm = ResourceManager::instance();
bool useHdfs = rm->useHdfs();
//convenience fcn
inline uint32_t tid2sid(const uint32_t tid)
{
@@ -275,6 +276,104 @@ void storeNumericField(Field** f, int64_t value, CalpontSystemCatalog::ColType&
}
}
void storeNumericFieldGroupBy(Field** f, int64_t value, CalpontSystemCatalog::ColType& ct)
{
// unset null bit first
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
// For unsigned, use the ColType returned in the row rather than the
// unsigned_flag set by mysql. This is because mysql gets it wrong for SUM()
// Hopefully, in all other cases we get it right.
switch ((*f)->type())
{
case MYSQL_TYPE_NEWDECIMAL:
{
Field_new_decimal* f2 = (Field_new_decimal*)*f;
// @bug4388 stick to InfiniDB's scale in case mysql gives wrong scale due
// to create vtable limitation.
if (f2->dec < ct.scale)
f2->dec = ct.scale;
char buf[256];
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, buf, 256, ct.colDataType);
f2->store(buf, strlen(buf), f2->charset());
break;
}
case MYSQL_TYPE_TINY: //TINYINT type
{
Field_tiny* f2 = (Field_tiny*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_SHORT: //SMALLINT type
{
Field_short* f2 = (Field_short*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONG: //INT type
{
Field_long* f2 = (Field_long*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_LONGLONG: //BIGINT type
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
case MYSQL_TYPE_FLOAT: // FLOAT type
{
Field_float* f2 = (Field_float*)*f;
float float_val = *(float*)(&value);
f2->store(float_val);
break;
}
case MYSQL_TYPE_DOUBLE: // DOUBLE type
{
Field_double* f2 = (Field_double*)*f;
double double_val = *(double*)(&value);
f2->store(double_val);
break;
}
case MYSQL_TYPE_VARCHAR:
{
Field_varstring* f2 = (Field_varstring*)*f;
char tmp[25];
if (ct.colDataType == CalpontSystemCatalog::DECIMAL)
dataconvert::DataConvert::decimalToString(value, (unsigned)ct.scale, tmp, 25, ct.colDataType);
else
snprintf(tmp, 25, "%ld", value);
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
default:
{
Field_longlong* f2 = (Field_longlong*)*f;
longlong int_val = (longlong)value;
f2->store(int_val, f2->unsigned_flag);
break;
}
}
}
//
// @bug 2244. Log exception related to lost connection to ExeMgr.
// Log exception error from calls to sm::tpl_scan_fetch in fetchNextRow()
@@ -328,7 +427,7 @@ int vbin2hex(const uint8_t* p, const unsigned l, char* o)
return 0;
}
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag=false)
{
int rc = HA_ERR_END_OF_FILE;
int num_attr = ti.msTablePtr->s->fields;
@@ -371,7 +470,12 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
Field** f;
f = ti.msTablePtr->field;
//set all fields to null in null col bitmap
if (!handler_flag)
memset(buf, -1, ti.msTablePtr->s->null_bytes);
else
{
memset(ti.msTablePtr->null_flags, -1, ti.msTablePtr->s->null_bytes);
}
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
int64_t intColVal = 0;
uint64_t uintColVal = 0;
@@ -4970,5 +5074,801 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
return ER_INTERNAL_ERROR;
}
// vim:sw=4 ts=4:
/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by
pushdown handler */
/***********************************************************
* DESCRIPTION:
* Prepares data for group_by_handler::next_row() calls.
* PARAMETERS:
* group_hand - group by handler, that preserves initial table and items lists. .
* table - TABLE pointer The table to save the result set into.
* RETURN:
* 0 if success
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table)
{
string tableName = group_hand->table_list->table->s->table_name.str;
IDEBUG( cout << "group_by_init for table " << tableName << endl );
THD* thd = current_thd;
//check whether the system is ready to process statement.
#ifndef _MSC_VER
static DBRM dbrm(true);
bool bSystemQueryReady = dbrm.getSystemQueryReady();
if (bSystemQueryReady == 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "The system is not yet ready to accept queries");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
else if (bSystemQueryReady < 0)
{
// Still not ready
setError(thd, ER_INTERNAL_ERROR, "DBRM is not responding. Cannot accept queries");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
#endif
// prevent "create table as select" from running on slave
thd->infinidb_vtable.hasInfiniDBTable = true;
// return error if error status has been already set
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
// MCOL-1052
// by pass the extra union trips. return 0
//if (thd->infinidb_vtable.isUnion && thd->infinidb_vtable.vtable_state == THD::INFINIDB_CREATE_VTABLE)
// return 0;
// @bug 2232. Basic SP support. Error out non support sp cases.
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
{
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
idbassert(ci != 0);
// MySQL sometimes calls rnd_init multiple times, plan should only be
// generated and sent once.
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE &&
!thd->infinidb_vtable.isNewQuery)
return 0;
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
if (ci->cal_conn_hndl)
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// canceling query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0;
}
sm::tableid_t tableid = 0;
cal_table_info ti;
cal_group_info gi;
sm::cpsm_conhdl_t* hndl;
SCSEP csep;
bool localQuery = (thd->variables.infinidb_local_query > 0 ? true : false);
{
ci->stats.reset(); // reset query stats
ci->stats.setStartTime();
ci->stats.fUser = thd->main_security_ctx.user;
if (thd->main_security_ctx.host)
ci->stats.fHost = thd->main_security_ctx.host;
else if (thd->main_security_ctx.host_or_ip)
ci->stats.fHost = thd->main_security_ctx.host_or_ip;
else
ci->stats.fHost = "unknown";
try
{
ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser);
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
ci->warningMsg = msg;
}
// if the previous query has error, re-establish the connection
if (ci->queryState != 0)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
sm::sm_init(sessionID, &ci->cal_conn_hndl, localQuery);
idbassert(ci->cal_conn_hndl != 0);
ci->cal_conn_hndl->csc = csc;
idbassert(ci->cal_conn_hndl->exeMgr != 0);
try
{
ci->cal_conn_hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
hndl = ci->cal_conn_hndl;
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
SessionManager sm;
BRM::TxnID txnID;
txnID = sm.getTxnID(sessionID);
if (!txnID.valid)
{
txnID.id = 0;
txnID.valid = true;
}
QueryContext verID;
verID = sm.verID();
csep->txnID(txnID.id);
csep->verID(verID);
csep->sessionID(sessionID);
if (group_hand->table_list->db_length)
csep->schemaName(group_hand->table_list->db);
csep->traceFlags(ci->traceFlags);
// MCOL-1052 Send Items lists down to the optimizer.
gi.groupByTables = group_hand->table_list;
gi.groupByFields = group_hand->select;
gi.groupByWhere = group_hand->where;
gi.groupByGroup = group_hand->group_by;
gi.groupByOrder = group_hand->order_by;
gi.groupByHaving = group_hand->having;
gi.groupByDistinct = group_hand->distinct;
// MCOL-1052 Send pushed conditions here, since server could omit GROUP BY
// items in case of = or IN functions used on GROUP BY columns.
{
CalTableMap::iterator mapiter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator colMapIter;
execplan::CalpontSelectExecutionPlan::ColumnMap::iterator condColMapIter;
execplan::ParseTree* ptIt;
execplan::ReturnedColumn* rcIt;
for(TABLE_LIST* tl = gi.groupByTables; tl; tl=tl->next_local)
{
mapiter = ci->tableMap.find(tl->table);
if(mapiter != ci->tableMap.end() && mapiter->second.condInfo != NULL
&& mapiter->second.condInfo->condPush)
{
while(!mapiter->second.condInfo->ptWorkStack.empty())
{
ptIt=mapiter->second.condInfo->ptWorkStack.top();
mapiter->second.condInfo->ptWorkStack.pop();
gi.pushedPts.push_back(ptIt);
}
}
}
}
// send plan whenever group_init is called
int status = cp_get_group_plan(thd, csep, gi);
if (status > 0)
goto internal_error;
else if (status < 0)
return 0;
// @bug 2547. don't need to send the plan if it's impossible where for all unions.
if (thd->infinidb_vtable.impossibleWhereOnUnion)
return 0;
string query;
query.assign(thd->infinidb_vtable.original_query.ptr(),
thd->infinidb_vtable.original_query.length());
csep->data(query);
try
{
csep->priority( ci->stats.userPriority(ci->stats.fHost, ci->stats.fUser));
}
catch (std::exception& e)
{
string msg = string("Columnstore User Priority - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
#ifdef PLAN_HEX_FILE
// plan serialization
ifstream ifs("/tmp/li1-plan.hex");
ByteStream bs1;
ifs >> bs1;
ifs.close();
csep->unserialize(bs1);
#endif
if (ci->traceFlags & 1)
{
cerr << "---------------- EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl ;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
}
else
{
IDEBUG( cout << "---------------- EXECUTION PLAN ----------------" << endl );
IDEBUG( cerr << *csep << endl );
IDEBUG( cout << "-------------- EXECUTION PLAN END --------------\n" << endl );
}
}// end of execution plan generation
{
ByteStream msg;
ByteStream emsgBs;
while (true)
{
try
{
ByteStream::quadbyte qb = 4;
msg << qb;
hndl->exeMgr->write(msg);
msg.restart();
csep->rmParms(rmParms);
//send plan
csep->serialize(msg);
hndl->exeMgr->write(msg);
//get ExeMgr status back to indicate a vtable joblist success or not
msg.restart();
emsgBs.restart();
msg = hndl->exeMgr->read();
emsgBs = hndl->exeMgr->read();
string emsg;
if (msg.length() == 0 || emsgBs.length() == 0)
{
emsg = "Lost connection to ExeMgr. Please contact your administrator";
setError(thd, ER_INTERNAL_ERROR, emsg);
return ER_INTERNAL_ERROR;
}
string emsgStr;
emsgBs >> emsgStr;
bool err = false;
if (msg.length() == 4)
{
msg >> qb;
if (qb != 0)
{
err = true;
// for makejoblist error, stats contains only error code and insert from here
// because table fetch is not started
ci->stats.setEndTime();
ci->stats.fQuery = csep->data();
ci->stats.fQueryType = csep->queryType();
ci->stats.fErrorNo = qb;
try
{
ci->stats.insert();
}
catch (std::exception& e)
{
string msg = string("Columnstore Query Stats - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
}
}
else
{
err = true;
}
if (err)
{
setError(thd, ER_INTERNAL_ERROR, emsgStr);
return ER_INTERNAL_ERROR;
}
rmParms.clear();
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{
ci->tableMap[table] = ti;
}
else
{
ci->queryState = 1;
}
break;
}
catch (...)
{
sm::sm_cleanup(hndl);
hndl = 0;
sm::sm_init(sessionID, &hndl, localQuery);
idbassert(hndl != 0);
hndl->csc = csc;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
ti.conn_hndl = hndl;
else
ci->cal_conn_hndl = hndl;
try
{
hndl->connect();
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_LOST_CONN_EXEMGR));
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto error;
}
msg.restart();
}
}
}
// set query state to be in_process. Sometimes mysql calls rnd_init multiple
// times, this makes sure plan only being generated and sent once. It will be
// reset when query finishes in sm::end_query
thd->infinidb_vtable.isNewQuery = false;
// common path for both vtable select phase and table mode -- open scan handle
ti = ci->tableMap[table];
ti.msTablePtr = table;
// MCOL-1052
thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;
if ((thd->infinidb_vtable.vtable_state == THD::INFINIDB_SELECT_VTABLE) ||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE) ||
(thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_QUERY))
{
if (ti.tpl_ctx == 0)
{
ti.tpl_ctx = new sm::cpsm_tplh_t();
ti.tpl_scan_ctx = sm::sp_cpsm_tplsch_t(new sm::cpsm_tplsch_t());
}
// make sure rowgroup is null so the new meta data can be taken. This is for some case mysql
// call rnd_init for a table more than once.
ti.tpl_scan_ctx->rowGroup = NULL;
try
{
tableid = execplan::IDB_VTABLE_ID;
}
catch (...)
{
string emsg = "No table ID found for table " + string(table->s->table_name.str);
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
try
{
sm::tpl_open(tableid, ti.tpl_ctx, hndl);
sm::tpl_scan_open(tableid, ti.tpl_scan_ctx, hndl);
}
catch (std::exception& e)
{
string emsg = "table can not be opened: " + string(e.what());
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
catch (...)
{
string emsg = "table can not be opened";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
ti.tpl_scan_ctx->traceFlags = ci->traceFlags;
if ((ti.tpl_scan_ctx->ctp).size() == 0)
{
uint32_t num_attr = table->s->fields;
for (uint32_t i = 0; i < num_attr; i++)
{
CalpontSystemCatalog::ColType ctype;
ti.tpl_scan_ctx->ctp.push_back(ctype);
}
// populate coltypes here for table mode because tableband gives treeoid for dictionary column
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE)
{
CalpontSystemCatalog::RIDList oidlist = csc->columnRIDs(make_table(table->s->db.str, table->s->table_name.str), true);
if (oidlist.size() != num_attr)
{
string emsg = "Size mismatch probably caused by front end out of sync";
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionID);
goto internal_error;
}
for (unsigned int j = 0; j < oidlist.size(); j++)
{
CalpontSystemCatalog::ColType ctype = csc->colType(oidlist[j].objnum);
ti.tpl_scan_ctx->ctp[ctype.colPosition] = ctype;
ti.tpl_scan_ctx->ctp[ctype.colPosition].colPosition = -1;
}
}
}
}
ci->tableMap[table] = ti;
return 0;
error:
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
// do we need to close all connection handle of the table map?
return ER_INTERNAL_ERROR;
internal_error:
if (ci->cal_conn_hndl)
{
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return ER_INTERNAL_ERROR;
}
/*@brief ha_calpont_impl_group_by_next - Return result set for MariaDB group_by
pushdown handler
*/
/***********************************************************
* DESCRIPTION:
* Return a result record for each group_by_handler::next_row() call.
* PARAMETERS:
* group_hand - group by handler, that preserves initial table and items lists. .
* table - TABLE pointer The table to save the result set in.
* RETURN:
* 0 if success
* HA_ERR_END_OF_FILE if the record set has come to an end
* others if something went wrong whilst getting the result set
***********************************************************/
int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table)
{
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ERROR)
return ER_INTERNAL_ERROR;
if (((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return HA_ERR_END_OF_FILE;
// @bug 2547
if (thd->infinidb_vtable.impossibleWhereOnUnion)
return HA_ERR_END_OF_FILE;
// @bug 2232. Basic SP support
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
/*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
{
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
*/
if (!thd->infinidb_vtable.cal_conn_info)
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
// @bug 3078
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
if (ci->cal_conn_hndl)
{
// send ExeMgr a signal before cloing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// cancel query. ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
}
return 0;
}
if (ci->alterTableState > 0) return HA_ERR_END_OF_FILE;
cal_table_info ti;
ti = ci->tableMap[table];
int rc = HA_ERR_END_OF_FILE;
if (!ti.tpl_ctx || !ti.tpl_scan_ctx)
{
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
idbassert(ti.msTablePtr == table);
try
{
// fetchNextRow interface forces to use buf.
unsigned char buf;
rc = fetchNextRow(&buf, ti, ci, true);
}
catch (std::exception& e)
{
string emsg = string("Error while fetching from ExeMgr: ") + e.what();
setError(thd, ER_INTERNAL_ERROR, emsg);
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
return ER_INTERNAL_ERROR;
}
ci->tableMap[table] = ti;
if (rc != 0 && rc != HA_ERR_END_OF_FILE)
{
string emsg;
// remove this check when all error handling migrated to the new framework.
if (rc >= 1000)
emsg = ti.tpl_scan_ctx->errMsg;
else
{
logging::ErrorCodes errorcodes;
emsg = errorcodes.errorString(rc);
}
setError(thd, ER_INTERNAL_ERROR, emsg);
ci->stats.fErrorNo = rc;
CalpontSystemCatalog::removeCalpontSystemCatalog(tid2sid(thd->thread_id));
rc = ER_INTERNAL_ERROR;
}
return rc;
}
int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table)
{
int rc = 0;
THD* thd = current_thd;
cal_connection_info* ci = NULL;
if (thd->slave_thread && (
thd->lex->sql_command == SQLCOM_INSERT ||
thd->lex->sql_command == SQLCOM_INSERT_SELECT ||
thd->lex->sql_command == SQLCOM_UPDATE ||
thd->lex->sql_command == SQLCOM_UPDATE_MULTI ||
thd->lex->sql_command == SQLCOM_DELETE ||
thd->lex->sql_command == SQLCOM_DELETE_MULTI ||
thd->lex->sql_command == SQLCOM_TRUNCATE ||
thd->lex->sql_command == SQLCOM_LOAD))
return 0;
thd->infinidb_vtable.isNewQuery = true;
thd->infinidb_vtable.isUnion = false;
if (thd->infinidb_vtable.cal_conn_info)
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
// MCOL-1052
//if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY )
//{
// thd->infinidb_vtable.vtable_state = THD::INFINIDB_SELECT_VTABLE;// flip back to normal state
// return rc;
//}
if (((thd->lex)->sql_command == SQLCOM_INSERT) ||
((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) )
{
// @bug 4022. error handling for select part of dml
if (ci->cal_conn_hndl && ci->rc)
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is error handling, so ignore connection failure.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
return rc;
}
}
if (!ci)
{
thd->infinidb_vtable.cal_conn_info = (void*)(new cal_connection_info());
ci = reinterpret_cast<cal_connection_info*>(thd->infinidb_vtable.cal_conn_info);
}
// @bug 3078. Also session limit variable works the same as ctrl+c
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD ||
((thd->lex)->sql_command != SQLCOM_INSERT &&
(thd->lex)->sql_command != SQLCOM_INSERT_SELECT &&
thd->variables.select_limit != (uint64_t) - 1))
{
if (ci->cal_conn_hndl)
{
// send ExeMgr a signal before closing the connection
ByteStream msg;
ByteStream::quadbyte qb = 0;
msg << qb;
try
{
ci->cal_conn_hndl->exeMgr->write(msg);
}
catch (...)
{
// this is the end of query. Ignore the exception if exemgr connection failed
// for whatever reason.
}
sm::sm_cleanup(ci->cal_conn_hndl);
ci->cal_conn_hndl = 0;
// clear querystats because no query stats available for cancelled query
ci->queryStats = "";
}
return 0;
}
IDEBUG( cerr << "group_by_end for table " << table->s->table_name.str << endl );
cal_table_info ti = ci->tableMap[table];
sm::cpsm_conhdl_t* hndl;
hndl = ci->cal_conn_hndl;
if (ti.tpl_ctx)
{
if (ti.tpl_scan_ctx.get())
{
try
{
sm::tpl_scan_close(ti.tpl_scan_ctx);
}
catch (...)
{
rc = ER_INTERNAL_ERROR;
}
}
ti.tpl_scan_ctx.reset();
try
{
sm::tpl_close(ti.tpl_ctx, &hndl, ci->stats);
ci->cal_conn_hndl = hndl;
ti.tpl_ctx = 0;
}
catch (IDBExcept& e)
{
if (e.errorCode() == ERR_CROSS_ENGINE_CONNECT || e.errorCode() == ERR_CROSS_ENGINE_CONFIG)
{
string msg = string("Columnstore Query Stats - ") + e.what();
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, msg.c_str());
}
else
{
setError(thd, ER_INTERNAL_ERROR, e.what());
rc = ER_INTERNAL_ERROR;
}
}
catch (std::exception& e)
{
setError(thd, ER_INTERNAL_ERROR, e.what());
rc = ER_INTERNAL_ERROR;
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
rc = ER_INTERNAL_ERROR;
}
}
ti.tpl_ctx = 0;
ci->tableMap[table] = ti;
// push warnings from CREATE phase
if (!ci->warningMsg.empty())
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, ci->warningMsg.c_str());
ci->warningMsg.clear();
// reset expressionId just in case
ci->expressionId = 0;
return rc;
}
// vim:sw=4 ts=4:

View File

@@ -48,11 +48,16 @@ extern int ha_calpont_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_calpont_impl_update_row();
extern int ha_calpont_impl_delete_row();
extern int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
#endif
#ifdef NEED_CALPONT_INTERFACE
#include "ha_calpont_impl_if.h"
#include "calpontsystemcatalog.h"
#include "ha_calpont.h"
extern int ha_calpont_impl_rename_table_(const char* from, const char* to, cal_impl_if::cal_connection_info& ci);
extern int ha_calpont_impl_write_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci, ha_rows& rowsInserted);
extern int ha_calpont_impl_write_batch_row_(uchar* buf, TABLE* table, cal_impl_if::cal_connection_info& ci);
@@ -69,6 +74,9 @@ extern std::string ha_calpont_impl_droppartition_ (execplan::CalpontSystemCatal
extern std::string ha_calpont_impl_viewtablelock( cal_impl_if::cal_connection_info& ci, execplan::CalpontSystemCatalog::TableName& tablename);
extern std::string ha_calpont_impl_cleartablelock( cal_impl_if::cal_connection_info& ci, uint64_t tableLockID);
extern int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table);
extern int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE* table);
#endif
#endif

View File

@@ -99,6 +99,7 @@ struct gp_walk_info
execplan::CalpontSelectExecutionPlan::ReturnedColumnList groupByCols;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList subGroupByCols;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols;
std::vector <Item*> havingAggColsItems;
execplan::CalpontSelectExecutionPlan::ColumnMap columnMap;
// This vector temporarily hold the projection columns to be added
// to the returnedCols vector for subquery processing. It will be appended
@@ -191,6 +192,28 @@ struct cal_table_info
bool moreRows; //are there more rows to consume (b/c of limit)
};
struct cal_group_info
{
cal_group_info() : groupByFields(0),
groupByTables(0),
groupByWhere(0),
groupByGroup(0),
groupByOrder(0),
groupByHaving(0),
groupByDistinct(false)
{ }
~cal_group_info() { }
List<Item>* groupByFields; // MCOL-1052 SELECT
TABLE_LIST* groupByTables; // MCOL-1052 FROM
Item* groupByWhere; // MCOL-1052 WHERE
ORDER* groupByGroup; // MCOL-1052 GROUP BY
ORDER* groupByOrder; // MCOL-1052 ORDER BY
Item* groupByHaving; // MCOL-1052 HAVING
bool groupByDistinct; //MCOL-1052 DISTINCT
std::vector<execplan::ParseTree*> pushedPts;
};
typedef std::tr1::unordered_map<TABLE*, cal_table_info> CalTableMap;
typedef std::vector<std::string> ColValuesList;
typedef std::vector<std::string> ColNameList;
@@ -297,7 +320,9 @@ const std::string infinidb_err_msg = "\nThe query includes syntax that is not su
int cp_get_plan(THD* thd, execplan::SCSEP& csep);
int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_info& ti);
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false);
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false);
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg);
void gp_walk(const Item* item, void* arg);

View File

@@ -513,6 +513,18 @@ ReturnedColumn* buildWindowFunctionColumn(Item* item, gp_walk_info& gwi, bool& n
Item* orderItem = *(orderCol->item);
srcp.reset(buildReturnedColumn(orderItem, gwi, nonSupport));
// MCOL-1052 GROUP BY handler has all of query's agg Items
// as field and correlates them with its extended SELECT Items.
if (!srcp)
{
orderItem = orderCol->item_ptr;
if (orderItem)
{
gwi.fatalParseError = false;
srcp.reset(buildReturnedColumn(orderItem, gwi, nonSupport));
}
}
if (!srcp)
return nullOnError(gwi);