1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4023 Pushdown WHERE conditions for UPDATE/DELETE.

For certain queries, such as:
  update cs1 set i = 41 where i = 42 or (i is null and 42 is null);
the SELECT_LEX.where does not contain the required where conditions.
Server sends the where conditions in the call to cond_push(), so
we are storing them in a handler data member, condStack, and later
push them down to getSelectPlan() for UPDATES/DELETEs.
This commit is contained in:
Gagan Goel
2020-05-29 22:30:34 -04:00
committed by Patrick LeBlanc
parent a8f5d353bd
commit 01ff2652a6
6 changed files with 98 additions and 22 deletions

View File

@ -448,7 +448,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows)
int rc; int rc;
try try
{ {
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack);
} }
catch (std::runtime_error& e) catch (std::runtime_error& e)
{ {
@ -464,7 +464,7 @@ int ha_mcs::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
int rc; int rc;
try try
{ {
rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows); rc = ha_mcs_impl_direct_update_delete_rows(false, update_rows, condStack);
*found_rows = *update_rows; *found_rows = *update_rows;
} }
catch (std::runtime_error& e) catch (std::runtime_error& e)
@ -487,7 +487,7 @@ int ha_mcs::direct_delete_rows(ha_rows *deleted_rows)
int rc; int rc;
try try
{ {
rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows); rc = ha_mcs_impl_direct_update_delete_rows(true, deleted_rows, condStack);
} }
catch (std::runtime_error& e) catch (std::runtime_error& e)
{ {
@ -629,7 +629,7 @@ int ha_mcs::rnd_init(bool scan)
{ {
try try
{ {
rc = ha_mcs_impl_rnd_init(table); rc = ha_mcs_impl_rnd_init(table, condStack);
} }
catch (std::runtime_error& e) catch (std::runtime_error& e)
{ {
@ -1110,7 +1110,7 @@ const COND* ha_mcs::cond_push(const COND* cond)
COND* ret_cond = NULL; COND* ret_cond = NULL;
try try
{ {
ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table); ret_cond = ha_mcs_impl_cond_push(const_cast<COND*>(cond), table, condStack);
} }
catch (std::runtime_error& e) catch (std::runtime_error& e)
{ {
@ -1119,6 +1119,23 @@ const COND* ha_mcs::cond_push(const COND* cond)
DBUG_RETURN(ret_cond); DBUG_RETURN(ret_cond);
} }
void ha_mcs::cond_pop()
{
DBUG_ENTER("ha_mcs::cond_pop");
THD* thd = current_thd;
if ((((thd->lex)->sql_command == SQLCOM_UPDATE) ||
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) &&
!condStack.empty())
{
condStack.pop_back();
}
DBUG_VOID_RETURN;
}
struct st_mysql_storage_engine columnstore_storage_engine = struct st_mysql_storage_engine columnstore_storage_engine =
{ MYSQL_HANDLERTON_INTERFACE_VERSION }; { MYSQL_HANDLERTON_INTERFACE_VERSION };

View File

@ -18,6 +18,7 @@
MA 02110-1301, USA. */ MA 02110-1301, USA. */
#ifndef HA_MCS_H__ #ifndef HA_MCS_H__
#define HA_MCS_H__ #define HA_MCS_H__
#include <my_config.h> #include <my_config.h>
#include "idb_mysql.h" #include "idb_mysql.h"
#include "ha_mcs_sysvars.h" #include "ha_mcs_sysvars.h"
@ -44,6 +45,11 @@ class ha_mcs: public handler
THR_LOCK_DATA lock; ///< MySQL lock THR_LOCK_DATA lock; ///< MySQL lock
COLUMNSTORE_SHARE* share; ///< Shared lock info COLUMNSTORE_SHARE* share; ///< Shared lock info
ulonglong int_table_flags; ulonglong int_table_flags;
// We are using a vector here to mimick the stack functionality
// using push_back() and pop_back()
// as apparently there is a linker error on the std::stack<COND*>::pop()
// call on Ubuntu18.
std::vector<COND*> condStack;
public: public:
ha_mcs(handlerton* hton, TABLE_SHARE* table_arg); ha_mcs(handlerton* hton, TABLE_SHARE* table_arg);
@ -222,6 +228,7 @@ public:
THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to, THR_LOCK_DATA** store_lock(THD* thd, THR_LOCK_DATA** to,
enum thr_lock_type lock_type); ///< required enum thr_lock_type lock_type); ///< required
const COND* cond_push(const COND* cond); const COND* cond_push(const COND* cond);
void cond_pop() override;
uint8 table_cache_type() uint8 table_cache_type()
{ {
return HA_CACHE_TBL_NOCACHE; return HA_CACHE_TBL_NOCACHE;

View File

@ -6333,10 +6333,12 @@ int processFrom(bool &isUnion,
int processWhere(SELECT_LEX &select_lex, int processWhere(SELECT_LEX &select_lex,
gp_walk_info &gwi, gp_walk_info &gwi,
SCSEP &csep, SCSEP &csep,
List<Item> &on_expr_list) List<Item> &on_expr_list,
const std::vector<COND*>& condStack)
{ {
JOIN* join = select_lex.join; JOIN* join = select_lex.join;
Item_cond* icp = 0; Item_cond* icp = 0;
bool isUpdateDelete = false;
if (join != 0) if (join != 0)
icp = reinterpret_cast<Item_cond*>(join->conds); icp = reinterpret_cast<Item_cond*>(join->conds);
@ -6354,7 +6356,7 @@ int processWhere(SELECT_LEX &select_lex,
((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) || ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ||
((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI ))) ((gwi.thd->lex)->sql_command == SQLCOM_DELETE_MULTI )))
{ {
icp = reinterpret_cast<Item_cond*>(select_lex.where); isUpdateDelete = true;
} }
if (icp) if (icp)
@ -6393,6 +6395,51 @@ int processWhere(SELECT_LEX &select_lex,
return ER_INTERNAL_ERROR; return ER_INTERNAL_ERROR;
} }
} }
else if (isUpdateDelete)
{
// MCOL-4023 For updates/deletes, we iterate over the pushed down condStack
if (!condStack.empty())
{
std::vector<COND*>::const_iterator condStackIter = condStack.begin();
while (condStackIter != condStack.end())
{
COND* cond = *condStackIter++;
cond->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
if (gwi.fatalParseError)
{
if (gwi.thd->derived_tables_processing)
{
gwi.cs_vtable_is_update_with_derive = true;
return -1;
}
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_INTERNAL_ERROR;
}
}
}
// if condStack is empty(), check the select_lex for where conditions
// as a last resort
else if ((icp = reinterpret_cast<Item_cond*>(select_lex.where)) != 0)
{
icp->traverse_cond(gp_walk, &gwi, Item::POSTFIX);
if (gwi.fatalParseError)
{
if (gwi.thd->derived_tables_processing)
{
gwi.cs_vtable_is_update_with_derive = true;
return -1;
}
setError(gwi.thd, ER_INTERNAL_ERROR, gwi.parseErrorText, gwi);
return ER_INTERNAL_ERROR;
}
}
}
else if (join && join->zero_result_cause) else if (join && join->zero_result_cause)
{ {
gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM)); gwi.rcWorkStack.push(new ConstantColumn((int64_t)0, ConstantColumn::NUM));
@ -6688,7 +6735,8 @@ int processLimitAndOffset(
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
SCSEP& csep, SCSEP& csep,
bool isUnion, bool isUnion,
bool isSelectHandlerTop) bool isSelectHandlerTop,
const std::vector<COND*>& condStack)
{ {
#ifdef DEBUG_WALK_COND #ifdef DEBUG_WALK_COND
cerr << "getSelectPlan()" << endl; cerr << "getSelectPlan()" << endl;
@ -6724,7 +6772,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex,
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false; bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
gwi.clauseType = WHERE; gwi.clauseType = WHERE;
if ((rc = processWhere(select_lex, gwi, csep, on_expr_list))) if ((rc = processWhere(select_lex, gwi, csep, on_expr_list, condStack)))
{ {
return rc; return rc;
} }

View File

@ -1194,7 +1194,7 @@ vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, in
return returnVal; return returnVal;
} }
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack)
{ {
if (get_fe_conn_info_ptr() == nullptr) if (get_fe_conn_info_ptr() == nullptr)
set_fe_conn_info_ptr((void*)new cal_connection_info()); set_fe_conn_info_ptr((void*)new cal_connection_info());
@ -1780,7 +1780,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
gwi.clauseType = WHERE; gwi.clauseType = WHERE;
if (getSelectPlan(gwi, select_lex, updateCP, false) != 0) //@Bug 3030 Modify the error message for unsupported functions if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) != 0) //@Bug 3030 Modify the error message for unsupported functions
{ {
if (gwi.cs_vtable_is_update_with_derive) if (gwi.cs_vtable_is_update_with_derive)
{ {
@ -2284,7 +2284,7 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name)
return 0; return 0;
} }
int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows) int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack)
{ {
THD* thd = current_thd; THD* thd = current_thd;
int rc = 0; int rc = 0;
@ -2308,7 +2308,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
if (execute) if (execute)
{ {
rc = doUpdateDelete(thd, gwi); rc = doUpdateDelete(thd, gwi, condStack);
} }
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr()); cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
@ -2320,7 +2320,7 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows)
return rc; return rc;
} }
int ha_mcs_impl_rnd_init(TABLE* table) int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
{ {
IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl ); IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
THD* thd = current_thd; THD* thd = current_thd;
@ -2384,7 +2384,7 @@ int ha_mcs_impl_rnd_init(TABLE* table)
//Update and delete code //Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi); return doUpdateDelete(thd, gwi, condStack);
uint32_t sessionID = tid2sid(thd->thread_id); uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
@ -3985,7 +3985,7 @@ int ha_mcs_impl_delete_row(const uchar* buf)
return 0; return 0;
} }
COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table) COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condStack)
{ {
THD* thd = current_thd; THD* thd = current_thd;
@ -3993,7 +3993,10 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table)
((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI) ||
((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE) ||
((thd->lex)->sql_command == SQLCOM_DELETE_MULTI)) ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI))
return cond; {
condStack.push_back(cond);
return nullptr;
}
string alias; string alias;
alias.assign(table->alias.ptr(), table->alias.length()); alias.assign(table->alias.ptr(), table->alias.length());
@ -4959,9 +4962,10 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
thd->lex->sql_command == SQLCOM_LOAD)) thd->lex->sql_command == SQLCOM_LOAD))
return 0; return 0;
// MCOL-4023 We need to test this code path.
//Update and delete code //Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi); return doUpdateDelete(thd, gwi, std::vector<COND*>());
uint32_t sessionID = tid2sid(thd->thread_id); uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

