1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

fix(plugin): MCOL-4740: This fixes update rows counter for multi-table update

For UPDATEs involving a single table, the server call to handler::direct_update_rows() is used to correctly set the count for the number of updated rows in the UPDATE statement.
However, for UPDATEs involving multi-tables, the server does not call handler::direct_update_rows(). This patch adds support to correctly report the number of updated rows to the client by setting
multi_update::updated and multi_update::found in handler::rnd_end().
This commit is contained in:
Roman Nozdrin
2023-10-31 12:54:18 +00:00
parent eb744eafed
commit 6579180810
8 changed files with 302 additions and 225 deletions

View File

@ -302,18 +302,18 @@ int fetchNextRow(uchar* buf, cal_table_info& ti, cal_connection_info* ci, long t
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
// #ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException(ti, ci, &ex);
//#endif
// #endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
catch (...)
{
// @bug 2244. Always log this msg for now, as we try to track down when/why we are
// losing socket connection with ExeMgr
//#ifdef INFINIDB_DEBUG
// #ifdef INFINIDB_DEBUG
tpl_scan_fetch_LogException(ti, ci, 0);
//#endif
// #endif
sm_stat = sm::CALPONT_INTERNAL_ERROR;
}
@ -758,7 +758,8 @@ vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, in
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& condStack)
{
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -827,7 +828,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
{
Message::Args args;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
args.add("Update");
#if 0
else if (thd->rgi_slave && thd->rgi_slave->m_table_map.count() != 0)
@ -884,7 +885,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
updateCP->isDML(true);
//@Bug 2753. the memory already freed by destructor of UpdateSqlStatement
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
ColumnAssignment* columnAssignmentPtr;
Item_field* item;
@ -896,7 +897,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
while ((item = (Item_field*)field_it++))
{
string tmpTableName = bestTableName(item);
//@Bug 5312 populate aliasname with tablename if it is empty
@ -1174,7 +1174,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
}
// Exit early if there is nothing to update
if (colAssignmentListPtr->empty() && isUpdateStatement(thd->lex->sql_command))
if (colAssignmentListPtr->empty() && ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
ci->affectedRows = 0;
return 0;
@ -1187,7 +1187,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
CalpontSystemCatalog::TableName aTableName;
TABLE_LIST* first_table = 0;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
aTableName.schema = schemaName;
aTableName.table = tableName;
@ -1209,7 +1209,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
IDEBUG(cout << "STMT: " << dmlStmt << " and sessionID " << thd->thread_id << endl);
VendorDMLStatement dmlStatement(dmlStmt, sessionID);
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
dmlStatement.set_DMLStatementType(DML_UPDATE);
else
dmlStatement.set_DMLStatementType(DML_DELETE);
@ -1220,7 +1220,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
//@Bug 2753. To make sure the momory is freed.
updateStmt.fColAssignmentListPtr = colAssignmentListPtr;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
qualifiedTablName->fName = tableName;
qualifiedTablName->fSchema = schemaName;
@ -1314,7 +1314,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
List<Item> items;
SELECT_LEX select_lex;
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
items = (thd->lex->first_select_lex()->item_list);
thd->lex->first_select_lex()->item_list = thd->lex->value_list;
@ -1463,7 +1463,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
returnedCols.push_back((updateCP->columnMap()).begin()->second);
//@Bug 6123. get the correct returned columnlist
if (isDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isDeleteStatement(thd->lex->sql_command))
{
returnedCols.clear();
// choose the smallest column to project
@ -1514,7 +1514,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
updateCP->returnedCols(returnedCols);
if (isUpdateStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateStatement(thd->lex->sql_command))
{
const ParseTree* ptsub = updateCP->filters();
@ -1973,7 +1973,7 @@ int ha_mcs_impl_analyze(THD* thd, TABLE* table)
if (table->s->db.length && strcmp(table->s->db.str, "information_schema") == 0)
return 0;
bool columnStore = (table ? isMCSTable(table) : true);
bool columnStore = (table ? ha_mcs_common::isMCSTable(table) : true);
// Skip non columnstore tables.
if (!columnStore)
return 0;
@ -2041,7 +2041,8 @@ int ha_mcs_impl_analyze(THD* thd, TABLE* table)
query.assign(idb_mysql_query_str(thd));
caep->data(query);
if (!get_fe_conn_info_ptr()) {
if (!get_fe_conn_info_ptr())
{
set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info(), thd));
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2147,7 +2148,8 @@ int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected_rows,
gwi.thd = thd;
int rc = 0;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
{
if (affected_rows)
*affected_rows = 0;
@ -2179,11 +2181,10 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
gwi.thd = thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
// check whether the system is ready to process statement.
// check whether the system is ready to process statement.
static DBRM dbrm(true);
int bSystemQueryReady = dbrm.getSystemQueryReady();
@ -2232,14 +2233,15 @@ int ha_mcs::impl_rnd_init(TABLE* table, const std::vector<COND*>& condStack)
UPDATE innotab1 SET a=100 WHERE a NOT IN (SELECT a FROM cstab1 WHERE a=1);
*/
if (!isReadOnly() && // make sure the current table is being modified
isUpdateOrDeleteStatement(thd->lex->sql_command))
ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return doUpdateDelete(thd, gwi, condStack);
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2555,11 +2557,10 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone)
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd))
return HA_ERR_END_OF_FILE;
// @bug 2547
@ -2567,7 +2568,8 @@ int ha_mcs_impl_rnd_next(uchar* buf, TABLE* table, long timeZone)
// if (MIGR::infinidb_vtable.impossibleWhereOnUnion)
// return HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2638,8 +2640,7 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) &&
(isDMLStatement(thd->lex->sql_command) ||
thd->lex->sql_command == SQLCOM_ALTER_TABLE))
(ha_mcs_common::isDMLStatement(thd->lex->sql_command) || thd->lex->sql_command == SQLCOM_ALTER_TABLE))
return 0;
cal_connection_info* ci = nullptr;
@ -2650,7 +2651,7 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
if ((thd->lex)->sql_command == SQLCOM_ALTER_TABLE)
return rc;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd))
return rc;
if (!ci)
@ -2663,18 +2664,18 @@ int ha_mcs_impl_rnd_end(TABLE* table, bool is_pushdown_hand)
if (thd->lex->analyze_stmt && ci->cal_conn_hndl && ci->cal_conn_hndl->exeMgr)
{
// The ANALYZE statement leaves ExeMgr hanging. This clears it up.
ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer
ci->cal_conn_hndl->exeMgr->read(); // Ignore the returned buffer
ByteStream msg;
ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows
ByteStream::quadbyte qb = 1; // Tell PrimProc front session to eat all the rows
msg << qb;
ci->cal_conn_hndl->exeMgr->write(msg);
// This is the command to start sending return values. because we previously sent the swallow
// rows command, there won't be anything useful coming back, but it needs this to flush internal queues.
qb = 5; // Read the result data.
qb = 5; // Read the result data.
msg.reset();
msg << qb;
ci->cal_conn_hndl->exeMgr->write(msg);
qb = 0; // End the query
qb = 0; // End the query
msg.reset();
msg << qb;
ci->cal_conn_hndl->exeMgr->write(msg);
@ -2775,7 +2776,8 @@ int ha_mcs_impl_create(const char* name, TABLE* table_arg, HA_CREATE_INFO* creat
{
THD* thd = current_thd;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2819,7 +2821,8 @@ int ha_mcs_impl_delete_table(const char* name)
if (!memcmp((uchar*)name, tmp_file_prefix, tmp_file_prefix_length))
return 0;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2884,7 +2887,8 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed,
return ER_CHECK_NOT_IMPLEMENTED;
}
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2937,7 +2941,8 @@ int ha_mcs_impl_write_row(const uchar* buf, TABLE* table, uint64_t rows_changed,
int ha_mcs_impl_update_row()
{
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2953,7 +2958,8 @@ int ha_mcs_impl_update_row()
int ha_mcs_impl_delete_row()
{
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -2974,7 +2980,8 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
if (thd->slave_thread && !get_replication_slave(thd))
return;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3101,7 +3108,7 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
ci->useXbit = false;
//@bug 6122 Check how many columns have not null constraint. columnn with not null constraint will not
//show up in header.
// show up in header.
unsigned int numberNotNull = 0;
for (unsigned int j = 0; j < colrids.size(); j++)
@ -3279,7 +3286,6 @@ void ha_mcs_impl_start_bulk_insert(ha_rows rows, TABLE* table, bool is_cache_ins
// Set read_set used for bulk insertion of Fields inheriting
// from Field_blob|Field_varstring. Used in ColWriteBatchString()
bitmap_set_all(table->read_set);
}
else
{
@ -3395,7 +3401,8 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
std::string aTmpDir(startup::StartUp::tmpDir());
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3421,7 +3428,6 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
((thd->lex)->sql_command == SQLCOM_LOAD) || ((thd->lex)->sql_command == SQLCOM_INSERT_SELECT) ||
ci->isCacheInsert))
{
if ((thd->killed > 0) && (ci->cpimport_pid > 0)) // handle CTRL-C
{
// cout << "sending ctrl-c to cpimport" << endl;
@ -3500,12 +3506,9 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
else
{
ostringstream oss;
oss << "End SQL statement with error, rc=" << rc
<< ", aPid=" << aPid
<< ", WIF=" << WIFEXITED(aStatus)
<< ", WEXIT=" << WEXITSTATUS(aStatus);
ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG,
tid2sid(thd->thread_id));
oss << "End SQL statement with error, rc=" << rc << ", aPid=" << aPid
<< ", WIF=" << WIFEXITED(aStatus) << ", WEXIT=" << WEXITSTATUS(aStatus);
ha_mcs_impl::log_this(thd, oss.str().c_str(), logging::LOG_TYPE_DEBUG, tid2sid(thd->thread_id));
}
ci->columnTypes.clear();
@ -3591,7 +3594,8 @@ int ha_mcs_impl_end_bulk_insert(bool abort, TABLE* table)
int ha_mcs_impl_commit(handlerton* hton, THD* thd, bool all)
{
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3626,7 +3630,8 @@ int ha_mcs_impl_commit(handlerton* hton, THD* thd, bool all)
int ha_mcs_impl_rollback(handlerton* hton, THD* thd, bool all)
{
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3698,7 +3703,8 @@ int ha_mcs_impl_rename_table(const char* from, const char* to)
{
IDEBUG(cout << "ha_mcs_impl_rename_table: " << from << " => " << to << endl);
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(current_thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3734,7 +3740,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
{
THD* thd = current_thd;
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
{
condStack.push_back(cond);
return nullptr;
@ -3744,7 +3750,8 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
alias.assign(table->alias.ptr(), table->alias.length());
IDEBUG(cout << "ha_mcs_impl_cond_push: " << alias << endl);
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -3815,7 +3822,7 @@ COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table, std::vector<COND*>& condSt
inline void disableBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG))
if (ha_mcs_common::isDMLStatement(thd->lex->sql_command) && (thd->variables.option_bits & OPTION_BIN_LOG))
{
set_original_option_bits(thd->variables.option_bits, thd);
thd->variables.option_bits &= ~OPTION_BIN_LOG;
@ -3825,7 +3832,7 @@ inline void disableBinlogForDML(THD* thd)
inline void restoreBinlogForDML(THD* thd)
{
if (isDMLStatement(thd->lex->sql_command))
if (ha_mcs_common::isDMLStatement(thd->lex->sql_command))
{
ulonglong orig_option_bits = get_original_option_bits(thd);
@ -3853,7 +3860,8 @@ int ha_mcs::impl_external_lock(THD* thd, TABLE* table, int lock_type)
alias.assign(table->alias.ptr(), table->alias.length());
IDEBUG(cout << "external_lock for " << alias << endl);
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -4011,12 +4019,12 @@ int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table)
return ER_INTERNAL_ERROR;
}
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -4429,13 +4437,15 @@ int ha_mcs_impl_group_by_next(TABLE* table, long timeZone)
{
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return HA_ERR_END_OF_FILE;
if (isMCSTableUpdate(thd) || isMCSTableDelete(thd))
if (ha_mcs_common::isMCSTableUpdate(thd) || ha_mcs_common::isMCSTableDelete(thd))
return HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -4506,7 +4516,8 @@ int ha_mcs_impl_group_by_end(TABLE* table)
int rc = 0;
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return 0;
cal_connection_info* ci = nullptr;
@ -4672,7 +4683,8 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
IDEBUG(cout << "pushdown_init for table " << endl);
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return 0;
const char* timeZone = thd->variables.time_zone->get_name()->ptr();
@ -4713,14 +4725,15 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool
// MCOL-4023 We need to test this code path.
// Update and delete code
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return doUpdateDelete(thd, gwi, std::vector<COND*>());
uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
csc->identity(CalpontSystemCatalog::FE);
if (!get_fe_conn_info_ptr()) {
if (!get_fe_conn_info_ptr())
{
set_fe_conn_info_ptr(reinterpret_cast<void*>(new cal_connection_info(), thd));
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
@ -5118,19 +5131,21 @@ int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone)
{
THD* thd = current_thd;
if (thd->slave_thread && !get_replication_slave(thd) && isDMLStatement(thd->lex->sql_command))
if (thd->slave_thread && !get_replication_slave(thd) &&
ha_mcs_common::isDMLStatement(thd->lex->sql_command))
return HA_ERR_END_OF_FILE;
int rc = HA_ERR_END_OF_FILE;
if (get_fe_conn_info_ptr() == nullptr) {
if (get_fe_conn_info_ptr() == nullptr)
{
set_fe_conn_info_ptr((void*)new cal_connection_info());
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
}
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
if (isUpdateOrDeleteStatement(thd->lex->sql_command))
if (ha_mcs_common::isUpdateOrDeleteStatement(thd->lex->sql_command))
return rc;
// @bug 2547