1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5943: MCOL-4740 update rows counter for multi-table update (#3555)

* fix(plugin): MCOL-4740: This fixes update rows counter for multi-table update
For UPDATEs involving a single table, the server call to handler::direct_update_rows() is used to correctly set the count for the number of updated rows in the UPDATE statement.
However, for UPDATEs involving multi-tables, the server does not call handler::direct_update_rows(). This patch adds support to correctly report the number of updated rows to the client by setting
multi_update::updated and multi_update::found in handler::rnd_end().

* fix(plugin): MCOL-4740: this is to addres the original patch QA found in the original patch

---------

Co-authored-by: Roman Nozdrin <rnozdrin@mariadb.com>
Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
This commit is contained in:
Leonid Fedorov
2025-05-29 17:23:37 +04:00
committed by GitHub
parent feb919f2ad
commit dc4ca8d588
9 changed files with 403 additions and 177 deletions

View File

@ -832,7 +832,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
{
Message::Args args;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
args.add("Update");
#if 0
else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
@ -889,7 +889,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
updateCP->isDML(true);
//@Bug 2753. the memory already freed by destructor of UpdateSqlStatement
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
ColumnAssignment* columnAssignmentPtr = nullptr;
Item_field* item;
@ -1184,7 +1184,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
}
// Exit early if there is nothing to update
if (colAssignmentListPtr->empty() && isUpdateStatement(thd->lex->sql_command))
if (colAssignmentListPtr->empty() && ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
ci->affectedRows = 0;
delete colAssignmentListPtr;
@ -1198,7 +1198,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
CalpontSystemCatalog::TableName aTableName;
TABLE_LIST* first_table = 0;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
aTableName.schema = schemaName;
aTableName.table = tableName;
@ -1220,7 +1220,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
IDEBUG(cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl);
VendorDMLStatement dmlStatement(dmlStmt, sessionID);
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
dmlStatement.set_DMLStatementType(DML_UPDATE);
else
dmlStatement.set_DMLStatementType(DML_DELETE);
@ -1229,7 +1229,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
//@Bug 2753. To make sure the momory is freed.
updateStmt.fColAssignmentListPtr = colAssignmentListPtr;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
TableName* qualifiedTablName = new TableName();
qualifiedTablName->fName = tableName;
@ -1318,7 +1318,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
List<Item> items;
SELECT_LEX select_lex;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
items = (thd->lex->first_select_lex()->item_list);
thd->lex->first_select_lex()->item_list = thd->lex->value_list;
@ -1467,7 +1467,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
returnedCols.push_back((updateCP->columnMap()).begin()->second);
//@Bug 6123. get the correct returned columnlist
if (isDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isDeleteStatement(thd->lex->sql_command))
{
returnedCols.clear();
// choose the smallest column to project
@ -1518,7 +1518,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
updateCP->returnedCols(returnedCols);
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
const ParseTree* ptsub = updateCP->filters();
@ -1979,7 +1979,7 @@ int ha_mcs_impl_analyze(THD* thd, TABLE* table)
if (table->s->db.length && strcmp(table->s->db.str, "information_schema") == 0)
return 0;
bool columnStore = (table ? isMCSTable(table) : true);
bool columnStore = (table ? ha_mcs_common::isMCSTable(table) : true);
// Skip non columnstore tables.
if (!columnStore)
return 0;
@ -2156,7 +2156,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows,
gwi.thd = thd;
int rc = 0;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
{
if (affected_rows)
*affected_rows = 0;
@ -2189,7 +2190,7 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
// check whether the system is ready to process statement.
@ -2241,7 +2242,7 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
UPDATE innotab1 SET a=100 WHERE a NOT IN (SELECT a FROM cstab1 WHERE a=1);
*/
if (!isReadOnly() && // make sure the current table is being modified
isUpdateOrDeleteStatement(thd->lex->sql_command))
ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return doUpdateDelete(thd, gwi, condStack);
uint32_t sessionID = tid2sid(thd->thread_id);
@ -2578,10 +2579,10 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone)
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd))
return HA_ERR_END_OF_FILE;
// @bug 2547
@ -2661,18 +2662,17 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return rc;
cal_connection_info* ci = nullptr;
if (get_fe_conn_info_ptr() != NULL)
ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if ((thd->lex)->sql_command == SQLCOM_ALTER_TABLE)
return rc;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd))
return rc;
if (!ci)
@ -3764,7 +3764,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
{
THD* thd = current_thd;
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
{
condStack.push_back(cond);
return nullptr;
@ -3848,7 +3848,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
inline void disableBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG))
if (ha_mcs_common::isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG))
{
set_original_option_bits(thd->variables.option_bits, thd);
thd->variables.option_bits &= ~OPTION_BIN_LOG;
@ -3858,7 +3858,7 @@ inline void disableBinlogForDML(THD* thd)
inline void restoreBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command))
if (ha_mcs_common::isDMLStatement(thd->lex->sql_command))
{
ulonglong orig_option_bits = get_original_option_bits(thd);
@ -4027,7 +4027,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
IDEBUG(cout << "pushdown_init for table " << endl);
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return 0;
const char* timeZone = thd->variables.time_zone->get_name()->ptr();
@ -4069,7 +4070,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
// MCOL-4023 We need to test this code path.
// Update and delete code
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return doUpdateDelete(thd, gwi, std::vector<COND*>());
uint32_t sessionID = tid2sid(thd->thread_id);
@ -4493,7 +4494,8 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone)
{
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return HA_ERR_END_OF_FILE;
int rc = HA_ERR_END_OF_FILE;
@ -4506,7 +4508,7 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone)
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return rc;
// @bug 2547