1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #2741 from mariadb-corporation/MDEV-25080-CS-dev

MDEV-25080 Allow pushdown of queries involving UNIONs in outer select to ColumnStore
This commit is contained in:
Roman Nozdrin
2023-02-28 11:23:50 +00:00
committed by GitHub
21 changed files with 989 additions and 202 deletions

View File

@ -6393,12 +6393,13 @@ boost::any CalpontSystemCatalog::ColType::convertColumnData(const std::string& d
}
CalpontSystemCatalog::ColType CalpontSystemCatalog::ColType::convertUnionColType(
vector<CalpontSystemCatalog::ColType>& types)
vector<CalpontSystemCatalog::ColType>& types,
unsigned int& rc)
{
idbassert(types.size());
CalpontSystemCatalog::ColType unionedType = types[0];
for (uint64_t i = 1; i < types.size(); i++)
dataconvert::DataConvert::joinColTypeForUnion(unionedType, types[i]);
dataconvert::DataConvert::joinColTypeForUnion(unionedType, types[i], rc);
return unionedType;
}

View File

@ -306,7 +306,7 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
return !(*this == t);
}
static ColType convertUnionColType(std::vector<ColType>&);
static ColType convertUnionColType(std::vector<ColType>&, unsigned int&);
};
/** the structure of a table infomation

View File

@ -5102,11 +5102,18 @@ SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo&
unionStep->inputAssociation(jsaToUnion);
unionStep->outputAssociation(jsa);
// This return code in the call to convertUnionColType() below would
// always be 0. This is because convertUnionColType() is also called
// in the connector code in getSelectPlan()/getGroupPlan() which handle
// the non-zero return code scenarios from this function call and error
// out, in which case, the execution does not even get to ExeMgr.
unsigned int dummyUnionedTypeRc = 0;
// get unioned column types
for (uint64_t j = 0; j < colCount; ++j)
{
CalpontSystemCatalog::ColType colType =
CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j]);
CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j], dummyUnionedTypeRc);
types.push_back(colType.colDataType);
csNums.push_back(colType.charsetNumber);
scale.push_back(colType.scale);

View File

@ -1766,12 +1766,14 @@ void TupleUnion::writeNull(Row* out, uint32_t col)
{
case 1: out->setUintField<1>(joblist::TINYINTNULL, col); break;
case 2: out->setUintField<1>(joblist::SMALLINTNULL, col); break;
case 2: out->setUintField<2>(joblist::SMALLINTNULL, col); break;
case 4: out->setUintField<4>(joblist::INTNULL, col); break;
case 8: out->setUintField<8>(joblist::BIGINTNULL, col); break;
case 16: out->setInt128Field(datatypes::Decimal128Null, col); break;
default:
{
}

View File

@ -45,7 +45,8 @@ group_by_handler* create_columnstore_group_by_handler(THD* thd, Query* query);
derived_handler* create_columnstore_derived_handler(THD* thd, TABLE_LIST* derived);
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* sel);
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* sel_lex, SELECT_LEX_UNIT* sel_unit);
select_handler* create_columnstore_unit_handler(THD* thd, SELECT_LEX_UNIT* sel_unit);
/* Variables for example share methods */
@ -1835,6 +1836,7 @@ static int columnstore_init_func(void* p)
mcs_hton->create_group_by = create_columnstore_group_by_handler;
mcs_hton->create_derived = create_columnstore_derived_handler;
mcs_hton->create_select = create_columnstore_select_handler;
mcs_hton->create_unit = create_columnstore_unit_handler;
mcs_hton->db_type = DB_TYPE_AUTOASSIGN;
#ifdef HAVE_PSI_INTERFACE

View File

