1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-1052 WIP Uses item_ptr instead of auxilary vector; Changes fetchNextRow() interface and remove unused funcs; Uncomment extra checks in ha_calpont_impl_group_by_init().

This commit is contained in:
Roman Nozdrin
2018-04-11 15:52:26 +03:00
parent 5a0ff2bb60
commit ba38e392fc
4 changed files with 102 additions and 1541 deletions

View File

@ -1166,8 +1166,6 @@ create_calpont_group_by_handler(THD *thd, Query *query)
query->having = NULL;
}
return handler;
}

View File

@ -2978,260 +2978,6 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp
return rc;
}
ReturnedColumn* buildReturnedColumnGr(Item* item, gp_walk_info& gwi, bool& nonSupport)
{
ReturnedColumn* rc = NULL;
if ( gwi.thd)
{
//if ( ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) || ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ))
{
if ( !item->fixed)
{
item->fix_fields(gwi.thd, (Item**)&item);
}
}
}
Item::Type itype = Item::SUM_FUNC_ITEM;
switch (itype)
{
case Item::FIELD_ITEM:
{
Item_field* ifp = (Item_field*)item;
return buildSimpleColumn(ifp, gwi);
}
case Item::INT_ITEM:
case Item::VARBIN_ITEM:
{
String val, *str = item->val_str(&val);
string valStr;
valStr.assign(str->ptr(), str->length());
if (item->unsigned_flag)
{
//cc = new ConstantColumn(valStr, (uint64_t)item->val_uint(), ConstantColumn::NUM);
// It seems that str at this point is crap if val_uint() is > MAX_BIGINT. By using
// this constructor, ConstantColumn is built with the proper string. For whatever reason,
// ExeMgr converts the fConstval member to numeric, rather than using the existing numeric
// values available, so it's important to have fConstval correct.
rc = new ConstantColumn((uint64_t)item->val_uint(), ConstantColumn::NUM);
}
else
{
rc = new ConstantColumn(valStr, (int64_t)item->val_int(), ConstantColumn::NUM);
}
//return cc;
break;
}
case Item::STRING_ITEM:
{
String val, *str = item->val_str(&val);
string valStr;
valStr.assign(str->ptr(), str->length());
rc = new ConstantColumn(valStr);
break;
}
case Item::REAL_ITEM:
{
String val, *str = item->val_str(&val);
string valStr;
valStr.assign(str->ptr(), str->length());
rc = new ConstantColumn(valStr, item->val_real());
break;
}
case Item::DECIMAL_ITEM:
{
rc = buildDecimalColumn(item, gwi);
break;
}
case Item::FUNC_ITEM:
{
Item_func* ifp = (Item_func*)item;
string func_name = ifp->func_name();
// try to evaluate const F&E. only for select clause
vector <Item_field*> tmpVec;
//bool hasAggColumn = false;
uint16_t parseInfo = 0;
parse_item(ifp, tmpVec, gwi.fatalParseError, parseInfo);
if (parseInfo & SUB_BIT)
{
gwi.fatalParseError = true;
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_SELECT_SUB);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
break;
}
if (!gwi.fatalParseError &&
!nonConstFunc(ifp) &&
!(parseInfo & AF_BIT) &&
(tmpVec.size() == 0))
{
String val, *str = ifp->val_str(&val);
string valStr;
if (str)
{
valStr.assign(str->ptr(), str->length());
}
if (!str)
{
rc = new ConstantColumn("", ConstantColumn::NULLDATA);
}
else if (ifp->result_type() == STRING_RESULT)
{
rc = new ConstantColumn(valStr, ConstantColumn::LITERAL);
rc->resultType(colType_MysqlToIDB(item));
}
else if (ifp->result_type() == DECIMAL_RESULT)
rc = buildDecimalColumn(ifp, gwi);
else
{
rc = new ConstantColumn(valStr, ConstantColumn::NUM);
rc->resultType(colType_MysqlToIDB(item));
}
break;
}
if (func_name == "+" || func_name == "-" || func_name == "*" || func_name == "/" )
return buildArithmeticColumn(ifp, gwi, nonSupport);
else
return buildFunctionColumn(ifp, gwi, nonSupport);
}
case Item::SUM_FUNC_ITEM:
{
return buildAggregateColumn(item, gwi);
}
case Item::REF_ITEM:
{
Item_ref* ref = (Item_ref*)item;
switch ((*(ref->ref))->type())
{
case Item::SUM_FUNC_ITEM:
return buildAggregateColumn(*(ref->ref), gwi);
case Item::FIELD_ITEM:
return buildReturnedColumn(*(ref->ref), gwi, nonSupport);
case Item::REF_ITEM:
return buildReturnedColumn(*(((Item_ref*)(*(ref->ref)))->ref), gwi, nonSupport);
case Item::FUNC_ITEM:
return buildFunctionColumn((Item_func*)(*(ref->ref)), gwi, nonSupport);
case Item::WINDOW_FUNC_ITEM:
return buildWindowFunctionColumn(*(ref->ref), gwi, nonSupport);
default:
gwi.fatalParseError = true;
gwi.parseErrorText = "Unknown REF item";
break;
}
}
case Item::NULL_ITEM:
{
if (gwi.condPush)
return new SimpleColumn("noop");
return new ConstantColumn("", ConstantColumn::NULLDATA);
}
case Item::CACHE_ITEM:
{
Item* col = ((Item_cache*)item)->get_example();
rc = buildReturnedColumn(col, gwi, nonSupport);
if (rc)
{
ConstantColumn* cc = dynamic_cast<ConstantColumn*>(rc);
if (!cc)
{
rc->joinInfo(rc->joinInfo() | JOIN_CORRELATED);
if (gwi.subQuery)
gwi.subQuery->correlated(true);
}
}
break;
}
case Item::EXPR_CACHE_ITEM:
{
// TODO: item is a Item_cache_wrapper
printf("EXPR_CACHE_ITEM in buildReturnedColumn\n");
cerr << "EXPR_CACHE_ITEM in buildReturnedColumn" << endl;
break;
}
case Item::DATE_ITEM:
{
String val, *str = item->val_str(&val);
string valStr;
valStr.assign(str->ptr(), str->length());
rc = new ConstantColumn(valStr);
break;
}
case Item::WINDOW_FUNC_ITEM:
{
return buildWindowFunctionColumn(item, gwi, nonSupport);
}
#if INTERVAL_ITEM
case Item::INTERVAL_ITEM:
{
Item_interval* interval = (Item_interval*)item;
SRCP srcp;
srcp.reset(buildReturnedColumn(interval->item, gwi, nonSupport));
if (!srcp)
return NULL;
rc = new IntervalColumn(srcp, (int)interval->interval);
rc->resultType(srcp->resultType());
break;
}
#endif
case Item::SUBSELECT_ITEM:
{
gwi.hasSubSelect = true;
break;
}
default:
{
gwi.fatalParseError = true;
gwi.parseErrorText = "Unknown item type";
break;
}
}
if (rc && item->name)
rc->alias(item->name);
return rc;
}
ArithmeticColumn* buildArithmeticColumn(Item_func* item, gp_walk_info& gwi, bool& nonSupport)
{
if (!(gwi.thd->infinidb_vtable.cal_conn_info))
@ -8249,10 +7995,6 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi)
else if (status < 0)
return status;
cerr << "---------------- cp_get_group_plan EXECUTION PLAN ----------------" << endl;
cerr << *csep << endl ;
cerr << "-------------- EXECUTION PLAN END --------------\n" << endl;
return 0;
}
@ -8274,10 +8016,9 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
gwi.internalDecimalScale = (gwi.thd->variables.infinidb_use_decimal_scale ? gwi.thd->variables.infinidb_decimal_scale : -1);
gwi.subSelectType = csep->subType();
// MCOL-1052
JOIN* join = select_lex.join;
Item_cond* icp = 0;
// MCOL-1052
if ( gi.groupByWhere )
icp = reinterpret_cast<Item_cond*>(gi.groupByWhere);
@ -8485,7 +8226,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
{
gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM));
}
// MCOL-1052
uint32_t failed = buildOuterJoin(gwi, select_lex);
if (failed) return failed;
@ -8678,12 +8419,11 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
// add this agg col to returnedColumnList
boost::shared_ptr<ReturnedColumn> spac(ac);
gwi.returnedCols.push_back(spac);
// This item will be used in HAVING|ORDER clause later.
// This item will be used in HAVING later.
Item_func_or_sum* isfp = reinterpret_cast<Item_func_or_sum*>(item);
if ( ! isfp->name_length )
{
gwi.havingAggColsItems.push_back(item);
gwi.extSelectColsItems.push_back(item);
}
gwi.selectCols.push_back('`' + escapeBackTick(spac->alias().c_str()) + '`');
@ -9143,11 +8883,9 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
gwi.fatalParseError = false;
gwi.parseErrorText = "";
//if (gi.groupByHaving != 0)
if ( select_lex.having != 0 )
if (gi.groupByHaving != 0)
{
Item_cond* having = reinterpret_cast<Item_cond*>(select_lex.having);
//Item_cond* having = reinterpret_cast<Item_cond*>(gi.groupByHaving);
Item_cond* having = reinterpret_cast<Item_cond*>(gi.groupByHaving);
#ifdef DEBUG_WALK_COND
cerr << "------------------- HAVING ---------------------" << endl;
having->traverse_cond(debug_walk, &gwi, Item::POSTFIX);
@ -9202,7 +8940,6 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
for (uint32_t i = 0; i < funcFieldVec.size(); i++)
{
//SimpleColumn *sc = new SimpleColumn(funcFieldVec[i]->db_name, bestTableName(funcFieldVec[i])/*funcFieldVec[i]->table_name*/, funcFieldVec[i]->field_name, sessionID);
SimpleColumn* sc = buildSimpleColumn(funcFieldVec[i], gwi);
if (!sc || gwi.fatalParseError)
@ -9570,7 +9307,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
// for subquery, order+limit by will be supported in infinidb. build order by columns
// @todo union order by and limit support
//if (gwi.hasWindowFunc || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT)
{
for (; ordercol; ordercol = ordercol->next)
{
ReturnedColumn* rc = NULL;
@ -9610,7 +9347,25 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
// this ORDER BY item.
if ( iter == gwi.groupByCols.end() )
{
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION);
Item_ident *iip = reinterpret_cast<Item_ident*>(ord_item);
std::ostringstream ostream;
ostream << "'";
if (iip->db_name)
ostream << iip->db_name << '.';
else
ostream << "unknown db" << '.';
if (iip->table_name)
ostream << iip->table_name << '.';
else
ostream << "unknown table" << '.';
if (iip->field_name)
ostream << iip->field_name;
else
ostream << "unknown field";
ostream << "'";
Message::Args args;
args.add(ostream.str());
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NOT_GROUPBY_EXPRESSION, args);
gwi.parseErrorText = emsg;
setError(gwi.thd, ER_INTERNAL_ERROR, emsg, gwi);
return ERR_NOT_GROUPBY_EXPRESSION;
@ -9631,12 +9386,14 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
rc = buildReturnedColumn(item_ptr, gwi, gwi.fatalParseError);
}
// This ORDER BY item must be a agg function - try to
// find corresponding item in the exteded SELECT list.
// This ORDER BY item must be an agg function -
// the ordercol->item_ptr and exteded SELECT list
// must contain the corresponding item.
if (!rc)
{
rc = buildReturnedColumn(gwi.extSelectColsItems[0], gwi, gwi.fatalParseError);
Item* item_ptr = ordercol->item_ptr;
if (item_ptr)
rc = buildReturnedColumn(item_ptr, gwi, gwi.fatalParseError);
}
if (!rc)
@ -9655,302 +9412,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
gwi.orderByCols.push_back(SRCP(rc));
}
}
/*
else if (!isUnion)
{
vector <Item_field*> fieldVec;
bool addToSel;
// the following order by is just for redo phase
if (!unionSel)
{
for (; ordercol; ordercol = ordercol->next)
{
Item* ord_item = *(ordercol->item);
// @bug5993. Could be nested ref.
while (ord_item->type() == Item::REF_ITEM)
ord_item = (*((Item_ref*)ord_item)->ref);
// @bug 1706. re-construct the order by item one by one
//Item* ord_item = *(ordercol->item);
if (ord_cols.length() != 0)
ord_cols += ", ";
addToSel = true;
string fullname;
if (ordercol->in_field_list && ordercol->counter_used)
{
ostringstream oss;
oss << ordercol->counter;
ord_cols += oss.str();
if (ordercol->direction != ORDER::ORDER_ASC)
ord_cols += " desc";
continue;
}
else if (ord_item->type() == Item::FUNC_ITEM)
{
// @bug 2621. order by alias
if (!ord_item->is_autogenerated_name && ord_item->name)
{
ord_cols += ord_item->name;
continue;
}
// if there's group by clause or aggregate column, check to see
// if this item or the arguments is on the GB list.
ReturnedColumn* rc = 0;
// check if this order by column is on the select list
Item_func* ifp = (Item_func*)(*(ordercol->item));
rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
if (rc)
{
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
if (rc && rc->operator==(gwi.returnedCols[i].get()))
{
ostringstream oss;
oss << i + 1;
ord_cols += oss.str();
addToSel = false;
break;
}
}
}
if (addToSel)
{
FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
if (fc)
{
addToSel = false;
redo = true;
string ord_func = string(ifp->func_name()) + "(";
for (uint32_t i = 0; i < fc->functionParms().size(); i++)
{
if (i != 0)
ord_func += ",";
for (uint32_t j = 0; j < gwi.returnedCols.size(); j++)
{
if (fc->functionParms()[i]->data()->operator==(gwi.returnedCols[j].get()))
{
ord_func += "`" + escapeBackTick(gwi.returnedCols[j]->alias().c_str()) + "`";
continue;
}
AggregateColumn* ac = dynamic_cast<AggregateColumn*>(fc->functionParms()[i]->data());
if (ac)
{
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
addToSel = true;
//continue;
}
}
ord_func += ")";
if (!addToSel)
ord_cols += ord_func;
}
}
}
else if (ord_item->type() == Item::SUBSELECT_ITEM)
{
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
else if (ord_item->type() == Item::SUM_FUNC_ITEM)
{
ReturnedColumn* ac = 0;
Item_sum* ifp = (Item_sum*)(*(ordercol->item));
// @bug3477. add aggregate column to the select list of the create phase.
ac = buildAggregateColumn(ifp, gwi);
if (!ac)
{
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED,
IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY), gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
// check if this order by column is on the select list
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
AggregateColumn* ret = dynamic_cast<AggregateColumn*>(gwi.returnedCols[i].get());
if (!ret)
continue;
if (ac->operator==(gwi.returnedCols[i].get()))
{
ostringstream oss;
oss << i + 1;
ord_cols += oss.str();
addToSel = false;
break;
}
}
if (ac || !gwi.groupByCols.empty())
{
if (addToSel)
{
redo = true;
// @bug 3076. do not add the argument of aggregate function to the SELECT list,
// instead, add the whole column
String str;
ord_item->print(&str, QT_INFINIDB_NO_QUOTE);
if (sel_cols_in_create.length() != 0)
sel_cols_in_create += ", ";
sel_cols_in_create += str.c_ptr();
//gwi.selectCols.push_back(" `" + string(str.c_ptr()) + "`");
SRCP srcp(ac);
gwi.returnedCols.push_back(srcp);
ord_cols += " `" + escapeBackTick(str.c_ptr()) + "`";
}
if (ordercol->direction != ORDER::ORDER_ASC)
ord_cols += " desc";
continue;
}
}
else if (ord_item->name && ord_item->type() == Item::FIELD_ITEM)
{
Item_field* field = reinterpret_cast<Item_field*>(ord_item);
ReturnedColumn* rc = buildSimpleColumn(field, gwi);
fullname = field->full_name();
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(gwi.returnedCols[i].get());
if (sc && ((Item_field*)ord_item)->cached_table &&
(strcasecmp(getViewName(((Item_field*)ord_item)->cached_table).c_str(), sc->viewName().c_str()) != 0))
{
continue;
}
if (strcasecmp(fullname.c_str(), gwi.returnedCols[i]->alias().c_str()) == 0 ||
strcasecmp(ord_item->name, gwi.returnedCols[i]->alias().c_str()) == 0)
{
ord_cols += string(" `") + escapeBackTick(gwi.returnedCols[i]->alias().c_str()) + '`';
addToSel = false;
break;
}
if (sc && sc->sameColumn(rc))
{
ostringstream oss;
oss << i + 1;
ord_cols += oss.str();
addToSel = false;
break;
}
}
}
if (addToSel)
{
// @bug 2719. Error out order by not on the distinct select list.
if ( gi.groupByDistinct )
{
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_ORDERBY_NOT_IN_DISTINCT);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
bool hasNonSupportItem = false;
uint16_t parseInfo = 0;
parse_item(ord_item, fieldVec, hasNonSupportItem, parseInfo);
if (hasNonSupportItem)
{
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
String str;
ord_item->print(&str, QT_INFINIDB);
ord_cols += str.c_ptr();
}
if (ordercol->direction != ORDER::ORDER_ASC)
ord_cols += " desc";
}
}
redo = (redo || fieldVec.size() != 0);
// populate string to be added to the select list for order by
for (uint32_t i = 0; i < fieldVec.size(); i++)
{
SimpleColumn* sc = buildSimpleColumn(fieldVec[i], gwi);
if (!sc)
{
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
String str;
fieldVec[i]->print(&str, QT_INFINIDB_NO_QUOTE);
sc->alias(string(str.c_ptr()));
SRCP srcp(sc);
uint32_t j = 0;
for (; j < gwi.returnedCols.size(); j++)
{
if (sc->sameColumn(gwi.returnedCols[j].get()))
{
SimpleColumn* field = dynamic_cast<SimpleColumn*>(gwi.returnedCols[j].get());
if (field && field->alias() == sc->alias())
break;
}
}
if (j == gwi.returnedCols.size())
{
string fullname;
if (sel_cols_in_create.length() != 0)
sel_cols_in_create += ", ";
fullname = str.c_ptr();
sel_cols_in_create += fullname + " `" + escapeBackTick(fullname.c_str()) + "`";
gwi.returnedCols.push_back(srcp);
gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(fieldVec[i]->field_name), srcp));
TABLE_LIST* tmp = (fieldVec[i]->cached_table ? fieldVec[i]->cached_table : 0);
gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isInfiniDB())] =
make_pair(1, tmp);
}
}
}
*/
// make sure columnmap, returnedcols and count(*) arg_list are not empty
TableMap::iterator tb_iter = gwi.tableMap.begin();

