You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge pull request #1970 from tntnatbry/MCOL-4525
MCOL-4525 Implement columnstore_select_handler=AUTO.
This commit is contained in:
@ -76,6 +76,7 @@ using namespace dataconvert;
|
||||
|
||||
#include "sm.h"
|
||||
#include "ha_mcs_pushdown.h"
|
||||
#include "ha_mcs_sysvars.h"
|
||||
|
||||
#include "bytestream.h"
|
||||
#include "messagequeue.h"
|
||||
@ -2692,11 +2693,12 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed)
|
||||
ha_rows rowsInserted = 0;
|
||||
int rc = 0;
|
||||
|
||||
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
||||
// transaction or not. User should use this option very carefully since
|
||||
// cpimport currently does not support rollbacks
|
||||
if (((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
// ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use
|
||||
// cpimport, whether it's in a transaction or not. User should use this option
|
||||
// very carefully since cpimport currently does not support rollbacks
|
||||
if (((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
(!ci->singleInsert) &&
|
||||
((ci->isLoaddataInfile) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||
@ -2832,20 +2834,21 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
|
||||
(thd->lex)->sql_command == SQLCOM_INSERT_SELECT ||
|
||||
ci->isCacheInsert) && !ci->singleInsert )
|
||||
{
|
||||
ci->useCpimport = get_use_import_for_batchinsert(thd);
|
||||
ci->useCpimport = get_use_import_for_batchinsert_mode(thd);
|
||||
|
||||
if (((thd->lex)->sql_command == SQLCOM_INSERT) && (rows > 0))
|
||||
ci->useCpimport = 0;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF;
|
||||
|
||||
// For now, disable cpimport for cache inserts
|
||||
if (ci->isCacheInsert)
|
||||
ci->useCpimport = 0;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::OFF;
|
||||
|
||||
// ci->useCpimport = 2 means ALWAYS use cpimport, whether it's in a
|
||||
// transaction or not. User should use this option very carefully since
|
||||
// cpimport currently does not support rollbacks
|
||||
if ((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
|
||||
// ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ALWAYS means ALWAYS use
|
||||
// cpimport, whether it's in a transaction or not. User should use this option
|
||||
// very carefully since cpimport currently does not support rollbacks
|
||||
if ((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) //If autocommit on batch insert will use cpimport to load data
|
||||
{
|
||||
//store table info to connection info
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
@ -3315,8 +3318,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
// @bug 2515. Check command intead of vtable state
|
||||
if ( ( ((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) || (thd->lex)->sql_command == SQLCOM_INSERT_SELECT || ci->isCacheInsert) && !ci->singleInsert )
|
||||
{
|
||||
if (((ci->useCpimport == 2) ||
|
||||
((ci->useCpimport == 1) && (!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
if (((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ALWAYS) ||
|
||||
((ci->useCpimport == mcs_use_import_for_batchinsert_mode_t::ON) &&
|
||||
(!(thd->variables.option_bits & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))))) &&
|
||||
(!ci->singleInsert) &&
|
||||
((ci->isLoaddataInfile) ||
|
||||
((thd->lex)->sql_command == SQLCOM_INSERT) || ((thd->lex)->sql_command == SQLCOM_LOAD) ||
|
||||
@ -3526,7 +3530,7 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
|
||||
ci->isCacheInsert = false;
|
||||
ci->tableOid = 0;
|
||||
ci->rowsHaveInserted = 0;
|
||||
ci->useCpimport = 1;
|
||||
ci->useCpimport = mcs_use_import_for_batchinsert_mode_t::ON;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "idb_mysql.h"
|
||||
#include "ha_mcs_sysvars.h"
|
||||
|
||||
struct st_ha_create_information;
|
||||
class ha_columnstore_select_handler;
|
||||
@ -274,7 +275,7 @@ struct cal_connection_info
|
||||
filePtr(0),
|
||||
headerLength(0),
|
||||
useXbit(false),
|
||||
useCpimport(1),
|
||||
useCpimport(mcs_use_import_for_batchinsert_mode_t::ON),
|
||||
delimiter('\7'),
|
||||
affectedRows(0)
|
||||
{
|
||||
@ -341,7 +342,7 @@ struct cal_connection_info
|
||||
FILE* filePtr;
|
||||
uint8_t headerLength;
|
||||
bool useXbit;
|
||||
uint8_t useCpimport;
|
||||
mcs_use_import_for_batchinsert_mode_t useCpimport;
|
||||
char delimiter;
|
||||
char enclosed_by;
|
||||
std::vector <execplan::CalpontSystemCatalog::ColType> columnTypes;
|
||||
|
@ -73,38 +73,52 @@ void in_subselect_rewrite_walk(const Item* item_arg, void* arg)
|
||||
}
|
||||
}
|
||||
|
||||
/* @brief opt_flag_unset_PS() - Unsets first_cond_optimization */
|
||||
// Sets SELECT_LEX::first_cond_optimization
|
||||
void first_cond_optimization_flag_set(SELECT_LEX* select_lex)
|
||||
{
|
||||
select_lex->first_cond_optimization = true;
|
||||
}
|
||||
|
||||
// Unsets SELECT_LEX::first_cond_optimization
|
||||
void first_cond_optimization_flag_unset(SELECT_LEX* select_lex)
|
||||
{
|
||||
select_lex->first_cond_optimization = false;
|
||||
}
|
||||
|
||||
/* @brief first_cond_optimization_flag_toggle() - Sets/Unsets first_cond_optimization */
|
||||
/************************************************************
|
||||
* DESCRIPTION:
|
||||
* This function traverses derived tables to unset
|
||||
* SELECT_LEX::first_cond_optimization: a marker to control
|
||||
* optimizations executing PS. If set it allows to apply
|
||||
* optimizations. If unset, it disables optimizations.
|
||||
* This function traverses SELECT_LEX::table_list recursively
|
||||
* to set/unset SELECT_LEX::first_cond_optimization: a marker
|
||||
* to control optimizations executing PS. If set it allows to
|
||||
* apply optimizations. If unset, it disables optimizations.
|
||||
* PARAMETERS:
|
||||
* select_lex - SELECT_LEX* that describes the query.
|
||||
* func - Pointer to a function which either sets/unsets
|
||||
* SELECT_LEX::first_cond_optimization
|
||||
***********************************************************/
|
||||
void opt_flag_unset_PS(SELECT_LEX *select_lex)
|
||||
void first_cond_optimization_flag_toggle(SELECT_LEX *select_lex, void (*func)(SELECT_LEX*))
|
||||
{
|
||||
TABLE_LIST *tbl;
|
||||
List_iterator_fast<TABLE_LIST> li(select_lex->leaf_tables);
|
||||
|
||||
while ((tbl= li++))
|
||||
for (TABLE_LIST *tl= select_lex->get_table_list(); tl; tl= tl->next_local)
|
||||
{
|
||||
if (tbl->is_view_or_derived())
|
||||
if (tl->is_view_or_derived())
|
||||
{
|
||||
SELECT_LEX_UNIT *unit= tbl->get_unit();
|
||||
SELECT_LEX_UNIT *unit= tl->get_unit();
|
||||
|
||||
for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
|
||||
opt_flag_unset_PS(sl);
|
||||
if (unit)
|
||||
{
|
||||
for (SELECT_LEX *sl= unit->first_select(); sl; sl= sl->next_select())
|
||||
{
|
||||
first_cond_optimization_flag_toggle(sl, func);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
select_lex->first_cond_optimization= false;
|
||||
}
|
||||
(*func)(select_lex);
|
||||
}
|
||||
|
||||
|
||||
/* @brief in_subselect_rewrite - Rewrites Item_in_subselect */
|
||||
/************************************************************
|
||||
* DESCRIPTION:
|
||||
|
@ -21,6 +21,8 @@
|
||||
#include "idb_mysql.h"
|
||||
|
||||
bool in_subselect_rewrite(SELECT_LEX *select_lex);
|
||||
void opt_flag_unset_PS(SELECT_LEX *select_lex);
|
||||
void first_cond_optimization_flag_toggle(SELECT_LEX *select_lex, void (*func)(SELECT_LEX*));
|
||||
void first_cond_optimization_flag_unset(SELECT_LEX*);
|
||||
void first_cond_optimization_flag_set(SELECT_LEX*);
|
||||
|
||||
#endif
|
||||
|
@ -21,23 +21,12 @@
|
||||
|
||||
void check_walk(const Item* item, void* arg);
|
||||
|
||||
void disable_indices_for_CEJ(THD *thd_)
|
||||
void restore_query_state(ha_columnstore_select_handler* handler)
|
||||
{
|
||||
TABLE_LIST* global_list;
|
||||
for (global_list = thd_->lex->query_tables; global_list; global_list = global_list->next_global)
|
||||
for (auto iter = handler->tableOuterJoinMap.begin();
|
||||
iter != handler->tableOuterJoinMap.end(); iter++)
|
||||
{
|
||||
// MCOL-652 - doing this with derived tables can cause bad things to happen
|
||||
if (!global_list->derived)
|
||||
{
|
||||
global_list->index_hints = new (thd_->mem_root) List<Index_hint>();
|
||||
|
||||
global_list->index_hints->push_front(new (thd_->mem_root)
|
||||
Index_hint(INDEX_HINT_USE,
|
||||
INDEX_HINT_MASK_JOIN,
|
||||
NULL,
|
||||
0), thd_->mem_root);
|
||||
|
||||
}
|
||||
iter->first->outer_join = iter->second;
|
||||
}
|
||||
}
|
||||
|
||||
@ -407,7 +396,8 @@ create_columnstore_group_by_handler(THD* thd, Query* query)
|
||||
// MCOL-2178 Disable SP support in the group_by_handler for now
|
||||
// Check the session variable value to enable/disable use of
|
||||
// group_by_handler. There is no GBH if SH works for the query.
|
||||
if (get_select_handler(thd) || !get_group_by_handler(thd) || (thd->lex)->sphead)
|
||||
if ((get_select_handler_mode(thd) == mcs_select_handler_mode_t::ON) ||
|
||||
!get_group_by_handler(thd) || (thd->lex)->sphead)
|
||||
{
|
||||
return handler;
|
||||
}
|
||||
@ -754,14 +744,14 @@ int ha_mcs_group_by_handler::end_scan()
|
||||
select_handler*
|
||||
create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
{
|
||||
ha_columnstore_select_handler* handler = NULL;
|
||||
mcs_select_handler_mode_t select_handler_mode = get_select_handler_mode(thd);
|
||||
|
||||
// Check the session variable value to enable/disable use of
|
||||
// select_handler
|
||||
if (!get_select_handler(thd) ||
|
||||
if ((select_handler_mode == mcs_select_handler_mode_t::OFF) ||
|
||||
((thd->lex)->sphead && !get_select_handler_in_stored_procedures(thd)))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Flag to indicate if this is a prepared statement
|
||||
@ -774,14 +764,14 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
!((select_dumpvar *)(thd->lex)->result)->var_list.is_empty()) &&
|
||||
(!isPS))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Select_handler couldn't properly process UPSERT..SELECT
|
||||
if ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT
|
||||
&& thd->lex->duplicates == DUP_UPDATE)
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Iterate and traverse through the item list and the JOIN cond
|
||||
@ -792,71 +782,198 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
{
|
||||
if (check_user_var(table_ptr->select_lex))
|
||||
{
|
||||
return handler;
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// We apply dedicated rewrites from MDB here so MDB's data structures
|
||||
// becomes dirty and CS has to raise an error in case of any problem
|
||||
// or unsupported feature.
|
||||
handler = new ha_columnstore_select_handler(thd, select_lex);
|
||||
ha_columnstore_select_handler* handler =
|
||||
new ha_columnstore_select_handler(thd, select_lex);
|
||||
|
||||
JOIN *join = select_lex->join;
|
||||
bool unsupported_feature = false;
|
||||
|
||||
if (select_lex->first_cond_optimization &&
|
||||
select_lex->handle_derived(thd->lex, DT_MERGE))
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in select_lex::handle_derived()");
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
// This is partially taken from JOIN::optimize_inner() in sql/sql_select.cc
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
|
||||
Query_arena *arena, backup;
|
||||
arena = thd->activate_stmt_arena_if_needed(&backup);
|
||||
disable_indices_for_CEJ(thd);
|
||||
COND* conds = join->conds;
|
||||
select_lex->where = conds;
|
||||
|
||||
if (isPS)
|
||||
{
|
||||
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
|
||||
}
|
||||
|
||||
select_lex->update_used_tables();
|
||||
|
||||
if (arena)
|
||||
thd->restore_active_arena(arena, &backup);
|
||||
|
||||
if (select_lex->handle_derived(thd->lex, DT_MERGE))
|
||||
#ifdef DEBUG_WALK_COND
|
||||
if (conds)
|
||||
{
|
||||
unsupported_feature = true;
|
||||
handler->err_msg.assign("create_columnstore_select_handler(): \
|
||||
Internal error occured in SL::handle_derived()");
|
||||
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Attempt to execute the query using the select handler.
|
||||
// If query execution fails and columnstore_select_handler=AUTO,
|
||||
// fallback to table mode.
|
||||
// Skip execution for EXPLAIN queries
|
||||
if (!thd->lex->describe)
|
||||
{
|
||||
// This is taken from JOIN::optimize()
|
||||
join->fields= &select_lex->item_list;
|
||||
|
||||
// Instantiate handler::table, which is the place for the result set.
|
||||
if (handler->prepare())
|
||||
{
|
||||
// check fallback
|
||||
if (select_handler_mode == mcs_select_handler_mode_t::AUTO) // columnstore_select_handler=AUTO
|
||||
{
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999,
|
||||
"MCS select_handler execution failed. Falling back to server execution");
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// error out
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in handler->prepare()");
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
COND *conds = nullptr;
|
||||
if (!unsupported_feature)
|
||||
{
|
||||
// Rewrite once for PS
|
||||
// Refer to JOIN::optimize_inner() in sql/sql_select.cc
|
||||
// for details on the optimizations performed in this block.
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
create_explain_query_if_not_exists(thd->lex, thd->mem_root);
|
||||
arena = thd->activate_stmt_arena_if_needed(&backup);
|
||||
select_lex->first_cond_optimization= false;
|
||||
conds = join->conds;
|
||||
select_lex->where = conds;
|
||||
// Prepare query execution
|
||||
// This is taken from JOIN::exec_inner()
|
||||
if (!select_lex->outer_select() && // (1)
|
||||
select_lex != select_lex->master_unit()->fake_select_lex) // (2)
|
||||
thd->lex->set_limit_rows_examined();
|
||||
|
||||
if (isPS)
|
||||
if (!join->tables_list && (join->table_count || !select_lex->with_sum_func) &&
|
||||
!select_lex->have_window_funcs())
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
if (!join->zero_result_cause &&
|
||||
join->exec_const_cond && !join->exec_const_cond->val_int())
|
||||
join->zero_result_cause = "Impossible WHERE noticed after reading const tables";
|
||||
|
||||
// We've called exec_const_cond->val_int(). This may have caused an error.
|
||||
if (unlikely(thd->is_error()))
|
||||
{
|
||||
// error out
|
||||
handler->pushdown_init_rc = 1;
|
||||
return handler;
|
||||
}
|
||||
|
||||
if (join->zero_result_cause)
|
||||
{
|
||||
if (join->select_lex->have_window_funcs() && join->send_row_on_empty_set())
|
||||
{
|
||||
join->const_tables = join->table_count;
|
||||
join->first_select = sub_select_postjoin_aggr;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
select_lex->prep_where = conds ? conds->copy_andor_structure(thd) : 0;
|
||||
restore_query_state(handler);
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
select_lex->update_used_tables();
|
||||
|
||||
if (arena)
|
||||
thd->restore_active_arena(arena, &backup);
|
||||
|
||||
// Unset SL::first_cond_optimization
|
||||
opt_flag_unset_PS(select_lex);
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef DEBUG_WALK_COND
|
||||
if (conds)
|
||||
if ((join->select_lex->options & OPTION_SCHEMA_TABLE) &&
|
||||
get_schema_tables_result(join, PROCESSED_BY_JOIN_EXEC))
|
||||
{
|
||||
if (!thd->is_error())
|
||||
{
|
||||
conds->traverse_cond(cal_impl_if::debug_walk, NULL, Item::POSTFIX);
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in get_schema_tables_result()");
|
||||
}
|
||||
#endif
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
handler->scan_initialized = true;
|
||||
mcs_handler_info mhi(reinterpret_cast<void*>(handler), SELECT);
|
||||
|
||||
if ((handler->pushdown_init_rc = ha_mcs_impl_pushdown_init(&mhi, handler->table)))
|
||||
{
|
||||
// check fallback
|
||||
if (select_handler_mode == mcs_select_handler_mode_t::AUTO)
|
||||
{
|
||||
restore_query_state(handler);
|
||||
std::ostringstream oss;
|
||||
oss << "MCS select_handler execution failed";
|
||||
|
||||
if (thd->is_error())
|
||||
{
|
||||
oss << " due to: ";
|
||||
oss << thd->get_stmt_da()->sql_errno() << ": " ;
|
||||
oss << thd->get_stmt_da()->message();
|
||||
oss << " F";
|
||||
thd->clear_error();
|
||||
}
|
||||
else
|
||||
{
|
||||
oss << ", f";
|
||||
}
|
||||
|
||||
oss << "alling back to server execution";
|
||||
thd->get_stmt_da()->clear_warning_info(thd->query_id);
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, oss.str().c_str());
|
||||
delete handler;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (!thd->is_error())
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0),
|
||||
"Error occured in ha_mcs_impl_pushdown_init()");
|
||||
}
|
||||
}
|
||||
|
||||
// Unset select_lex::first_cond_optimization
|
||||
if (select_lex->first_cond_optimization)
|
||||
{
|
||||
first_cond_optimization_flag_toggle(select_lex, &first_cond_optimization_flag_unset);
|
||||
}
|
||||
}
|
||||
|
||||
// We shouldn't raise error now so set an error to raise it later in init_SH.
|
||||
handler->rewrite_error = unsupported_feature;
|
||||
// Return SH even if init fails
|
||||
return handler;
|
||||
}
|
||||
|
||||
@ -869,10 +986,13 @@ create_columnstore_select_handler(THD* thd, SELECT_LEX* select_lex)
|
||||
***********************************************************/
|
||||
ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd,
|
||||
SELECT_LEX* select_lex)
|
||||
: select_handler(thd, mcs_hton)
|
||||
: select_handler(thd, mcs_hton),
|
||||
prepared(false),
|
||||
scan_ended(false),
|
||||
scan_initialized(false),
|
||||
pushdown_init_rc(0)
|
||||
{
|
||||
select = select_lex;
|
||||
rewrite_error = false;
|
||||
select = select_lex;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
@ -881,6 +1001,10 @@ ha_columnstore_select_handler::ha_columnstore_select_handler(THD *thd,
|
||||
***********************************************************/
|
||||
ha_columnstore_select_handler::~ha_columnstore_select_handler()
|
||||
{
|
||||
if (scan_initialized && !scan_ended)
|
||||
{
|
||||
end_scan();
|
||||
}
|
||||
}
|
||||
|
||||
/*@brief Initiate the query for select_handler */
|
||||
@ -895,27 +1019,7 @@ ha_columnstore_select_handler::~ha_columnstore_select_handler()
|
||||
int ha_columnstore_select_handler::init_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::init_scan");
|
||||
|
||||
int rc = 0;
|
||||
|
||||
if (!rewrite_error)
|
||||
{
|
||||
// handler::table is the place for the result set
|
||||
// Skip execution for EXPLAIN queries
|
||||
if (!thd->lex->describe)
|
||||
{
|
||||
mcs_handler_info mhi(reinterpret_cast<void*>(this), SELECT);
|
||||
rc = ha_mcs_impl_pushdown_init(&mhi, this->table);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
my_printf_error(ER_INTERNAL_ERROR, "%s", MYF(0), err_msg.c_str());
|
||||
sql_print_error("%s", err_msg.c_str());
|
||||
rc = ER_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
DBUG_RETURN(pushdown_init_rc);
|
||||
}
|
||||
|
||||
/*@brief Fetch next row for select_handler */
|
||||
@ -951,7 +1055,30 @@ int ha_columnstore_select_handler::end_scan()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::end_scan");
|
||||
|
||||
scan_ended = true;
|
||||
|
||||
int rc = ha_mcs_impl_rnd_end(table, true);
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
// Copy of select_handler::prepare (see sql/select_handler.cc),
|
||||
// with an added if guard
|
||||
bool ha_columnstore_select_handler::prepare()
|
||||
{
|
||||
DBUG_ENTER("ha_columnstore_select_handler::prepare");
|
||||
|
||||
if (prepared)
|
||||
DBUG_RETURN(pushdown_init_rc ? true : false);
|
||||
|
||||
prepared = true;
|
||||
|
||||
if ((!table && !(table = create_tmp_table(thd, select))) ||
|
||||
table->fill_item_list(&result_columns))
|
||||
{
|
||||
pushdown_init_rc = 1;
|
||||
DBUG_RETURN(true);
|
||||
}
|
||||
|
||||
DBUG_RETURN(false);
|
||||
}
|
||||
|
@ -138,10 +138,12 @@ class ha_columnstore_select_handler: public select_handler
|
||||
{
|
||||
private:
|
||||
COLUMNSTORE_SHARE *share;
|
||||
bool prepared;
|
||||
bool scan_ended;
|
||||
|
||||
public:
|
||||
bool rewrite_error;
|
||||
std::string err_msg;
|
||||
bool scan_initialized;
|
||||
int pushdown_init_rc;
|
||||
// MCOL-4525 Store the original TABLE_LIST::outer_join value in a hash map.
|
||||
// This will be used to restore to the original state later in case
|
||||
// query execution fails using the select_handler.
|
||||
@ -151,6 +153,7 @@ public:
|
||||
int init_scan() override;
|
||||
int next_row() override;
|
||||
int end_scan() override;
|
||||
bool prepare() override;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -71,13 +71,28 @@ static MYSQL_THDVAR_ULONGLONG(
|
||||
1
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
const char* mcs_select_handler_mode_values[] = {
|
||||
"OFF",
|
||||
"ON",
|
||||
"AUTO",
|
||||
NullS
|
||||
};
|
||||
|
||||
static TYPELIB mcs_select_handler_mode_values_lib = {
|
||||
array_elements(mcs_select_handler_mode_values) - 1,
|
||||
"mcs_select_handler_mode_values",
|
||||
mcs_select_handler_mode_values,
|
||||
NULL
|
||||
};
|
||||
|
||||
static MYSQL_THDVAR_ENUM(
|
||||
select_handler,
|
||||
PLUGIN_VAR_NOCMDARG,
|
||||
"Enable/Disable the MCS select_handler",
|
||||
NULL,
|
||||
NULL,
|
||||
1
|
||||
PLUGIN_VAR_RQCMDARG,
|
||||
"Set the MCS select_handler to Disabled, Enabled, or Automatic",
|
||||
NULL, // check
|
||||
NULL, // update
|
||||
1, // default
|
||||
&mcs_select_handler_mode_values_lib // values lib
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
@ -288,17 +303,17 @@ static MYSQL_THDVAR_ULONG(
|
||||
1 // block size
|
||||
);
|
||||
|
||||
const char* mcs_use_import_for_batchinsert_values[] = {
|
||||
const char* mcs_use_import_for_batchinsert_mode_values[] = {
|
||||
"OFF",
|
||||
"ON",
|
||||
"ALWAYS",
|
||||
NullS
|
||||
};
|
||||
|
||||
static TYPELIB mcs_use_import_for_batchinsert_values_lib = {
|
||||
array_elements(mcs_use_import_for_batchinsert_values) - 1,
|
||||
"mcs_use_import_for_batchinsert_values",
|
||||
mcs_use_import_for_batchinsert_values,
|
||||
static TYPELIB mcs_use_import_for_batchinsert_mode_values_lib = {
|
||||
array_elements(mcs_use_import_for_batchinsert_mode_values) - 1,
|
||||
"mcs_use_import_for_batchinsert_mode_values",
|
||||
mcs_use_import_for_batchinsert_mode_values,
|
||||
NULL
|
||||
};
|
||||
|
||||
@ -309,7 +324,7 @@ static MYSQL_THDVAR_ENUM(
|
||||
NULL, // check
|
||||
NULL, // update
|
||||
1, // default
|
||||
&mcs_use_import_for_batchinsert_values_lib // values lib
|
||||
&mcs_use_import_for_batchinsert_mode_values_lib // values lib
|
||||
);
|
||||
|
||||
static MYSQL_THDVAR_BOOL(
|
||||
@ -412,11 +427,12 @@ void set_original_optimizer_flags(ulonglong ptr, THD* thd)
|
||||
THDVAR(current_thd, original_optimizer_flags) = (uint64_t)(ptr);
|
||||
}
|
||||
|
||||
bool get_select_handler(THD* thd)
|
||||
mcs_select_handler_mode_t get_select_handler_mode(THD* thd)
|
||||
{
|
||||
return ( thd == NULL ) ? false : THDVAR(thd, select_handler);
|
||||
return ( thd == NULL ) ? mcs_select_handler_mode_t::ON :
|
||||
(mcs_select_handler_mode_t) THDVAR(thd, select_handler);
|
||||
}
|
||||
void set_select_handler(THD* thd, bool value)
|
||||
void set_select_handler_mode(THD* thd, ulong value)
|
||||
{
|
||||
THDVAR(thd, select_handler) = value;
|
||||
}
|
||||
@ -585,12 +601,12 @@ void set_local_query(THD* thd, ulong value)
|
||||
THDVAR(thd, local_query) = value;
|
||||
}
|
||||
|
||||
mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd)
|
||||
mcs_use_import_for_batchinsert_mode_t get_use_import_for_batchinsert_mode(THD* thd)
|
||||
{
|
||||
return ( thd == NULL ) ? mcs_use_import_for_batchinsert_t::ON :
|
||||
(mcs_use_import_for_batchinsert_t) THDVAR(thd, use_import_for_batchinsert);
|
||||
return ( thd == NULL ) ? mcs_use_import_for_batchinsert_mode_t::ON :
|
||||
(mcs_use_import_for_batchinsert_mode_t) THDVAR(thd, use_import_for_batchinsert);
|
||||
}
|
||||
void set_use_import_for_batchinsert(THD* thd, ulong value)
|
||||
void set_use_import_for_batchinsert_mode(THD* thd, ulong value)
|
||||
{
|
||||
THDVAR(thd, use_import_for_batchinsert) = value;
|
||||
}
|
||||
|
@ -33,13 +33,20 @@ enum mcs_compression_type_t {
|
||||
SNAPPY = 2
|
||||
};
|
||||
|
||||
// use_import_for_batchinsert
|
||||
enum mcs_use_import_for_batchinsert_t {
|
||||
// use_import_for_batchinsert mode
|
||||
enum class mcs_use_import_for_batchinsert_mode_t {
|
||||
OFF = 0,
|
||||
ON = 1,
|
||||
ALWAYS = 2
|
||||
};
|
||||
|
||||
// select_handler mode
|
||||
enum class mcs_select_handler_mode_t {
|
||||
OFF = 0,
|
||||
ON = 1,
|
||||
AUTO = 2
|
||||
};
|
||||
|
||||
// simple setters/getters
|
||||
const char* get_original_query(THD* thd);
|
||||
void set_original_query(THD* thd, char* query);
|
||||
@ -53,8 +60,8 @@ void set_fe_conn_info_ptr(void* ptr, THD* thd = NULL);
|
||||
ulonglong get_original_optimizer_flags(THD* thd = NULL);
|
||||
void set_original_optimizer_flags(ulonglong ptr, THD* thd = NULL);
|
||||
|
||||
bool get_select_handler(THD* thd);
|
||||
void set_select_handler(THD* thd, bool value);
|
||||
mcs_select_handler_mode_t get_select_handler_mode(THD* thd);
|
||||
void set_select_handler_mode(THD* thd, ulong value);
|
||||
|
||||
bool get_derived_handler(THD* thd);
|
||||
void set_derived_handler(THD* thd, bool value);
|
||||
@ -107,8 +114,8 @@ void set_decimal_overflow_check(THD* thd, bool value);
|
||||
ulong get_local_query(THD* thd);
|
||||
void set_local_query(THD* thd, ulong value);
|
||||
|
||||
mcs_use_import_for_batchinsert_t get_use_import_for_batchinsert(THD* thd);
|
||||
void set_use_import_for_batchinsert(THD* thd, ulong value);
|
||||
mcs_use_import_for_batchinsert_mode_t get_use_import_for_batchinsert_mode(THD* thd);
|
||||
void set_use_import_for_batchinsert_mode(THD* thd, ulong value);
|
||||
|
||||
ulong get_import_for_batchinsert_delimiter(THD* thd);
|
||||
void set_import_for_batchinsert_delimiter(THD* thd, ulong value);
|
||||
|
@ -81,6 +81,7 @@ template <class T> bool isnan(T);
|
||||
#include "select_handler.h"
|
||||
#include "rpl_rli.h"
|
||||
#include "my_dbug.h"
|
||||
#include "sql_show.h"
|
||||
|
||||
// Now clean up the pollution as best we can...
|
||||
#include "mcsconfig_conflicting_defs_undef.h"
|
||||
|
Reference in New Issue
Block a user