@ -6682,7 +6682,8 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
* RETURNS
* error id as an int
***********************************************************/
int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep)
int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep,
bool isSelectHandlerTop, bool isSelectLexUnit)
{
// populate table map and trigger syscolumn cache for all the tables (@bug 1637).
// all tables on FROM list must have at least one col in colmap
@ -6819,9 +6820,9 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP&
bool unionSel = false;
// UNION master unit check
// Existed pushdown handlers won't get in this scope
// except UNION pushdown that is to come.
// MDEV-25080 Union pushdown would enter this scope
// is_unit_op() give a segv for derived_handler's SELECT_LEX
if (!isUnion && select_lex.master_unit()->is_unit_op())
if (!isUnion && (!isSelectHandlerTop || isSelectLexUnit) && select_lex.master_unit()->is_unit_op())
{
// MCOL-2178 isUnion member only assigned, never used
// MIGR::infinidb_vtable.isUnion = true;
@ -7383,7 +7384,8 @@ void buildInToExistsFilter(gp_walk_info& gwi, SELECT_LEX& select_lex)
* error id as an int
***********************************************************/
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion,
bool isSelectHandlerTop, const std::vector<COND*>& condStack)
bool isSelectHandlerTop, bool isSelectLexUnit,
const std::vector<COND*>& condStack)
{
#ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl;
@ -7411,13 +7413,12 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
CalpontSelectExecutionPlan::SelectList derivedTbList;
// @bug 1796. Remember table order on the FROM list.
gwi.clauseType = FROM;
if ((rc = processFrom(isUnion, select_lex, gwi, csep)))
if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop,
isSelectLexUnit)))
{
return rc;
}
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
gwi.clauseType = WHERE;
if ((rc = processWhere(select_lex, gwi, csep, condStack)))
{
@ -7860,25 +7861,32 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
// @bug4388 normalize the project coltypes for union main select list
if (!csep->unionVec().empty())
{
unsigned int unionedTypeRc = 0;
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
vector<CalpontSystemCatalog::ColType> coltypes;
for (uint32_t j = 0; j < csep->unionVec().size(); j++)
{
coltypes.push_back(dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
->returnedCols()[i]
->resultType());
CalpontSelectExecutionPlan* unionCsep =
dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get());
coltypes.push_back(unionCsep->returnedCols()[i]->resultType());
// @bug5976. set hasAggregate true for the main column if
// one corresponding union column has aggregate
if (dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
->returnedCols()[i]
->hasAggregate())
if (unionCsep->returnedCols()[i]->hasAggregate())
gwi.returnedCols[i]->hasAggregate(true);
}
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes));
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
if (unionedTypeRc != 0)
{
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(unionedTypeRc);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
}
}
@ -8047,6 +8055,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
SRCP minSc; // min width projected column. for count(*) use
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
// Group by list. not valid for union main query
if (!unionSel)
{
@ -8805,9 +8815,10 @@ int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* thd, SCSEP
return 0;
}
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& csep, gp_walk_info& gwi)
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& csep, gp_walk_info& gwi,
bool isSelectLexUnit)
{
SELECT_LEX& select_lex = *handler->select;
SELECT_LEX& select_lex = handler->select_lex ? *handler->select_lex : *handler->lex_unit->first_select();
if (select_lex.where)
{
@ -8819,7 +8830,7 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
convertOuterJoinToInnerJoin(&select_lex.top_join_list, gwi.tableOnExprList, gwi.condList,
handler->tableOuterJoinMap);
int status = getSelectPlan(gwi, select_lex, csep, false, true);
int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit);
if (status > 0)
return ER_INTERNAL_ERROR;
@ -9683,25 +9694,32 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
// @bug4388 normalize the project coltypes for union main select list
if (!csep->unionVec().empty())
{
unsigned int unionedTypeRc = 0;
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
{
vector<CalpontSystemCatalog::ColType> coltypes;
for (uint32_t j = 0; j < csep->unionVec().size(); j++)
{
coltypes.push_back(dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
->returnedCols()[i]
->resultType());
CalpontSelectExecutionPlan* unionCsep =
dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get());
coltypes.push_back(unionCsep->returnedCols()[i]->resultType());
// @bug5976. set hasAggregate true for the main column if
// one corresponding union column has aggregate
if (dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
->returnedCols()[i]
->hasAggregate())
if (unionCsep->returnedCols()[i]->hasAggregate())
gwi.returnedCols[i]->hasAggregate(true);
}
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes));
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
if (unionedTypeRc != 0)
{
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(unionedTypeRc);
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
return ER_CHECK_NOT_IMPLEMENTED;
}
}
}