View File

@ -29,7 +29,7 @@ extern int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO
extern int ha_mcs_impl_delete_table(const char* name); extern int ha_mcs_impl_delete_table(const char* name);
extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked); extern int ha_mcs_impl_open(const char* name, int mode, uint32_t test_if_locked);
extern int ha_mcs_impl_close(void); extern int ha_mcs_impl_close(void);
extern int ha_mcs_impl_rnd_init(TABLE* table); extern int ha_mcs_impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack);
extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table); extern int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table);
extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false); extern int ha_mcs_impl_rnd_end(TABLE* table, bool is_derived_hand = false);
extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed); extern int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed);
@ -39,10 +39,10 @@ extern int ha_mcs_impl_rename_table(const char* from, const char* to);
extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_commit (handlerton* hton, THD* thd, bool all);
extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all); extern int ha_mcs_impl_rollback (handlerton* hton, THD* thd, bool all);
extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd); extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd);
extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>&);
extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_mcs_impl_update_row(); extern int ha_mcs_impl_update_row();
extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows); extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows *affected_rows, const std::vector<COND*>& condStack);
extern int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_delete_row();
extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);

View File

@ -342,7 +342,7 @@ int cp_get_table_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_table_in
int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi); int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_info& gi);
int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); int cs_get_derived_plan(derived_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi); int cs_get_select_plan(select_handler* handler, THD* thd, execplan::SCSEP& csep, gp_walk_info& gwi);
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false); int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false, bool isSelectHandlerTop = false, const std::vector<COND*>& condStack = std::vector<COND*>());
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false); int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi, bool isUnion = false);
void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi); void setError(THD* thd, uint32_t errcode, const std::string errmsg, gp_walk_info* gwi);
void setError(THD* thd, uint32_t errcode, const std::string errmsg); void setError(THD* thd, uint32_t errcode, const std::string errmsg);