View File

@ -427,7 +427,7 @@ int vbin2hex(const uint8_t* p, const unsigned l, char* o)
return 0;
}
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, bool handler_flag=false)
{
int rc = HA_ERR_END_OF_FILE;
int num_attr = ti.msTablePtr->s->fields;
@ -470,6 +470,7 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
Field** f;
f = ti.msTablePtr->field;
//set all fields to null in null col bitmap
if (!handler_flag)
memset(buf, -1, ti.msTablePtr->s->null_bytes);
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
int64_t intColVal = 0;
@ -859,874 +860,6 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci)
return rc;
}
int fetchNextRowGrHandGroupBy(cal_table_info& ti, cal_connection_info* ci)
{
int rc = HA_ERR_END_OF_FILE;
int num_attr = ti.msTablePtr->s->fields;
sm::status_t sm_stat;
try
{
if (ti.conn_hndl)
{
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl);
}
else if (ci->cal_conn_hndl)
{
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(&current_thd->killed));
}
else
throw runtime_error("internal error");
}
catch (std::exception& ex)
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException( ti, ci, &ex);
//#endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
catch (...)
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException( ti, ci, 0 );
//#endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
if (sm_stat == sm::STATUS_OK)
{
Field** f;
f = ti.msTablePtr->field;
//set all fields to null in null col bitmap
//memset(buf, -1, ti.msTablePtr->s->null_bytes);
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
int64_t intColVal = 0;
uint64_t uintColVal = 0;
char tmp[256];
RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup;
// table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup
// set coltype.position to be the position in rowgroup. only set once.
if (ti.tpl_scan_ctx->rowsreturned == 0 &&
(ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF))
{
for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++)
{
int oid = rowGroup->getOIDs()[i];
int j = 0;
for (; j < num_attr; j++)
{
// mysql should haved eliminated duplicate projection columns
if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID)
{
colTypes[j].colPosition = i;
break;
}
}
}
}
rowgroup::Row row;
rowGroup->initRow(&row);
rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row);
int s;
for (int p = 0; p < num_attr; p++, f++)
{
//This col is going to be written
bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index);
// get coltype if not there yet
if (colTypes[0].colWidth == 0)
{
for (short c = 0; c < num_attr; c++)
{
colTypes[c].colPosition = c;
colTypes[c].colWidth = rowGroup->getColumnWidth(c);
colTypes[c].colDataType = rowGroup->getColTypes()[c];
colTypes[c].columnOID = rowGroup->getOIDs()[c];
colTypes[c].scale = rowGroup->getScale()[c];
colTypes[c].precision = rowGroup->getPrecision()[c];
}
}
CalpontSystemCatalog::ColType colType(colTypes[p]);
// table mode handling
if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)
{
if (colType.colPosition == -1) // not projected by tuplejoblist
continue;
else
s = colType.colPosition;
}
else
{
s = p;
}
// precision == -16 is borrowed as skip null check indicator for bit ops.
if (row.isNullValue(s) && colType.precision != -16)
{
// @2835. Handle empty string and null confusion. store empty string for string column
if (colType.colDataType == CalpontSystemCatalog::CHAR ||
colType.colDataType == CalpontSystemCatalog::VARCHAR ||
colType.colDataType == CalpontSystemCatalog::VARBINARY)
{
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, 0, f2->charset());
}
continue;
}
// fetch and store data
switch (colType.colDataType)
{
case CalpontSystemCatalog::DATE:
{
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<4>(s);
DataConvert::dateToString(intColVal, tmp, 255);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
case CalpontSystemCatalog::DATETIME:
{
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<8>(s);
DataConvert::datetimeToString(intColVal, tmp, 255);
/* setting the field_length is a sort-of hack. The length
* at this point can be long enough to include mseconds.
* ColumnStore doesn't fully support mseconds yet so if
* they are requested, trim them off.
* At a later date we should set this more intelligently
* based on the result set.
*/
/* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */
if (((*f)->field_length > 19) && ((*f)->field_length != 57))
(*f)->field_length = strlen(tmp);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
Field_varstring* f2 = (Field_varstring*)*f;
switch (colType.colWidth)
{
case 1:
intColVal = row.getUintField<1>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 2:
intColVal = row.getUintField<2>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 4:
intColVal = row.getUintField<4>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 8:
//make sure we don't send strlen off into the weeds...
intColVal = row.getUintField<8>(s);
memcpy(tmp, &intColVal, 8);
tmp[8] = 0;
f2->store(tmp, strlen(tmp), f2->charset());
break;
default:
f2->store((const char*)row.getStringPointer(s), row.getStringLength(s), f2->charset());
}
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
case CalpontSystemCatalog::VARBINARY:
{
Field_varstring* f2 = (Field_varstring*)*f;
if (current_thd->variables.infinidb_varbin_always_hex)
{
uint32_t l;
const uint8_t* p = row.getVarBinaryField(l, s);
uint32_t ll = l * 2;
boost::scoped_array<char> sca(new char[ll]);
vbin2hex(p, l, sca.get());
f2->store(sca.get(), ll, f2->charset());
}
else
f2->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), f2->charset());
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
case CalpontSystemCatalog::BIGINT:
{
intColVal = row.getIntField<8>(s);
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UBIGINT:
{
uintColVal = row.getUintField<8>(s);
storeNumericField(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::INT:
{
intColVal = row.getIntField<4>(s);
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UINT:
{
uintColVal = row.getUintField<4>(s);
storeNumericField(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::SMALLINT:
{
intColVal = row.getIntField<2>(s);
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::USMALLINT:
{
uintColVal = row.getUintField<2>(s);
storeNumericField(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::TINYINT:
{
intColVal = row.getIntField<1>(s);
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UTINYINT:
{
uintColVal = row.getUintField<1>(s);
storeNumericField(f, uintColVal, colType);
break;
}
//In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(s);
if (dl == std::numeric_limits<float>::infinity())
continue;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
Field_float* f2 = (Field_float*)*f;
// bug 3485, reserve enough space for the longest float value
// -3.402823466E+38 to -1.175494351E-38, 0, and
// 1.175494351E-38 to 3.402823466E+38.
(*f)->field_length = 40;
//float float_val = *(float*)(&value);
//f2->store(float_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
f2->store(dl);
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(s);
if (dl == std::numeric_limits<double>::infinity())
continue;
Field_double* f2 = (Field_double*)*f;
// bug 3483, reserve enough space for the longest double value
// -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
// 2.2250738585072014E-308 to 1.7976931348623157E+308.
(*f)->field_length = 310;
//double double_val = *(double*)(&value);
//f2->store(double_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
f2->store(dl);
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
intColVal = row.getIntField(s);
storeNumericField(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
Field_blob* f2 = (Field_blob*)*f;
f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
default: // treat as int64
{
intColVal = row.getUintField<8>(s);
storeNumericField(f, intColVal, colType);
break;
}
}
}
ti.tpl_scan_ctx->rowsreturned++;
ti.c++;
#ifdef INFINIDB_DEBUG
if ((ti.c % 1000000) == 0)
cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl;
#endif
ti.moreRows = true;
rc = 0;
}
else if (sm_stat == sm::SQL_NOT_FOUND)
{
IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl );
ti.c = 0;
ti.moreRows = false;
rc = HA_ERR_END_OF_FILE;
}
else if (sm_stat == sm::CALPONT_INTERNAL_ERROR)
{
ti.moreRows = false;
rc = ER_INTERNAL_ERROR;
ci->rc = rc;
}
else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR)
{
ti.moreRows = false;
rc = logging::ERR_LOST_CONN_EXEMGR;
sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl,
current_thd->variables.infinidb_local_query);
idbassert(ci->cal_conn_hndl != 0);
ci->rc = rc;
}
else if (sm_stat == sm::SQL_KILLED)
{
// query was aborted by the user. treat it the same as limit query. close
// connection after rnd_close.
ti.c = 0;
ti.moreRows = false;
rc = HA_ERR_END_OF_FILE;
ci->rc = rc;
}
else
{
ti.moreRows = false;
rc = sm_stat;
ci->rc = rc;
}
return rc;
}
int fetchNextRowGrHand(cal_table_info& ti, cal_connection_info* ci)
{
int rc = HA_ERR_END_OF_FILE;
int num_attr = ti.msTablePtr->s->fields;
sm::status_t sm_stat;
try
{
if (ti.conn_hndl)
{
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ti.conn_hndl);
}
else if (ci->cal_conn_hndl)
{
sm_stat = sm::tpl_scan_fetch(ti.tpl_scan_ctx, ci->cal_conn_hndl, (int*)(&current_thd->killed));
}
else
throw runtime_error("internal error");
}
catch (std::exception& ex)
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException( ti, ci, &ex);
//#endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
catch (...)
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException( ti, ci, 0 );
//#endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
if (sm_stat == sm::STATUS_OK)
{
Field** f;
f = ti.msTablePtr->field;
//set all fields to null in null col bitmap
//memset(buf, -1, ti.msTablePtr->s->null_bytes);
std::vector<CalpontSystemCatalog::ColType>& colTypes = ti.tpl_scan_ctx->ctp;
int64_t intColVal = 0;
uint64_t uintColVal = 0;
char tmp[256];
RowGroup* rowGroup = ti.tpl_scan_ctx->rowGroup;
// table mode mysql expects all columns of the table. mapping between columnoid and position in rowgroup
// set coltype.position to be the position in rowgroup. only set once.
if (ti.tpl_scan_ctx->rowsreturned == 0 &&
(ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF))
{
for (uint32_t i = 0; i < rowGroup->getColumnCount(); i++)
{
int oid = rowGroup->getOIDs()[i];
int j = 0;
for (; j < num_attr; j++)
{
// mysql should haved eliminated duplicate projection columns
if (oid == colTypes[j].columnOID || oid == colTypes[j].ddn.dictOID)
{
colTypes[j].colPosition = i;
break;
}
}
}
}
rowgroup::Row row;
rowGroup->initRow(&row);
rowGroup->getRow(ti.tpl_scan_ctx->rowsreturned, &row);
int s;
for (int p = 0; p < num_attr; p++, f++)
{
//This col is going to be written
bitmap_set_bit(ti.msTablePtr->write_set, (*f)->field_index);
// get coltype if not there yet
if (colTypes[0].colWidth == 0)
{
for (short c = 0; c < num_attr; c++)
{
colTypes[c].colPosition = c;
colTypes[c].colWidth = rowGroup->getColumnWidth(c);
colTypes[c].colDataType = rowGroup->getColTypes()[c];
colTypes[c].columnOID = rowGroup->getOIDs()[c];
colTypes[c].scale = rowGroup->getScale()[c];
colTypes[c].precision = rowGroup->getPrecision()[c];
}
}
CalpontSystemCatalog::ColType colType(colTypes[p]);
// table mode handling
if (ti.tpl_scan_ctx->traceFlags & execplan::CalpontSelectExecutionPlan::TRACE_TUPLE_OFF)
{
if (colType.colPosition == -1) // not projected by tuplejoblist
continue;
else
s = colType.colPosition;
}
else
{
s = p;
}
// precision == -16 is borrowed as skip null check indicator for bit ops.
if (row.isNullValue(s) && colType.precision != -16)
{
// @2835. Handle empty string and null confusion. store empty string for string column
if (colType.colDataType == CalpontSystemCatalog::CHAR ||
colType.colDataType == CalpontSystemCatalog::VARCHAR ||
colType.colDataType == CalpontSystemCatalog::VARBINARY)
{
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, 0, f2->charset());
}
continue;
}
// fetch and store data
switch (colType.colDataType)
{
case CalpontSystemCatalog::DATE:
{
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<4>(s);
DataConvert::dateToString(intColVal, tmp, 255);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
f2->set_notnull();
break;
}
case CalpontSystemCatalog::DATETIME:
{
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
intColVal = row.getUintField<8>(s);
DataConvert::datetimeToString(intColVal, tmp, 255);
/* setting the field_length is a sort-of hack. The length
* at this point can be long enough to include mseconds.
* ColumnStore doesn't fully support mseconds yet so if
* they are requested, trim them off.
* At a later date we should set this more intelligently
* based on the result set.
*/
/* MCOL-683: UTF-8 datetime no msecs is 57, this sometimes happens! */
if (((*f)->field_length > 19) && ((*f)->field_length != 57))
(*f)->field_length = strlen(tmp);
Field_varstring* f2 = (Field_varstring*)*f;
f2->store(tmp, strlen(tmp), f2->charset());
f2->set_notnull();
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
Field_varstring* f2 = (Field_varstring*)*f;
switch (colType.colWidth)
{
case 1:
intColVal = row.getUintField<1>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 2:
intColVal = row.getUintField<2>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 4:
intColVal = row.getUintField<4>(s);
f2->store((char*)(&intColVal), strlen((char*)(&intColVal)), f2->charset());
break;
case 8:
//make sure we don't send strlen off into the weeds...
intColVal = row.getUintField<8>(s);
memcpy(tmp, &intColVal, 8);
tmp[8] = 0;
f2->store(tmp, strlen(tmp), f2->charset());
break;
default:
f2->store((const char*)row.getStringPointer(s), row.getStringLength(s), f2->charset());
}
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
f2->set_notnull();
break;
}
case CalpontSystemCatalog::VARBINARY:
{
Field_varstring* f2 = (Field_varstring*)*f;
if (current_thd->variables.infinidb_varbin_always_hex)
{
uint32_t l;
const uint8_t* p = row.getVarBinaryField(l, s);
uint32_t ll = l * 2;
boost::scoped_array<char> sca(new char[ll]);
vbin2hex(p, l, sca.get());
f2->store(sca.get(), ll, f2->charset());
}
else
f2->store((const char*)row.getVarBinaryField(s), row.getVarBinaryLength(s), f2->charset());
f2->set_notnull();
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
case CalpontSystemCatalog::BIGINT:
{
intColVal = row.getIntField<8>(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UBIGINT:
{
uintColVal = row.getUintField<8>(s);
storeNumericFieldGroupBy(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::INT:
{
intColVal = row.getIntField<4>(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UINT:
{
uintColVal = row.getUintField<4>(s);
storeNumericFieldGroupBy(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::SMALLINT:
{
intColVal = row.getIntField<2>(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::USMALLINT:
{
uintColVal = row.getUintField<2>(s);
storeNumericFieldGroupBy(f, uintColVal, colType);
break;
}
case CalpontSystemCatalog::TINYINT:
{
intColVal = row.getIntField<1>(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::UTINYINT:
{
uintColVal = row.getUintField<1>(s);
storeNumericFieldGroupBy(f, uintColVal, colType);
break;
}
//In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(s);
if (dl == std::numeric_limits<float>::infinity())
continue;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
Field_float* f2 = (Field_float*)*f;
// bug 3485, reserve enough space for the longest float value
// -3.402823466E+38 to -1.175494351E-38, 0, and
// 1.175494351E-38 to 3.402823466E+38.
(*f)->field_length = 40;
//float float_val = *(float*)(&value);
//f2->store(float_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
f2->store(dl);
f2->set_notnull();
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(s);
if (dl == std::numeric_limits<double>::infinity())
continue;
Field_double* f2 = (Field_double*)*f;
// bug 3483, reserve enough space for the longest double value
// -1.7976931348623157E+308 to -2.2250738585072014E-308, 0, and
// 2.2250738585072014E-308 to 1.7976931348623157E+308.
(*f)->field_length = 310;
//double double_val = *(double*)(&value);
//f2->store(double_val);
if (f2->decimals() < (uint32_t)row.getScale(s))
f2->dec = (uint32_t)row.getScale(s);
f2->store(dl);
f2->set_notnull();
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
//int64_t* icvp = (int64_t*)&dl;
//intColVal = *icvp;
//storeNumericField(f, intColVal, colType);
//break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
intColVal = row.getIntField(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
Field_blob* f2 = (Field_blob*)*f;
f2->set_ptr(row.getVarBinaryLength(s), (unsigned char*)row.getVarBinaryField(s));
if ((*f)->null_ptr)
*(*f)->null_ptr &= ~(*f)->null_bit;
break;
}
default: // treat as int64
{
intColVal = row.getUintField<8>(s);
storeNumericFieldGroupBy(f, intColVal, colType);
break;
}
}
}
ti.tpl_scan_ctx->rowsreturned++;
ti.c++;
#ifdef INFINIDB_DEBUG
if ((ti.c % 1000000) == 0)
cerr << "fetchNextRow so far table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl;
#endif
ti.moreRows = true;
rc = 0;
}
else if (sm_stat == sm::SQL_NOT_FOUND)
{
IDEBUG( cerr << "fetchNextRow done for table " << ti.msTablePtr->s->table_name.str << " rows = " << ti.c << endl );
ti.c = 0;
ti.moreRows = false;
rc = HA_ERR_END_OF_FILE;
}
else if (sm_stat == sm::CALPONT_INTERNAL_ERROR)
{
ti.moreRows = false;
rc = ER_INTERNAL_ERROR;
ci->rc = rc;
}
else if ((uint32_t)sm_stat == logging::ERR_LOST_CONN_EXEMGR)
{
ti.moreRows = false;
rc = logging::ERR_LOST_CONN_EXEMGR;
sm::sm_init(tid2sid(current_thd->thread_id), &ci->cal_conn_hndl,
current_thd->variables.infinidb_local_query);
idbassert(ci->cal_conn_hndl != 0);
ci->rc = rc;
}
else if (sm_stat == sm::SQL_KILLED)
{
// query was aborted by the user. treat it the same as limit query. close
// connection after rnd_close.
ti.c = 0;
ti.moreRows = false;
rc = HA_ERR_END_OF_FILE;
ci->rc = rc;
}
else
{
ti.moreRows = false;
rc = sm_stat;
ci->rc = rc;
}
return rc;
}
void makeUpdateScalarJoin(const ParseTree* n, void* obj)
{
TreeNode* tn = n->data();
@ -5925,8 +5058,7 @@ int ha_calpont_impl_rnd_pos(uchar* buf, uchar* pos)
}
/*@brief ha_calpont_impl_group_by_init - Get data for MariaDB group_by
pushdown handler
*/
pushdown handler */
/***********************************************************
* DESCRIPTION:
* Prepares data for group_by_handler::next_row() calls.
@ -5977,19 +5109,13 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
// @bug 2232. Basic SP support. Error out non support sp cases.
// @bug 3939. Only error out for sp with select. Let pass for alter table in sp.
/*if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
if (thd->infinidb_vtable.call_sp && (thd->lex)->sql_command != SQLCOM_ALTER_TABLE)
{
setError(thd, ER_CHECK_NOT_IMPLEMENTED, "This stored procedure syntax is not supported by Columnstore in this version");
thd->infinidb_vtable.vtable_state = THD::INFINIDB_ERROR;
return ER_INTERNAL_ERROR;
}
// mysql reads table twice for order by
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_REDO_PHASE1 ||
thd->infinidb_vtable.vtable_state == THD::INFINIDB_ORDER_BY)
return 0;
*/
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
@ -6001,15 +5127,12 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
idbassert(ci != 0);
/*
MySQL sometimes calls rnd_init multiple times, plan should only be
generated and sent once.
TO DO Check this statement.
// MySQL sometimes calls rnd_init multiple times, plan should only be
// generated and sent once.
if (thd->infinidb_vtable.vtable_state == THD::INFINIDB_DISABLE_VTABLE &&
!thd->infinidb_vtable.isNewQuery)
return 0;
*/
if (thd->killed == KILL_QUERY || thd->killed == KILL_QUERY_HARD)
{
@ -6097,7 +5220,6 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
if (thd->infinidb_vtable.vtable_state != THD::INFINIDB_SELECT_VTABLE)
{
//CalpontSelectExecutionPlan csep;
if (!csep)
csep.reset(new CalpontSelectExecutionPlan());
@ -6133,19 +5255,7 @@ int ha_calpont_impl_group_by_init(ha_calpont_group_by_handler* group_hand, TABLE
// send plan whenever group_init is called
int status = cp_get_group_plan(thd, csep, gi);
/*
if (thd->db)
csep->schemaName(thd->db);
csep->traceFlags(ci->traceFlags);
if (thd->infinidb_vtable.isInsertSelect)
csep->queryType(CalpontSelectExecutionPlan::INSERT_SELECT);
//get plan
int status = cp_get_plan(thd, csep);
*/
//if (cp_get_plan(thd, csep) != 0)
if (status > 0)
goto internal_error;
else if (status < 0)
@ -6445,14 +5555,6 @@ internal_error:
***********************************************************/
int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE* table)
{
//if (!first_row)
//return(HA_ERR_END_OF_FILE);
//first_row = 0;
//Field *field = *(table->field);
//field->store(5LL, 1);
//field->set_notnull();
//return(0);
THD* thd = current_thd;
/* If this node is the slave, ignore DML to IDB tables */
@ -6539,9 +5641,9 @@ int ha_calpont_impl_group_by_next(ha_calpont_group_by_handler* group_hand, TABLE
try
{
rc = fetchNextRowGrHandGroupBy(ti, ci);
// MCOL-1052
//rc = 0;
// fetchNextRow interface forces to use buf.
unsigned char buf;
rc = fetchNextRow(&buf, ti, ci, true);
}
catch (std::exception& e)
{
@ -6739,7 +5841,7 @@ int ha_calpont_impl_group_by_end(ha_calpont_group_by_handler* group_hand, TABLE*
}
catch (...)
{
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in rnd_end");
setError(thd, ER_INTERNAL_ERROR, "Internal error throwed in group_by_end");
rc = ER_INTERNAL_ERROR;
}
}

View File

@ -100,7 +100,6 @@ struct gp_walk_info
execplan::CalpontSelectExecutionPlan::ReturnedColumnList subGroupByCols;
execplan::CalpontSelectExecutionPlan::ReturnedColumnList orderByCols;
std::vector <Item*> havingAggColsItems;
std::vector <Item*> extSelectColsItems;
execplan::CalpontSelectExecutionPlan::ColumnMap columnMap;
// This vector temporarily hold the projection columns to be added
// to the returnedCols vector for subquery processing. It will be appended