View File

@ -1392,7 +1392,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
gwi.clauseType = WHERE;
if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) !=
if (getSelectPlan(gwi, select_lex, updateCP, false, false, false, condStack) !=
0) //@Bug 3030 Modify the error message for unsupported functions
{
if (gwi.cs_vtable_is_update_with_derive)
@ -4886,7 +4886,7 @@ int ha_mcs_impl_group_by_end(TABLE* table)
* RETURN:
* rc as int
***********************************************************/
int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool isSelectLexUnit)
{
IDEBUG(cout << "pushdown_init for table " << endl);
THD* thd = current_thd;
@ -5076,7 +5076,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
if (handler_info->hndl_type == mcs_handler_types_t::SELECT)
{
sh = reinterpret_cast<ha_columnstore_select_handler*>(handler_info->hndl_ptr);
status = cs_get_select_plan(sh, thd, csep, gwi);
status = cs_get_select_plan(sh, thd, csep, gwi, isSelectLexUnit);
}
else if (handler_info->hndl_type == DERIVED)
{

View File

@ -44,7 +44,7 @@ extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected
const std::vector<COND*>& condStack);
extern int ha_mcs_impl_delete_row();
extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);
extern int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool isSelectLexUnit = false);
extern int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone);
extern int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table);
extern int ha_mcs_impl_group_by_next(TABLE* table, long timeZone);

View File

@ -397,9 +397,9 @@ int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_in
int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* thd, execplan::SCSEP& csep,
gp_walk_info& gwi);
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, execplan::SCSEP& csep,
gp_walk_info& gwi);
gp_walk_info& gwi, bool isSelectLexUnit);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false,
bool isSelectHandlerTop = false,
bool isSelectHandlerTop = false, bool isSelectLexUnit = false,
const std::vector<COND*>& condStack = std::vector<COND*>());
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi,
bool isUnion = false);

View File

