You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4525 Implement columnstore_select_handler=AUTO.
This feature allows a query execution to fallback to the server, in case query execution using the select_handler (SH) fails. In case of fallback, a warning message containing the original reason for query failure using SH is generated. To accomplish this task, SH execution is moved to an earlier step when we create the SH in create_columnstore_select_handler(), instead of the previous call to SH execution in ha_columnstore_select_handler::init_scan(). This requires some pre-requisite steps that occur in the server in JOIN::optimize() and JOIN::exec() to be performed before starting SH execution. In addition, missing test cases from MCOL-424 are also added to the MTR suite, and the corresponding fix using disable_indices_for_CEJ() is reverted back since the original fix now appears to be redundant.
This commit is contained in:
@ -76,6 +76,7 @@ using namespace dataconvert;
|
||||
|
||||
#include "sm.h"
|
||||
#include "ha_mcs_pushdown.h"
|
||||
#include "ha_mcs_sysvars.h"
|
||||
|
||||
#include "bytestream.h"
|
||||
#include "messagequeue.h"
|
||||
@ -2692,11 +2693,12 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
|
||||
ha_rows rowsInserted = 0;
|
||||
int rc = 0;
|
||||
|
||||
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
||||
// transaction or not. User should use this option very carefully since
|
||||
// cpimport currently does not support rollbacks
|
||||
if (((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
// ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use
|
||||
// cpimport, whether it's in a transaction or not. User should use this option
|
||||
// very carefully since cpimport currently does not support rollbacks
|
||||
if (((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
(!ci->singleInsert) &&
|
||||
((ci->isLoaddataInfile) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||
@ -2832,20 +2834,21 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
|
||||
ci->isCacheInsert) && !ci->singleInsert )
|
||||
{
|
||||
ci->useCpimport = get_use_import_for_batchinsert(thd);
|
||||
ci->useCpimport = get_use_import_for_batchinsert_mode(thd);
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
|
||||
ci->useCpimport = 0;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF;
|
||||
|
||||
// For now, disable cpimport for cache inserts
|
||||
if (ci->isCacheInsert)
|
||||
ci->useCpimport = 0;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF;
|
||||
|
||||
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
||||
// transaction or not. User should use this option very carefully since
|
||||
// cpimport currently does not support rollbacks
|
||||
if ((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
|
||||
// ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use
|
||||
// cpimport, whether it's in a transaction or not. User should use this option
|
||||
// very carefully since cpimport currently does not support rollbacks
|
||||
if ((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
|
||||
{
|
||||
//store table info to connection info
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
@ -3315,8 +3318,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
// @bug 2515. Check command intead of vtable state
|
||||
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
|
||||
{
|
||||
if (((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
if (((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
(!ci->singleInsert) &&
|
||||
((ci->isLoaddataInfile) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||
@ -3526,7 +3530,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
ci->isCacheInsert = false;
|
||||
ci->tableOid = 0;
|
||||
ci->rowsHaveInserted = 0;
|
||||
ci->useCpimport = 1;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ON;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "idb_mysql.h"
|
||||
#include "ha_mcs_sysvars.h"
|
||||
|
||||
struct st_ha_create_information;
|
||||
class ha_columnstore_select_handler;
|
||||
@ -274,7 +275,7 @@ struct cal_connection_info
|
||||
filePtr(0),
|
||||
headerLength(0),
|
||||
useXbit(false),
|
||||
useCpimport(1),
|
||||
useCpimport(mcs_use_import_for_batchinsert_mode_t::ON),
|
||||
delimiter('\7'),
|
||||
affectedRows(0)
|
||||
{
|
||||
@ -341,7 +342,7 @@ struct cal_connection_info
|
||||
FILE* filePtr;
|
||||
uint8_t headerLength;
|
||||
bool useXbit;
|
||||
uint8_t useCpimport;
|
||||
mcs_use_import_for_batchinsert_mode_t useCpimport;
|
||||
char delimiter;
|
||||
char enclosed_by;
|
||||
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
|
||||
|
@ -73,38 +73,52 @@ void in_subselect_rewrite_walk(const Item* item_arg, void* arg)
|
||||
}
|
||||
}
|
||||
|
||||
/* @brief opt_flag_unset_PS() - Unsets first_cond_optimization */
|
||||
// Sets SELECT_LEX::first_cond_optimization
|
||||
void first_cond_optimization_flag_set(SELECT_LEX* select_lex)
|
||||
{
|
||||
select_lex->first_cond_optimization = true;
|
||||
}
|
||||
|
||||
// Unsets SELECT_LEX::first_cond_optimization
|
||||
void first_cond_optimization_flag_unset(SELECT_LEX* select_lex)
|
||||
{
|
||||
select_lex->first_cond_optimization = false;
|
||||
}
|
||||
|
||||
/* @brief first_cond_optimization_flag_toggle() - Sets/Unsets first_cond_optimization */
|
||||
/************************************************************
|
||||
* DESCRIPTION:
|
||||
* This function traverses derived tables to unset
|
||||
* SELECT_LEX::first_cond_optimization: a marker to control
|
||||
* optimizations executing PS. If set it allows to apply
|
||||
* optimizations. If unset, it disables optimizations.
|
||||
* This function traverses SELECT_LEX::table_list recursively
|
||||
* to set/unset SELECT_LEX::first_cond_optimization: a marker
|
||||
* to control optimizations executing PS. If set it allows to
|
||||
* apply optimizations. If unset, it disables optimizations.
|
||||
* PARAMETERS:
|
||||
* select_lex - SELECT_LEX* that describes the query.
|
||||
* func - Pointer to a function which either sets/unsets
|
||||
* SELECT_LEX::first_cond_optimization
|
||||
***********************************************************/
|
||||
void opt_flag_unset_PS(SELECT_LEX *select_lex)
|
||||
void first_cond_optimization_flag_toggle(SELECT_LEX *select_lex, void (*func)(SELECT_LEX*))
|
||||
{
|
||||
TABLE_LIST *tbl;
|
||||
List_iterator_fast<TABLE_LIST> li(select_lex->leaf_tables);
|
||||
|
||||
while ((tbl= li++))
|
||||
for (TABLE_LIST *tl= select_lex->get_table_list(); tl; tl= tl->next_local)
|
||||
{
|
||||
if (tbl->is_view_or_derived())
|
||||
if (tl->is_view_or_derived())
|
||||
{
|
||||
SELECT_LEX_UNIT *unit= tbl->get_unit();
|
||||
SELECT_LEX_UNIT *unit= tl->get_unit();
|
||||
|
||||
for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
|
||||
opt_flag_unset_PS(sl);
|
||||
if (unit)
|
||||
{
|
||||
for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
|
||||
{
|
||||
first_cond_optimization_flag_toggle(sl, func);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
select_lex->first_cond_optimization= false;
|
||||
}
|
||||
(*func)(select_lex);
|
||||
}
|
||||
|
||||
|
||||
/* @brief in_subselect_rewrite - Rewrites Item_in_subselect */
|
||||
/************************************************************
|
||||
* DESCRIPTION:
|
||||
|
@ -21,6 +21,8 @@
|
||||
#include "idb_mysql.h"
|
||||
|
||||
bool in_subselect_rewrite(SELECT_LEX *select_lex);
|
||||
void opt_flag_unset_PS(SELECT_LEX *select_lex);
|
||||
void first_cond_optimization_flag_toggle(SELECT_LEX *select_lex, void (*func)(SELECT_LEX*));
|
||||
void first_cond_optimization_flag_unset(SELECT_LEX*);
|
||||
void first_cond_optimization_flag_set(SELECT_LEX*);
|
||||
|
||||
#endif
|
||||
|
@ -21,23 +21,12 @@
|
||||
|
||||
void check_walk(const Item* item, void* arg);
|
||||
|
||||
void disable_indices_for_CEJ(THD *thd_)
|
||||
void restore_query_state(ha_columnstore_select_handler* handler)
|
||||
{
|
||||
TABLE_LIST* global_list;
|
||||
for (global_list = thd_->lex->query_tables; global_list; global_list = global_list->next_global)
|
||||
for (auto iter = handler->tableOuterJoinMap.begin();
|
||||
iter != handler->tableOuterJoinMap.end(); iter++)
|
||||
{
|
||||
// MCOL-652 - doing this with derived tables can cause bad things to happen
|
||||
if (!global_list->derived)
|
||||
{
|
||||
global_list->index_hints = new (thd_->mem_root) List<Index_hint>();
|
||||
|
||||
global_list->index_hints->push_front(new (thd_->mem_root)
|
||||
Index_hint(INDEX_HINT_USE,
|
||||
INDEX_HINT_MASK_JOIN,
|
||||
NULL,
|
||||
0), thd_->mem_root);
|
||||
|
||||
}
|
||||
iter->first->outer_join = iter->second;
|
||||
}
|
||||
}
|
||||
|
||||
@ -407,7 +396,8 @@ create_columnstore_group_by_handler(THD* thd, Query* query)
|
||||
// MCOL-2178 Disable SP support in the group_by_handler for now
|
||||
// Check the session variable value to enable/disable use of
|
||||
// group_by_handler. There is no GBH if SH works for the query.
|
||||
if (get_select_handler(thd) || !get_group_by_handler(thd) || (thd->lex)->sphead)
|
||||
if ((get_select_handler_mode(thd) == mcs_select_handler_mode_t::ON) ||
|
||||
!get_group_by_handler(thd) || (thd->lex)->sphead)
|
||||
{
|
||||
return handler;
|
||||
}
|
||||
@ -754,14 +744,14 @@ int ha_mcs_group_by_handler::end_scan()
|
||||
select_handler*
|
||||
create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
{
|
||||
ha_columnstore_select_handler* handler = NULL;
|
||||
mcs_select_handler_mode_t select_handler_mode = get_select_handler_mode(thd);
|
||||
|
||||
// Check the session variable value to enable/disable use of
|
||||
// select_handler
|
||||
if (!get_select_handler(thd) ||
|
||||
if ((select_handler_mode == mcs_select_handler_mode_t::OFF) ||
|
||||
((thd->lex)->sphead && !get_select_handler_in_stored_procedures(thd)))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Flag to indicate if this is a prepared statement
|
||||
@ -774,14 +764,14 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
!((select_dumpvar *)(thd->lex)->result)->var_list.is_empty()) &&
|
||||
(!isPS))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Select_handler couldn't properly process UPSERT..SELECT
|
||||
if ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT
|
||||
&& thd->lex->duplicates == DUP_UPDATE)
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Iterate and traverse through the item list and the JOIN cond
|
||||
@ -792,71 +782,198 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
{
|
||||
if (check_user_var(table_ptr->select_lex))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// We apply dedicated rewrites from MDB here so MDB's data structures
|
||||
// becomes dirty and CS has to raise an error in case of any problem
|
||||
// or unsupported feature.
|
||||
handler = new ha_columnstore_select_handler(thd, select_lex);
|
||||
ha_columnstore_select_handler* handler =
|
||||
new ha_columnstore_select_handler(thd, select_lex);
|
||||
|
||||
JOIN *join = select_lex->join;
|
||||
bool unsupported_feature = false;
|
||||
|
||||
if (select_lex->first_cond_optimization &&
|
||||
select_lex->handle_derived(thd->lex, DT_MERGE))
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in select_lex::handle_derived()");
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
// This is partially taken from JOIN::optimize_inner() in sql/sql_select.cc
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
|
||||
Query_arena *arena, backup;
|
||||
arena = thd->activate_stmt_arena_if_needed(&backup);
|
||||
disable_indices_for_CEJ(thd);
|
||||
COND* conds = join->conds;
|
||||
select_lex->where = conds;
|
||||
|
||||
if (isPS)
|
||||
{
|
||||
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
|
||||
}
|
||||
|
||||
select_lex->update_used_tables();
|
||||
|
||||
if (arena)
|
||||
thd->restore_active_arena(arena, &backup);
|
||||
|
||||
if (select_lex->handle_derived(thd->lex, DT_MERGE))
|
||||
#ifdef DEBUG_WALK_COND
|
||||
if (conds)
|
||||
{
|
||||
unsupported_feature = true;
|
||||
handler->err_msg.assign("create_columnstore_select_handler(): \
|
||||
Internal error occured in SL::handle_derived()");
|
||||
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Attempt to execute the query using the select handler.
|
||||
// If query execution fails and columnstore_select_handler=AUTO,
|
||||
// fallback to table mode.
|
||||
// Skip execution for EXPLAIN queries
|
||||
if (!thd->lex->describe)
|
||||
{
|
||||
// This is taken from JOIN::optimize()
|
||||
join->fields= &select_lex->item_list;
|
||||
|
||||
// Instantiate handler::table, which is the place for the result set.
|
||||
if (handler->prepare())
|
||||
{
|
||||
// check fallback
|
||||
if (select_handler_mode == mcs_select_handler_mode_t::AUTO) // columnstore_select_handler=AUTO
|
||||
{
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999,
|
||||
"MCS select_handler execution failed. Falling back to server execution");
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// error out
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in handler->prepare()");
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
COND *conds = nullptr;
|
||||
if (!unsupported_feature)
|
||||
{
|
||||
// Rewrite once for PS
|
||||
// Refer to JOIN::optimize_inner() in sql/sql_select.cc
|
||||
// for details on the optimizations performed in this block.
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
|
||||
arena = thd->activate_stmt_arena_if_needed(&backup);
|
||||
select_lex->first_cond_optimization= false;
|
||||
conds = join->conds;
|
||||
select_lex->where = conds;
|
||||
// Prepare query execution
|
||||
// This is taken from JOIN::exec_inner()
|
||||
if (!select_lex->outer_select() && // (1)
|
||||
select_lex != select_lex->master_unit()->fake_select_lex) // (2)
|
||||
thd->lex->set_limit_rows_examined();
|
||||
|
||||
if (isPS)
|
||||
if (!join->tables_list && (join->table_count || !select_lex->with_sum_func) &&
|
||||
!select_lex->have_window_funcs())
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
if (!join->zero_result_cause &&
|
||||
join->exec_const_cond && !join->exec_const_cond->val_int())
|
||||
join->zero_result_cause = "Impossible WHERE noticed after reading const tables";
|
||||
|
||||
// We've called exec_const_cond->val_int(). This may have caused an error.
|
||||
if (unlikely(thd->is_error()))
|
||||
{
|
||||
// error out
|
||||
handler->pushdown_init_rc = 1;
|
||||
return handler;
|
||||
}
|
||||
|
||||
if (join->zero_result_cause)
|
||||
{
|
||||
if (join->select_lex->have_window_funcs() && join->send_row_on_empty_set())
|
||||
{
|
||||
join->const_tables = join->table_count;
|
||||
join->first_select = sub_select_postjoin_aggr;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
select_lex->update_used_tables();
|
||||
|
||||
if (arena)
|
||||
thd->restore_active_arena(arena, &backup);
|
||||
|
||||
// Unset SL::first_cond_optimization
|
||||
opt_flag_unset_PS(select_lex);
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef DEBUG_WALK_COND
|
||||
if (conds)
|
||||
if ((join->select_lex->options & OPTION_SCHEMA_TABLE) &&
|
||||
get_schema_tables_result(join, PROCESSED_BY_JOIN_EXEC))
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in get_schema_tables_result()");
|
||||
}
|
||||
#endif
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
handler->scan_initialized = true;
|
||||
mcs_handler_info mhi(reinterpret_cast<void*>(handler), SELECT);
|
||||
|
||||
if ((handler->pushdown_init_rc = ha_mcs_impl_pushdown_init(&mhi, handler->table)))
|
||||
{
|
||||
// check fallback
|
||||
if (select_handler_mode == mcs_select_handler_mode_t::AUTO)
|
||||
{
|
||||
restore_query_state(handler);
|
||||
std::ostringstream oss;
|
||||
oss << "MCS select_handler execution failed";
|
||||
|
||||
if (thd->is_error())
|
||||
{
|
||||
oss << " due to: ";
|
||||
oss << thd->get_stmt_da()->sql_errno() << ": " ;
|
||||
oss << thd->get_stmt_da()->message();
|
||||
oss << " F";
|
||||
thd->clear_error();
|
||||
}
|
||||
else
|
||||
{
|
||||
oss << ", f";
|
||||
}
|
||||
|
||||
oss << "alling back to server execution";
|
||||
thd->get_stmt_da()->clear_warning_info(thd->query_id);
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, oss.str().c_str());
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in ha_mcs_impl_pushdown_init()");
|
||||
}
|
||||
}
|
||||
|
||||
// Unset select_lex::first_cond_optimization
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
first_cond_optimization_flag_toggle(select_lex, &first_cond_optimization_flag_unset);
|
||||
}
|
||||
}
|
||||
|
||||
// We shouldn't raise error now so set an error to raise it later in init_SH.
|
||||
handler->rewrite_error = unsupported_feature;
|
||||
// Return SH even if init fails
|
||||
return handler;
|
||||
}
|
||||
|
||||
@ -869,10 +986,13 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
***********************************************************/
|
||||
ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd,
|
||||
SELECT_LEX* select_lex)
|
||||
: select_handler(thd, mcs_hton)
|
||||
: select_handler(thd, mcs_hton),
|
||||
prepared(false),
|
||||
scan_ended(false),
|
||||
scan_initialized(false),
|
||||
pushdown_init_rc(0)
|
||||
{
|
||||
select = select_lex;
|
||||
rewrite_error = false;
|
||||
select = select_lex;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
@ -881,6 +1001,10 @@ ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd,
|
||||
***********************************************************/
|
||||
ha_columnstore_select_handler::~ha_columnstore_select_handler()
|
||||
{
|
||||
if (scan_initialized && !scan_ended)
|
||||
{
|
||||
end_scan();
|
||||
}
|
||||
}
|
||||
|
||||
/*@brief Initiate the query for select_handler */
|
||||
@ -895,27 +1019,7 @@ ha_columnstore_select_handler::~ha_columnstore_select_handler()
|
||||
int ha_columnstore_select_handler::init_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::init_scan");
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (!rewrite_error)
|
||||
{
|
||||
// handler::table is the place for the result set
|
||||
// Skip execution for EXPLAIN queries
|
||||
if (!thd->lex->describe)
|
||||
{
|
||||
mcs_handler_info mhi(reinterpret_cast<void*>(this), SELECT);
|
||||
rc = ha_mcs_impl_pushdown_init(&mhi, this->table);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), err_msg.c_str());
|
||||
sql_print_error("%s", err_msg.c_str());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
DBUG_RETURN(pushdown_init_rc);
|
||||
}
|
||||
|
||||
/*@brief Fetch next row for select_handler */
|
||||
@ -951,7 +1055,30 @@ int ha_columnstore_select_handler::end_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::end_scan");
|
||||
|
||||
scan_ended = true;
|
||||
|
||||
int rc = ha_mcs_impl_rnd_end(table, true);
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
// Copy of select_handler::prepare (see sql/select_handler.cc),
|
||||
// with an added if guard
|
||||
bool ha_columnstore_select_handler::prepare()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::prepare");
|
||||
|
||||
if (prepared)
|
||||
DBUG_RETURN(pushdown_init_rc ? true : false);
|
||||
|
||||
prepared = true;
|
||||
|
||||
if ((!table && !(table = create_tmp_table(thd, select))) ||
|
||||
table->fill_item_list(&result_columns))
|
||||
{
|
||||
pushdown_init_rc = 1;
|
||||
DBUG_RETURN(true);
|
||||
}
|
||||
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
|
@ -138,10 +138,12 @@ class ha_columnstore_select_handler: public select_handler
|
||||
{
|
||||
private:
|
||||
COLUMNSTORE_SHARE *share;
|
||||
bool prepared;
|
||||
bool scan_ended;
|
||||
|
||||
public:
|
||||
bool rewrite_error;
|
||||
std::string err_msg;
|
||||
bool scan_initialized;
|
||||
int pushdown_init_rc;
|
||||
// MCOL-4525 Store the original TABLE_LIST::outer_join value in a hash map.
|
||||
// This will be used to restore to the original state later in case
|
||||
// query execution fails using the select_handler.
|
||||
@ -151,6 +153,7 @@ public:
|
||||
int init_scan() override;
|
||||
int next_row() override;
|
||||
int end_scan() override;
|
||||
bool prepare() override;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -71,13 +71,28 @@ static MYSQL_THDVAR_ULONGLONG(
|
||||
1
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
const char* mcs_select_handler_mode_values[] = {
|
||||
"OFF",
|
||||
"ON",
|
||||
"AUTO",
|
||||
NullS
|
||||
};
|
||||
|
||||
static TYPELIB mcs_select_handler_mode_values_lib = {
|
||||
array_elements(mcs_select_handler_mode_values) - 1,
|
||||
"mcs_select_handler_mode_values",
|
||||
mcs_select_handler_mode_values,
|
||||
NULL
|
||||
};
|
||||
|
||||
static MYSQL_THDVAR_ENUM(
|
||||
select_handler,
|
||||
PLUGIN_VAR_NOCMDARG,
|
||||
"Enable/Disable the MCS select_handler",
|
||||
NULL,
|
||||
NULL,
|
||||
1
|
||||
PLUGIN_VAR_RQCMDARG,
|
||||
"Set the MCS select_handler to Disabled, Enabled, or Automatic",
|
||||
NULL, // check
|
||||
NULL, // update
|
||||
1, // default
|
||||
&mcs_select_handler_mode_values_lib // values lib
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
@ -288,17 +303,17 @@ static MYSQL_THDVAR_ULONG(
|
||||
1 // block size
|
||||
);
|
||||
|
||||
const char* mcs_use_import_for_batchinsert_values[] = {
|
||||
const char* mcs_use_import_for_batchinsert_mode_values[] = {
|
||||
"OFF",
|
||||
"ON",
|
||||
"ALWAYS",
|
||||
NullS
|
||||
};
|
||||
|
||||
static TYPELIB mcs_use_import_for_batchinsert_values_lib = {
|
||||
array_elements(mcs_use_import_for_batchinsert_values) - 1,
|
||||
"mcs_use_import_for_batchinsert_values",
|
||||
mcs_use_import_for_batchinsert_values,
|
||||
static TYPELIB mcs_use_import_for_batchinsert_mode_values_lib = {
|
||||
array_elements(mcs_use_import_for_batchinsert_mode_values) - 1,
|
||||
"mcs_use_import_for_batchinsert_mode_values",
|
||||
mcs_use_import_for_batchinsert_mode_values,
|
||||
NULL
|
||||
};
|
||||
|
||||
@ -309,7 +324,7 @@ static MYSQL_THDVAR_ENUM(
|
||||
NULL, // check
|
||||
NULL, // update
|
||||
1, // default
|
||||
&mcs_use_import_for_batchinsert_values_lib // values lib
|
||||
&mcs_use_import_for_batchinsert_mode_values_lib // values lib
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
@ -412,11 +427,12 @@ void set_original_optimizer_flags(ulonglong ptr, THD* thd)
|
||||
THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr);
|
||||
}
|
||||
|
||||
bool get_select_handler(THD* thd)
|
||||
mcs_select_handler_mode_t get_select_handler_mode(THD* thd)
|
||||
{
|
||||
return ( thd == NULL ) ? false : THDVAR(thd, select_handler);
|
||||
return ( thd == NULL ) ? mcs_select_handler_mode_t::ON :
|
||||
(mcs_select_handler_mode_t) THDVAR(thd, select_handler);
|
||||
}
|
||||
void set_select_handler(THD* thd, bool value)
|
||||
void set_select_handler_mode(THD* thd, ulong value)
|
||||
{
|
||||
THDVAR(thd, select_handler) = value;
|
||||
}
|
||||
@ -585,12 +601,12 @@ void set_local_query(THD* thd, ulong value)
|
||||
THDVAR(thd, local_query) = value;
|
||||
}
|
||||
|
||||
mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd)
|
||||
mcs_use_import_for_batchinsert_mode_t get_use_import_for_batchinsert_mode(THD* thd)
|
||||
{
|
||||
return ( thd == NULL ) ? mcs_use_import_for_batchinsert_t::ON :
|
||||
(mcs_use_import_for_batchinsert_t) THDVAR(thd, use_import_for_batchinsert);
|
||||
return ( thd == NULL ) ? mcs_use_import_for_batchinsert_mode_t::ON :
|
||||
(mcs_use_import_for_batchinsert_mode_t) THDVAR(thd, use_import_for_batchinsert);
|
||||
}
|
||||
void set_use_import_for_batchinsert(THD* thd, ulong value)
|
||||
void set_use_import_for_batchinsert_mode(THD* thd, ulong value)
|
||||
{
|
||||
THDVAR(thd, use_import_for_batchinsert) = value;
|
||||
}
|
||||
|
@ -33,13 +33,20 @@ enum mcs_compression_type_t {
|
||||
SNAPPY = 2
|
||||
};
|
||||
|
||||
// use_import_for_batchinsert
|
||||
enum mcs_use_import_for_batchinsert_t {
|
||||
// use_import_for_batchinsert mode
|
||||
enum class mcs_use_import_for_batchinsert_mode_t {
|
||||
OFF = 0,
|
||||
ON = 1,
|
||||
ALWAYS = 2
|
||||
};
|
||||
|
||||
// select_handler mode
|
||||
enum class mcs_select_handler_mode_t {
|
||||
OFF = 0,
|
||||
ON = 1,
|
||||
AUTO = 2
|
||||
};
|
||||
|
||||
// simple setters/getters
|
||||
const char* get_original_query(THD* thd);
|
||||
void set_original_query(THD* thd, char* query);
|
||||
@ -53,8 +60,8 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL);
|
||||
ulonglong get_original_optimizer_flags(THD* thd = NULL);
|
||||
void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL);
|
||||
|
||||
bool get_select_handler(THD* thd);
|
||||
void set_select_handler(THD* thd, bool value);
|
||||
mcs_select_handler_mode_t get_select_handler_mode(THD* thd);
|
||||
void set_select_handler_mode(THD* thd, ulong value);
|
||||
|
||||
bool get_derived_handler(THD* thd);
|
||||
void set_derived_handler(THD* thd, bool value);
|
||||
@ -107,8 +114,8 @@ void set_decimal_overflow_check(THD* thd, bool value);
|
||||
ulong get_local_query(THD* thd);
|
||||
void set_local_query(THD* thd, ulong value);
|
||||
|
||||
mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd);
|
||||
void set_use_import_for_batchinsert(THD* thd, ulong value);
|
||||
mcs_use_import_for_batchinsert_mode_t get_use_import_for_batchinsert_mode(THD* thd);
|
||||
void set_use_import_for_batchinsert_mode(THD* thd, ulong value);
|
||||
|
||||
ulong get_import_for_batchinsert_delimiter(THD* thd);
|
||||
void set_import_for_batchinsert_delimiter(THD* thd, ulong value);
|
||||
|
@ -81,6 +81,7 @@ template <class T> bool isnan(T);
|
||||
#include "select_handler.h"
|
||||
#include "rpl_rli.h"
|
||||
#include "my_dbug.h"
|
||||
#include "sql_show.h"
|
||||
|
||||
// Now clean up the pollution as best we can...
|
||||
#include "mcsconfig_conflicting_defs_undef.h"
|
||||
|
Reference in New Issue
Block a user