You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-3903 Enable Select Handler to run query part of INSERT..SELECT.
Original SH implementation sends the result set back to the client thus it can't be used in INSERT..SELECT, SELECT INTO OUTFILE,CREATE TABLE AS SELECT etc. CLX-77 feature has been backported into MDB to enable SH to run query part of the mentioned queries.
This commit is contained in:
@ -3117,7 +3117,10 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB (const Item* item)
|
||||
return ct;
|
||||
}
|
||||
|
||||
ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupport, bool pushdownHand, bool isRefItem)
|
||||
ReturnedColumn* buildReturnedColumn(
|
||||
Item* item, gp_walk_info& gwi,
|
||||
bool& nonSupport,
|
||||
bool isRefItem)
|
||||
{
|
||||
ReturnedColumn* rc = NULL;
|
||||
|
||||
@ -3275,9 +3278,9 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp
|
||||
}
|
||||
|
||||
if (func_name == "+" || func_name == "-" || func_name == "*" || func_name == "/" )
|
||||
return buildArithmeticColumn(ifp, gwi, nonSupport, pushdownHand);
|
||||
return buildArithmeticColumn(ifp, gwi, nonSupport);
|
||||
else
|
||||
return buildFunctionColumn(ifp, gwi, nonSupport, pushdownHand);
|
||||
return buildFunctionColumn(ifp, gwi, nonSupport);
|
||||
}
|
||||
|
||||
case Item::SUM_FUNC_ITEM:
|
||||
@ -3410,8 +3413,7 @@ ReturnedColumn* buildReturnedColumn(Item* item, gp_walk_info& gwi, bool& nonSupp
|
||||
ArithmeticColumn* buildArithmeticColumn(
|
||||
Item_func* item,
|
||||
gp_walk_info& gwi,
|
||||
bool& nonSupport,
|
||||
bool pushdownHand)
|
||||
bool& nonSupport)
|
||||
{
|
||||
if (get_fe_conn_info_ptr() == NULL)
|
||||
set_fe_conn_info_ptr((void*)new cal_connection_info());
|
||||
@ -3435,7 +3437,7 @@ ArithmeticColumn* buildArithmeticColumn(
|
||||
{
|
||||
if (gwi.clauseType == SELECT || /*gwi.clauseType == HAVING || */gwi.clauseType == GROUP_BY || gwi.clauseType == FROM) // select list
|
||||
{
|
||||
lhs = new ParseTree(buildReturnedColumn(sfitempp[0], gwi, nonSupport, pushdownHand));
|
||||
lhs = new ParseTree(buildReturnedColumn(sfitempp[0], gwi, nonSupport));
|
||||
|
||||
if (!lhs->data() && (sfitempp[0]->type() == Item::FUNC_ITEM))
|
||||
{
|
||||
@ -3443,7 +3445,7 @@ ArithmeticColumn* buildArithmeticColumn(
|
||||
Item_func* ifp = (Item_func*)sfitempp[0];
|
||||
lhs = buildParseTree(ifp, gwi, nonSupport);
|
||||
}
|
||||
else if(pushdownHand && !lhs->data() && (sfitempp[0]->type() == Item::REF_ITEM))
|
||||
else if(!lhs->data() && (sfitempp[0]->type() == Item::REF_ITEM))
|
||||
{
|
||||
// There must be an aggregation column in extended SELECT
|
||||
// list so find the corresponding column.
|
||||
@ -3454,7 +3456,7 @@ ArithmeticColumn* buildArithmeticColumn(
|
||||
if(rc)
|
||||
lhs = new ParseTree(rc);
|
||||
}
|
||||
rhs = new ParseTree(buildReturnedColumn(sfitempp[1], gwi, nonSupport, pushdownHand));
|
||||
rhs = new ParseTree(buildReturnedColumn(sfitempp[1], gwi, nonSupport));
|
||||
|
||||
if (!rhs->data() && (sfitempp[1]->type() == Item::FUNC_ITEM))
|
||||
{
|
||||
@ -3462,7 +3464,7 @@ ArithmeticColumn* buildArithmeticColumn(
|
||||
Item_func* ifp = (Item_func*)sfitempp[1];
|
||||
rhs = buildParseTree(ifp, gwi, nonSupport);
|
||||
}
|
||||
else if(pushdownHand && !rhs->data() && (sfitempp[1]->type() == Item::REF_ITEM))
|
||||
else if(!rhs->data() && (sfitempp[1]->type() == Item::REF_ITEM))
|
||||
{
|
||||
// There must be an aggregation column in extended SELECT
|
||||
// list so find the corresponding column.
|
||||
@ -3638,7 +3640,6 @@ ReturnedColumn* buildFunctionColumn(
|
||||
Item_func* ifp,
|
||||
gp_walk_info& gwi,
|
||||
bool& nonSupport,
|
||||
bool pushdownHand,
|
||||
bool selectBetweenIn)
|
||||
{
|
||||
if (get_fe_conn_info_ptr() == NULL)
|
||||
@ -3680,7 +3681,7 @@ ReturnedColumn* buildFunctionColumn(
|
||||
// Arithmetic exp
|
||||
if (funcName == "+" || funcName == "-" || funcName == "*" || funcName == "/" )
|
||||
{
|
||||
ArithmeticColumn* ac = buildArithmeticColumn(ifp, gwi, nonSupport, pushdownHand);
|
||||
ArithmeticColumn* ac = buildArithmeticColumn(ifp, gwi, nonSupport);
|
||||
return ac;
|
||||
}
|
||||
|
||||
@ -3834,11 +3835,10 @@ ReturnedColumn* buildFunctionColumn(
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ReturnedColumn* rc = buildReturnedColumn(ifp->arguments()[i], gwi, nonSupport, pushdownHand);
|
||||
ReturnedColumn* rc = buildReturnedColumn(ifp->arguments()[i], gwi, nonSupport);
|
||||
|
||||
// MCOL-1510 It must be a temp table field, so find the corresponding column.
|
||||
if (!rc && pushdownHand
|
||||
&& ifp->arguments()[i]->type() == Item::REF_ITEM)
|
||||
if (!rc && ifp->arguments()[i]->type() == Item::REF_ITEM)
|
||||
{
|
||||
gwi.fatalParseError = false;
|
||||
rc = buildAggFrmTempField(ifp->arguments()[i], gwi);
|
||||
@ -5653,7 +5653,7 @@ void gp_walk(const Item* item, void* arg)
|
||||
|
||||
if (col->type() != Item::COND_ITEM)
|
||||
{
|
||||
rc = buildReturnedColumn(col, *gwip, gwip->fatalParseError, false, true);
|
||||
rc = buildReturnedColumn(col, *gwip, gwip->fatalParseError, false);
|
||||
|
||||
if ( col->type() == Item::FIELD_ITEM )
|
||||
gwip->fatalParseError = false;
|
||||
@ -6192,7 +6192,7 @@ int processFrom(bool &isUnion,
|
||||
if (table_ptr->derived)
|
||||
{
|
||||
SELECT_LEX* select_cursor = table_ptr->derived->first_select();
|
||||
FromSubQuery fromSub(gwi, select_cursor, true);
|
||||
FromSubQuery fromSub(gwi, select_cursor);
|
||||
string alias(table_ptr->alias.str);
|
||||
fromSub.alias(lower(alias));
|
||||
|
||||
@ -6304,7 +6304,7 @@ int processFrom(bool &isUnion,
|
||||
union_gwi.thd = gwi.thd;
|
||||
uint32_t err = 0;
|
||||
|
||||
if ((err = getSelectPlan(union_gwi, *sl, plan, unionSel, true)) != 0)
|
||||
if ((err = getSelectPlan(union_gwi, *sl, plan, unionSel)) != 0)
|
||||
return err;
|
||||
|
||||
unionVec.push_back(SCEP(plan));
|
||||
@ -6544,25 +6544,176 @@ int processWhere(SELECT_LEX &select_lex,
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*@brief Process LIMIT part of a query or sub-query */
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Processes LIMIT and OFFSET parts
|
||||
* RETURNS
|
||||
* error id as an int
|
||||
***********************************************************/
|
||||
int processLimitAndOffset(
|
||||
SELECT_LEX& select_lex,
|
||||
gp_walk_info& gwi,
|
||||
SCSEP& csep,
|
||||
bool unionSel,
|
||||
bool isUnion,
|
||||
bool isSelectHandlerTop
|
||||
)
|
||||
{
|
||||
// LIMIT processing part
|
||||
uint64_t limitNum = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
// non-MAIN union branch
|
||||
if (unionSel || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT)
|
||||
{
|
||||
/* Consider the following query:
|
||||
"select a from t1 where exists (select b from t2 where a=b);"
|
||||
CS first builds a hash table for t2, then pushes down the hash to
|
||||
PrimProc for a distributed hash join execution, with t1 being the
|
||||
large-side table. However, the server applies an optimization in
|
||||
Item_exists_subselect::fix_length_and_dec in sql/item_subselect.cc
|
||||
(see server commit ae476868a5394041a00e75a29c7d45917e8dfae8)
|
||||
where it sets explicit_limit to true, which causes csep->limitNum set to 1.
|
||||
This causes the hash table for t2 to only contain a single record for the
|
||||
hash join, giving less number of rows in the output result set than expected.
|
||||
We therefore do not allow limit set to 1 here for such queries.
|
||||
*/
|
||||
if (gwi.subSelectType != CalpontSelectExecutionPlan::IN_SUBS
|
||||
&& gwi.subSelectType != CalpontSelectExecutionPlan::EXISTS_SUBS
|
||||
&& select_lex.master_unit()->global_parameters()->explicit_limit)
|
||||
{
|
||||
if (select_lex.master_unit()->global_parameters()->offset_limit)
|
||||
{
|
||||
Item_int* offset = (Item_int*)select_lex.master_unit()->global_parameters()->offset_limit;
|
||||
csep->limitStart(offset->val_int());
|
||||
}
|
||||
|
||||
if (select_lex.master_unit()->global_parameters()->select_limit)
|
||||
{
|
||||
Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit;
|
||||
csep->limitNum(select->val_int());
|
||||
// MCOL-894 Activate parallel ORDER BY
|
||||
csep->orderByThreads(get_orderby_threads(gwi.thd));
|
||||
}
|
||||
}
|
||||
}
|
||||
// union with explicit select at the top level
|
||||
else if (isUnion && select_lex.explicit_limit)
|
||||
{
|
||||
if (select_lex.braces)
|
||||
{
|
||||
if (select_lex.offset_limit)
|
||||
csep->limitStart(((Item_int*)select_lex.offset_limit)->val_int());
|
||||
|
||||
if (select_lex.select_limit)
|
||||
csep->limitNum(((Item_int*)select_lex.select_limit)->val_int());
|
||||
}
|
||||
}
|
||||
// other types of queries that have explicit LIMIT
|
||||
else if (select_lex.explicit_limit)
|
||||
{
|
||||
uint32_t limitOffset = 0;
|
||||
|
||||
if (select_lex.join)
|
||||
{
|
||||
JOIN* join = select_lex.join;
|
||||
#if MYSQL_VERSION_ID >= 50172
|
||||
|
||||
// @bug5729. After upgrade, join->unit sometimes is uninitialized pointer
|
||||
// (not null though) and will cause seg fault. Prefer checking
|
||||
// select_lex->offset_limit if not null.
|
||||
if (join->select_lex &&
|
||||
join->select_lex->offset_limit &&
|
||||
join->select_lex->offset_limit->is_fixed() &&
|
||||
join->select_lex->select_limit &&
|
||||
join->select_lex->select_limit->is_fixed())
|
||||
{
|
||||
limitOffset = join->select_lex->offset_limit->val_int();
|
||||
limitNum = join->select_lex->select_limit->val_int();
|
||||
}
|
||||
else if (join->unit)
|
||||
{
|
||||
limitOffset = join->unit->offset_limit_cnt;
|
||||
limitNum = join->unit->select_limit_cnt - limitOffset;
|
||||
}
|
||||
|
||||
#else
|
||||
limitOffset = (join->unit)->offset_limit_cnt;
|
||||
limitNum = (join->unit)->select_limit_cnt - (join->unit)->offset_limit_cnt;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
if (select_lex.master_unit()->global_parameters()->offset_limit)
|
||||
{
|
||||
Item_int* offset = (Item_int*)select_lex.master_unit()->global_parameters()->offset_limit;
|
||||
limitOffset = offset->val_int();
|
||||
}
|
||||
|
||||
if (select_lex.master_unit()->global_parameters()->select_limit)
|
||||
{
|
||||
Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit;
|
||||
limitNum = select->val_int();
|
||||
}
|
||||
}
|
||||
|
||||
csep->limitStart(limitOffset);
|
||||
csep->limitNum(limitNum);
|
||||
}
|
||||
// If an explicit limit is not specified, use the system variable value
|
||||
else
|
||||
{
|
||||
csep->limitNum(gwi.thd->variables.select_limit);
|
||||
}
|
||||
|
||||
// We don't currently support limit with correlated subquery
|
||||
if (gwi.subQuery && !gwi.correlatedTbNameVec.empty() && csep->hasOrderBy())
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_LIMIT_SUB);
|
||||
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
// MDB applies OFFSET on its own for SH processing.
|
||||
// See related MDEV-16327 for details.
|
||||
if (isSelectHandlerTop && csep->limitStart())
|
||||
{
|
||||
if (std::numeric_limits<uint64_t>::max()-csep->limitStart()
|
||||
< csep->limitNum())
|
||||
{
|
||||
csep->limitNum(std::numeric_limits<uint64_t>::max());
|
||||
csep->limitStart(0);
|
||||
}
|
||||
else
|
||||
{
|
||||
csep->limitNum(csep->limitNum()+csep->limitStart());
|
||||
csep->limitStart(0);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*@brief Translates SELECT_LEX into CSEP */
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* This function takes SELECT_LEX and tries to produce
|
||||
* a corresponding CSEP out of it. It is made of parts that
|
||||
* process parts of the query, e.g. FROM, WHERE, SELECT,
|
||||
* HAVING, GROUP BY, ORDER BY. FROM and WHERE are processed
|
||||
* by processFrom(), processWhere(). CS calls getSelectPlan()
|
||||
* HAVING, GROUP BY, ORDER BY. FROM, WHERE, LIMIT are processed
|
||||
* by corresponding methods. CS calls getSelectPlan()
|
||||
* recursively to process subqueries.
|
||||
* ARGS
|
||||
* isUnion if true CS processes UNION unit now
|
||||
* isPushdownHand legacy to be removed
|
||||
* isSelectHandlerTop removes offset at the top of SH query.
|
||||
* RETURNS
|
||||
* error id as an int
|
||||
***********************************************************/
|
||||
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
SCSEP& csep,
|
||||
bool isUnion,
|
||||
bool isPushdownHand)
|
||||
bool isSelectHandlerTop)
|
||||
{
|
||||
#ifdef DEBUG_WALK_COND
|
||||
cerr << "getSelectPlan()" << endl;
|
||||
@ -6770,7 +6921,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
ReturnedColumn* rc;
|
||||
if (funcName == "in" || funcName == " IN " || funcName == "between")
|
||||
{
|
||||
rc = buildFunctionColumn(ifp, gwi, hasNonSupportItem, false, true);
|
||||
rc = buildFunctionColumn(ifp, gwi, hasNonSupportItem, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -6802,15 +6953,11 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
parse_item(ifp, funcFieldVec, hasNonSupportItem, parseInfo, &gwi);
|
||||
uint32_t after_size = funcFieldVec.size();
|
||||
|
||||
// group by func and func in subquery can not be post processed
|
||||
// pushdown handler projection functions
|
||||
// @bug3881. set_user_var can not be treated as constant function
|
||||
// @bug5716. Try to avoid post process function for union query.
|
||||
if ((gwi.subQuery || select_lex.group_list.elements != 0 ||
|
||||
!csep->unionVec().empty() || isUnion || isPushdownHand ) &&
|
||||
!hasNonSupportItem && (after_size - before_size) == 0 &&
|
||||
!(parseInfo & AGG_BIT) && !(parseInfo & SUB_BIT)
|
||||
)
|
||||
if (!hasNonSupportItem && (after_size - before_size) == 0 &&
|
||||
!(parseInfo & AGG_BIT) && !(parseInfo & SUB_BIT))
|
||||
{
|
||||
String val, *str = ifp->val_str(&val);
|
||||
string valStr;
|
||||
@ -7491,6 +7638,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
}
|
||||
}
|
||||
|
||||
// ORDER BY processing
|
||||
{
|
||||
SQL_I_List<ORDER> order_list = select_lex.order_list;
|
||||
ORDER* ordercol = reinterpret_cast<ORDER*>(order_list.first);
|
||||
@ -7504,9 +7652,9 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
// MCOL-2166 Looking for this sorting item in GROUP_BY items list.
|
||||
// Shouldn't look into this if query doesn't have GROUP BY or
|
||||
// aggregations
|
||||
if(isPushdownHand
|
||||
&& select_lex.agg_func_used() && select_lex.group_list.first
|
||||
&& !sortItemIsInGrouping(*ordercol->item, select_lex.group_list.first))
|
||||
if(select_lex.agg_func_used() && select_lex.group_list.first
|
||||
&& !sortItemIsInGrouping(*ordercol->item,
|
||||
select_lex.group_list.first))
|
||||
{
|
||||
std::ostringstream ostream;
|
||||
std::ostringstream& osr = ostream;
|
||||
@ -7523,10 +7671,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
// re-visit the first of ordercol list
|
||||
ordercol = reinterpret_cast<ORDER*>(order_list.first);
|
||||
|
||||
// for subquery or pushdown query, order+limit by will be supported in CS
|
||||
// union order by and limit are supported
|
||||
if (gwi.hasWindowFunc || isPushdownHand || ( isUnion && ordercol )
|
||||
|| gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT )
|
||||
{
|
||||
for (; ordercol; ordercol = ordercol->next)
|
||||
{
|
||||
@ -7602,158 +7746,6 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
gwi.orderByCols.push_back(SRCP(rc));
|
||||
}
|
||||
}
|
||||
// DRRTUY The whole block is marked for removal in 1.4.1
|
||||
#if 0
|
||||
else if (!isUnion)
|
||||
{
|
||||
vector <Item_field*> fieldVec;
|
||||
|
||||
// the following order by is just for redo phase
|
||||
if (!unionSel)
|
||||
{
|
||||
for (; ordercol; ordercol = ordercol->next)
|
||||
{
|
||||
Item* ord_item = *(ordercol->item);
|
||||
|
||||
// @bug5993. Could be nested ref.
|
||||
while (ord_item->type() == Item::REF_ITEM)
|
||||
ord_item = (*((Item_ref*)ord_item)->ref);
|
||||
|
||||
//ReturnedColumn* rc = 0;
|
||||
// check if this order by column is on the select list
|
||||
//Item_func* ifp = (Item_func*)(*(ordercol->item));
|
||||
//rc = buildFunctionColumn(ifp, gwi, gwi.fatalParseError);
|
||||
|
||||
if (ord_item->type() == Item::FUNC_ITEM)
|
||||
{
|
||||
//FunctionColumn* fc = dynamic_cast<FunctionColumn*>(rc);
|
||||
}
|
||||
else if (ord_item->type() == Item::SUBSELECT_ITEM)
|
||||
{
|
||||
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
else if (ord_item->type() == Item::SUM_FUNC_ITEM)
|
||||
{
|
||||
ReturnedColumn* ac = 0;
|
||||
|
||||
Item_sum* ifp = (Item_sum*)(*(ordercol->item));
|
||||
// @bug3477. add aggregate column to the select list of the create phase.
|
||||
ac = buildAggregateColumn(ifp, gwi);
|
||||
|
||||
if (!ac)
|
||||
{
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED,
|
||||
IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY), gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
// check if this order by column is on the select list
|
||||
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
|
||||
{
|
||||
AggregateColumn* ret = dynamic_cast<AggregateColumn*>(gwi.returnedCols[i].get());
|
||||
|
||||
if (!ret)
|
||||
continue;
|
||||
|
||||
}
|
||||
|
||||
if (ac || !gwi.groupByCols.empty())
|
||||
{
|
||||
SRCP srcp(ac);
|
||||
gwi.returnedCols.push_back(srcp);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else if (ord_item->name.length && ord_item->type() == Item::FIELD_ITEM)
|
||||
{
|
||||
Item_field* field = reinterpret_cast<Item_field*>(ord_item);
|
||||
ReturnedColumn* rc = buildSimpleColumn(field, gwi);
|
||||
|
||||
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
|
||||
{
|
||||
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(gwi.returnedCols[i].get());
|
||||
|
||||
if (sc && ((Item_field*)ord_item)->cached_table &&
|
||||
(strcasecmp(getViewName(((Item_field*)ord_item)->cached_table).c_str(), sc->viewName().c_str()) != 0))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (sc && sc->sameColumn(rc))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// @bug 2719. Error out order by not on the distinct select list.
|
||||
if (select_lex.options & SELECT_DISTINCT)
|
||||
{
|
||||
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_ORDERBY_NOT_IN_DISTINCT);
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
bool hasNonSupportItem = false;
|
||||
uint16_t parseInfo = 0;
|
||||
parse_item(ord_item, fieldVec, hasNonSupportItem, parseInfo, &gwi);
|
||||
|
||||
if (hasNonSupportItem)
|
||||
{
|
||||
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// populate string to be added to the select list for order by
|
||||
for (uint32_t i = 0; i < fieldVec.size(); i++)
|
||||
{
|
||||
SimpleColumn* sc = buildSimpleColumn(fieldVec[i], gwi);
|
||||
|
||||
if (!sc)
|
||||
{
|
||||
string emsg = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_ORDER_BY);
|
||||
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, emsg, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
String str;
|
||||
fieldVec[i]->print(&str, QT_ORDINARY);
|
||||
sc->alias(string(str.c_ptr()));
|
||||
SRCP srcp(sc);
|
||||
uint32_t j = 0;
|
||||
|
||||
for (; j < gwi.returnedCols.size(); j++)
|
||||
{
|
||||
if (sc->sameColumn(gwi.returnedCols[j].get()))
|
||||
{
|
||||
SimpleColumn* field = dynamic_cast<SimpleColumn*>(gwi.returnedCols[j].get());
|
||||
|
||||
if (field && field->alias() == sc->alias())
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (j == gwi.returnedCols.size())
|
||||
{
|
||||
gwi.returnedCols.push_back(srcp);
|
||||
gwi.columnMap.insert(CalpontSelectExecutionPlan::ColumnMap::value_type(string(fieldVec[i]->field_name.str), srcp));
|
||||
TABLE_LIST* tmp = (fieldVec[i]->cached_table ? fieldVec[i]->cached_table : 0);
|
||||
gwi.tableMap[make_aliastable(sc->schemaName(), sc->tableName(), sc->tableAlias(), sc->isColumnStore())] =
|
||||
make_pair(1, tmp);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// make sure columnmap, returnedcols and count(*) arg_list are not empty
|
||||
TableMap::iterator tb_iter = gwi.tableMap.begin();
|
||||
|
||||
@ -7879,129 +7871,16 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
|
||||
{
|
||||
csep->hasOrderBy(true);
|
||||
// To activate LimitedOrderBy
|
||||
if(isPushdownHand)
|
||||
{
|
||||
csep->orderByThreads(get_orderby_threads(gwi.thd));
|
||||
csep->specHandlerProcessed(true);
|
||||
}
|
||||
csep->orderByThreads(get_orderby_threads(gwi.thd));
|
||||
csep->specHandlerProcessed(true);
|
||||
}
|
||||
}
|
||||
|
||||
// LIMIT processing part
|
||||
uint64_t limitNum = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
// non-MAIN union branch
|
||||
if (unionSel || gwi.subSelectType != CalpontSelectExecutionPlan::MAIN_SELECT)
|
||||
if ((rc = processLimitAndOffset(select_lex, gwi, csep, unionSel, isUnion, isSelectHandlerTop)))
|
||||
{
|
||||
/* Consider the following query:
|
||||
"select a from t1 where exists (select b from t2 where a=b);"
|
||||
CS first builds a hash table for t2, then pushes down the hash to
|
||||
PrimProc for a distributed hash join execution, with t1 being the
|
||||
large-side table. However, the server applies an optimization in
|
||||
Item_exists_subselect::fix_length_and_dec in sql/item_subselect.cc
|
||||
(see server commit ae476868a5394041a00e75a29c7d45917e8dfae8)
|
||||
where it sets explicit_limit to true, which causes csep->limitNum set to 1.
|
||||
This causes the hash table for t2 to only contain a single record for the
|
||||
hash join, giving less number of rows in the output result set than expected.
|
||||
We therefore do not allow limit set to 1 here for such queries.
|
||||
*/
|
||||
if (gwi.subSelectType != CalpontSelectExecutionPlan::IN_SUBS
|
||||
&& gwi.subSelectType != CalpontSelectExecutionPlan::EXISTS_SUBS
|
||||
&& select_lex.master_unit()->global_parameters()->explicit_limit)
|
||||
{
|
||||
if (select_lex.master_unit()->global_parameters()->offset_limit)
|
||||
{
|
||||
Item_int* offset = (Item_int*)select_lex.master_unit()->global_parameters()->offset_limit;
|
||||
csep->limitStart(offset->val_int());
|
||||
}
|
||||
|
||||
if (select_lex.master_unit()->global_parameters()->select_limit)
|
||||
{
|
||||
Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit;
|
||||
csep->limitNum(select->val_int());
|
||||
// MCOL-894 Activate parallel ORDER BY
|
||||
csep->orderByThreads(get_orderby_threads(gwi.thd));
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
// union with explicit select at the top level
|
||||
else if (isUnion && select_lex.explicit_limit)
|
||||
{
|
||||
if (select_lex.braces)
|
||||
{
|
||||
if (select_lex.offset_limit)
|
||||
csep->limitStart(((Item_int*)select_lex.offset_limit)->val_int());
|
||||
|
||||
if (select_lex.select_limit)
|
||||
csep->limitNum(((Item_int*)select_lex.select_limit)->val_int());
|
||||
}
|
||||
}
|
||||
// other types of queries that have explicit LIMIT
|
||||
else if (select_lex.explicit_limit)
|
||||
{
|
||||
uint32_t limitOffset = 0;
|
||||
|
||||
if (select_lex.join)
|
||||
{
|
||||
JOIN* join = select_lex.join;
|
||||
#if MYSQL_VERSION_ID >= 50172
|
||||
|
||||
// @bug5729. After upgrade, join->unit sometimes is uninitialized pointer
|
||||
// (not null though) and will cause seg fault. Prefer checking
|
||||
// select_lex->offset_limit if not null.
|
||||
if (join->select_lex &&
|
||||
join->select_lex->offset_limit &&
|
||||
join->select_lex->offset_limit->is_fixed() &&
|
||||
join->select_lex->select_limit &&
|
||||
join->select_lex->select_limit->is_fixed())
|
||||
{
|
||||
limitOffset = join->select_lex->offset_limit->val_int();
|
||||
limitNum = join->select_lex->select_limit->val_int();
|
||||
}
|
||||
else if (join->unit)
|
||||
{
|
||||
limitOffset = join->unit->lim.get_offset_limit();
|
||||
limitNum = join->unit->lim.get_select_limit() - limitOffset;
|
||||
}
|
||||
|
||||
#else
|
||||
limitOffset = (join->unit)->offset_limit_cnt;
|
||||
limitNum = (join->unit)->select_limit_cnt - (join->unit)->offset_limit_cnt;
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
if (select_lex.master_unit()->global_parameters()->offset_limit)
|
||||
{
|
||||
Item_int* offset = (Item_int*)select_lex.master_unit()->global_parameters()->offset_limit;
|
||||
limitOffset = offset->val_int();
|
||||
}
|
||||
|
||||
if (select_lex.master_unit()->global_parameters()->select_limit)
|
||||
{
|
||||
Item_int* select = (Item_int*)select_lex.master_unit()->global_parameters()->select_limit;
|
||||
limitNum = select->val_int();
|
||||
}
|
||||
}
|
||||
|
||||
csep->limitStart(limitOffset);
|
||||
csep->limitNum(limitNum);
|
||||
}
|
||||
// If an explicit limit is not specified, use the system variable value
|
||||
else
|
||||
{
|
||||
csep->limitNum(gwi.thd->variables.select_limit);
|
||||
}
|
||||
|
||||
// We don't currently support limit with correlated subquery
|
||||
if (gwi.subQuery && !gwi.correlatedTbNameVec.empty() && csep->hasOrderBy())
|
||||
{
|
||||
gwi.fatalParseError = true;
|
||||
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(ERR_NON_SUPPORT_LIMIT_SUB);
|
||||
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
|
||||
return ER_CHECK_NOT_IMPLEMENTED;
|
||||
}
|
||||
} // LIMIT processing finishes here
|
||||
} // ORDER BY end
|
||||
|
||||
if (select_lex.options & SELECT_DISTINCT)
|
||||
csep->distinct(true);
|
||||
@ -8215,7 +8094,7 @@ int cp_get_group_plan(THD* thd, SCSEP& csep, cal_impl_if::cal_group_info& gi)
|
||||
int cs_get_derived_plan(derived_handler* handler, THD* thd, SCSEP& csep, gp_walk_info& gwi)
|
||||
{
|
||||
SELECT_LEX select_lex = *handler->select;
|
||||
int status = getSelectPlan(gwi, select_lex, csep, false, true);
|
||||
int status = getSelectPlan(gwi, select_lex, csep, false);
|
||||
|
||||
if (status > 0)
|
||||
return ER_INTERNAL_ERROR;
|
||||
@ -8413,7 +8292,7 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
|
||||
|
||||
SELECT_LEX* select_cursor = table_ptr->derived->first_select();
|
||||
// Use Pushdown handler for subquery processing
|
||||
FromSubQuery fromSub(gwi, select_cursor, true);
|
||||
FromSubQuery fromSub(gwi, select_cursor);
|
||||
string alias(table_ptr->alias.str);
|
||||
fromSub.alias(lower(alias));
|
||||
|
||||
|
Reference in New Issue
Block a user