@ -728,20 +728,21 @@ int ha_mcs_group_by_handler::end_scan()
DBUG_RETURN(rc);
}
/*@brief create_columnstore_select_handler- Creates handler*/
/************************************************************
/*@brief create_columnstore_select_handler_- Creates handler
************************************************************
* DESCRIPTION:
* Creates a select handler if there is no non-equi JOIN, e.g
* t1.c1 > t2.c2 and logical OR in the filter predicates.
* More details in server/sql/select_handler.h
* PARAMETERS:
* thd - THD pointer.
* select_lex - SELECT_LEX* that describes the query.
* sel_lex - SELECT_LEX* that describes the query.
* sel_unit - SELECT_LEX_UNIT* that describes the query.
* RETURN:
* select_handler if possible
* NULL in other case
***********************************************************/
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
select_handler* create_columnstore_select_handler_(THD* thd, SELECT_LEX* sel_lex, SELECT_LEX_UNIT* sel_unit)
{
mcs_select_handler_mode_t select_handler_mode = get_select_handler_mode(thd);
@ -778,60 +779,105 @@ select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_l
return nullptr;
}
// Iterate and traverse through the item list and the JOIN cond
// and do not create SH if the unsupported (set_user_var)
// function is present.
TABLE_LIST* table_ptr = select_lex->get_table_list();
for (; table_ptr; table_ptr = table_ptr->next_global)
// MCOL-5432 Disable partial pushdown of the UNION operation if the query
// involves an order by or a limit clause.
if (sel_lex && sel_unit &&
(sel_unit->global_parameters()->limit_params.explicit_limit == true ||
sel_unit->global_parameters()->order_list.elements != 0))
{
if (check_user_var(table_ptr->select_lex))
return nullptr;
}
std::vector<SELECT_LEX*> select_lex_vec;
if (sel_unit && !sel_lex)
{
for (SELECT_LEX* sl = sel_unit->first_select(); sl; sl = sl->next_select())
{
return nullptr;
select_lex_vec.push_back(sl);
}
}
else
{
select_lex_vec.push_back(sel_lex);
}
for (size_t i = 0; i < select_lex_vec.size(); i++)
{
SELECT_LEX* select_lex = select_lex_vec[i];
// Iterate and traverse through the item list and the JOIN cond
// and do not create SH if the unsupported (set_user_var)
// function is present.
TABLE_LIST* table_ptr = select_lex->get_table_list();
for (; table_ptr; table_ptr = table_ptr->next_global)
{
if (check_user_var(table_ptr->select_lex))
{
return nullptr;
}
}
}
// We apply dedicated rewrites from MDB here so MDB's data structures
// becomes dirty and CS has to raise an error in case of any problem
// or unsupported feature.
ha_columnstore_select_handler* handler = new ha_columnstore_select_handler(thd, select_lex);
ha_columnstore_select_handler* handler;
JOIN* join = select_lex->join;
if (select_lex->first_cond_optimization && select_lex->handle_derived(thd->lex, DT_MERGE))
if (sel_unit && sel_lex) // partial pushdown of the SELECT_LEX_UNIT
{
if (!thd->is_error())
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in select_lex::handle_derived()");
}
return handler;
handler = new ha_columnstore_select_handler(thd, sel_lex, sel_unit);
}
else if (sel_unit) // complete pushdown of the SELECT_LEX_UNIT
{
handler = new ha_columnstore_select_handler(thd, sel_unit);
}
else // Query only has a SELECT_LEX, no SELECT_LEX_UNIT
{
handler = new ha_columnstore_select_handler(thd, sel_lex);
}
// This is partially taken from JOIN::optimize_inner() in sql/sql_select.cc
if (select_lex->first_cond_optimization)
for (size_t i = 0; i < select_lex_vec.size(); i++)
{
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
Query_arena *arena, backup;
arena = thd->activate_stmt_arena_if_needed(&backup);
COND* conds = join->conds;
select_lex->where = conds;
SELECT_LEX* select_lex = select_lex_vec[i];
JOIN* join = select_lex->join;
if (isPS)
if (select_lex->first_cond_optimization && select_lex->handle_derived(thd->lex, DT_MERGE))
{
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
if (!thd->is_error())
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in select_lex::handle_derived()");
}
return handler;
}
select_lex->update_used_tables();
// This is partially taken from JOIN::optimize_inner() in sql/sql_select.cc
if (select_lex->first_cond_optimization)
{
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
Query_arena *arena, backup;
arena = thd->activate_stmt_arena_if_needed(&backup);
COND* conds = join->conds;
select_lex->where = conds;
if (arena)
thd->restore_active_arena(arena, &backup);
if (isPS)
{
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
}
select_lex->update_used_tables();
if (arena)
thd->restore_active_arena(arena, &backup);
#ifdef DEBUG_WALK_COND
if (conds)
{
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
}
if (conds)
{
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
}
#endif
}
}
// Attempt to execute the query using the select handler.
@ -840,69 +886,45 @@ select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_l
// Skip execution for EXPLAIN queries
if (!thd->lex->describe)
{
// This is taken from JOIN::optimize()
join->fields = &select_lex->item_list;
// Instantiate handler::table, which is the place for the result set.
if (handler->prepare())
for (size_t i = 0; i < select_lex_vec.size(); i++)
{
// check fallback
if (select_handler_mode == mcs_select_handler_mode_t::AUTO) // columnstore_select_handler=AUTO
SELECT_LEX* select_lex = select_lex_vec[i];
JOIN* join = select_lex->join;
// This is taken from JOIN::optimize()
join->fields = &select_lex->item_list;
// Instantiate handler::table, which is the place for the result set.
if ((i == 0) && handler->prepare())
{
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999,
"MCS select_handler execution failed. Falling back to server execution");
restore_query_state(handler);
delete handler;
return nullptr;
// check fallback
if (select_handler_mode == mcs_select_handler_mode_t::AUTO) // columnstore_select_handler=AUTO
{
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999,
"MCS select_handler execution failed. Falling back to server execution");
restore_query_state(handler);
delete handler;
return nullptr;
}
// error out
if (!thd->is_error())
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in handler->prepare()");
}
return handler;
}
// error out
if (!thd->is_error())
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in handler->prepare()");
}
// Prepare query execution
// This is taken from JOIN::exec_inner()
if (!select_lex->outer_select() && // (1)
select_lex != select_lex->master_unit()->fake_select_lex) // (2)
thd->lex->set_limit_rows_examined();
return handler;
}
// Prepare query execution
// This is taken from JOIN::exec_inner()
if (!select_lex->outer_select() && // (1)
select_lex != select_lex->master_unit()->fake_select_lex) // (2)
thd->lex->set_limit_rows_examined();
if (!join->tables_list && (join->table_count || !select_lex->with_sum_func) &&
!select_lex->have_window_funcs())
{
if (!thd->is_error())
{
restore_query_state(handler);
delete handler;
return nullptr;
}
return handler;
}
if (!join->zero_result_cause && join->exec_const_cond && !join->exec_const_cond->val_int())
join->zero_result_cause = "Impossible WHERE noticed after reading const tables";
// We've called exec_const_cond->val_int(). This may have caused an error.
if (unlikely(thd->is_error()))
{
// error out
handler->pushdown_init_rc = 1;
return handler;
}
if (join->zero_result_cause)
{
if (join->select_lex->have_window_funcs() && join->send_row_on_empty_set())
{
join->const_tables = join->table_count;
join->first_select = sub_select_postjoin_aggr;
}
else
if ((!sel_unit || sel_lex) && !join->tables_list &&
(join->table_count || !select_lex->with_sum_func) &&
!select_lex->have_window_funcs())
{
if (!thd->is_error())
{
@ -913,23 +935,56 @@ select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_l
return handler;
}
}
if ((join->select_lex->options & OPTION_SCHEMA_TABLE) &&
get_schema_tables_result(join, PROCESSED_BY_JOIN_EXEC))
{
if (!thd->is_error())
if (!join->zero_result_cause && join->exec_const_cond && !join->exec_const_cond->val_int())
join->zero_result_cause = "Impossible WHERE noticed after reading const tables";
// We've called exec_const_cond->val_int(). This may have caused an error.
if (unlikely(thd->is_error()))
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in get_schema_tables_result()");
// error out
handler->pushdown_init_rc = 1;
return handler;
}
return handler;
if (join->zero_result_cause)
{
if (join->select_lex->have_window_funcs() && join->send_row_on_empty_set())
{
join->const_tables = join->table_count;
join->first_select = sub_select_postjoin_aggr;
}
else
{
if (!thd->is_error())
{
restore_query_state(handler);
delete handler;
return nullptr;
}
return handler;
}
}
if ((join->select_lex->options & OPTION_SCHEMA_TABLE) &&
get_schema_tables_result(join, PROCESSED_BY_JOIN_EXEC))
{
if (!thd->is_error())
{
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), "Error occured in get_schema_tables_result()");
}
return handler;
}
}
handler->scan_initialized = true;
mcs_handler_info mhi(reinterpret_cast<void*>(handler), SELECT);
if ((handler->pushdown_init_rc = ha_mcs_impl_pushdown_init(&mhi, handler->table)))
bool isSelectLexUnit = (sel_unit && !sel_lex) ? true : false;
if ((handler->pushdown_init_rc = ha_mcs_impl_pushdown_init(&mhi, handler->table, isSelectLexUnit)))
{
// check fallback
if (select_handler_mode == mcs_select_handler_mode_t::AUTO)
@ -967,31 +1022,101 @@ select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_l
return handler;
}
// Unset select_lex::first_cond_optimization
if (select_lex->first_cond_optimization)
for (size_t i = 0; i < select_lex_vec.size(); i++)
{
first_cond_optimization_flag_toggle(select_lex, &first_cond_optimization_flag_unset);
SELECT_LEX* select_lex = select_lex_vec[i];
// Unset select_lex::first_cond_optimization
if (select_lex->first_cond_optimization)
{
first_cond_optimization_flag_toggle(select_lex, &first_cond_optimization_flag_unset);
}
}
}
return handler;
}
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex, SELECT_LEX_UNIT* sel_unit)
{
return create_columnstore_select_handler_(thd, select_lex, sel_unit);
}
select_handler* create_columnstore_unit_handler(THD* thd, SELECT_LEX_UNIT* sel_unit)
{
if (thd->lex->sql_command == SQLCOM_CREATE_VIEW)
{
return nullptr;
}
if (thd->stmt_arena && thd->stmt_arena->is_stmt_prepare())
{
return nullptr;
}
// MCOL-5432 Disable UNION pushdown if the query involves an order by
// or a limit clause.
if (sel_unit->global_parameters()->limit_params.explicit_limit == true ||
sel_unit->global_parameters()->order_list.elements != 0)
{
return nullptr;
}
return create_columnstore_select_handler_(thd, 0, sel_unit);
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* select_lex - sematic tree for the query.
* sel_lex - semantic tree for the query.
***********************************************************/
ha_columnstore_select_handler::ha_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
: select_handler(thd, mcs_hton)
ha_columnstore_select_handler::ha_columnstore_select_handler(THD* thd, SELECT_LEX* sel_lex)
: select_handler(thd, mcs_hton, sel_lex)
, prepared(false)
, scan_ended(false)
, scan_initialized(false)
, pushdown_init_rc(0)
{
const char* timeZone = thd->variables.time_zone->get_name()->ptr();
dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &time_zone);
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* sel_unit - semantic tree for the query.
***********************************************************/
ha_columnstore_select_handler::ha_columnstore_select_handler(THD* thd, SELECT_LEX_UNIT* sel_unit)
: select_handler(thd, mcs_hton, sel_unit)
, prepared(false)
, scan_ended(false)
, scan_initialized(false)
, pushdown_init_rc(0)
{
const char* timeZone = thd->variables.time_zone->get_name()->ptr();
dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &time_zone);
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* sel_lex - semantic tree for the query.
* sel_unit - unit containing SELECT_LEX's
***********************************************************/
ha_columnstore_select_handler::ha_columnstore_select_handler(THD* thd, SELECT_LEX* sel_lex,
SELECT_LEX_UNIT* sel_unit)
: select_handler(thd, mcs_hton, sel_lex, sel_unit)
, prepared(false)
, scan_ended(false)
, scan_initialized(false)
, pushdown_init_rc(0)
{
select = select_lex;
const char* timeZone = thd->variables.time_zone->get_name()->ptr();
dataconvert::timeZoneToOffset(timeZone, strlen(timeZone), &time_zone);
}
@ -1074,7 +1199,7 @@ bool ha_columnstore_select_handler::prepare()
prepared = true;
if ((!table && !(table = create_tmp_table(thd, select))) || table->fill_item_list(&result_columns))
if ((!table && !(table = create_tmp_table(thd))) || table->fill_item_list(&result_columns))
{
pushdown_init_rc = 1;
DBUG_RETURN(true);

View File

@ -151,7 +151,9 @@ class ha_columnstore_select_handler : public select_handler
// This will be used to restore to the original state later in case
// query execution fails using the select_handler.
cal_impl_if::TableOuterJoinMap tableOuterJoinMap;
ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX* sel);
ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX* sel_lex);
ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX_UNIT* sel_unit);
ha_columnstore_select_handler(THD* thd_arg, SELECT_LEX* sel_lex, SELECT_LEX_UNIT* sel_unit);
~ha_columnstore_select_handler();
int init_scan() override;
int next_row() override;