1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-3529 This patch implements direct_update and direct_delete features

to properly report the number of rows affected by UPDATE|DELETE.
This commit is contained in:
Roman Nozdrin
2019-11-27 09:33:21 -06:00
parent 449b51defe
commit 5cedeb110b
5 changed files with 73 additions and 86 deletions

View File

@ -197,10 +197,9 @@ ha_mcs::ha_mcs(handlerton* hton, TABLE_SHARE* table_arg) :
handler(hton, table_arg), handler(hton, table_arg),
int_table_flags(HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE | int_table_flags(HA_BINLOG_STMT_CAPABLE | HA_BINLOG_ROW_CAPABLE |
HA_TABLE_SCAN_ON_INDEX | HA_TABLE_SCAN_ON_INDEX |
HA_CAN_TABLE_CONDITION_PUSHDOWN) HA_CAN_TABLE_CONDITION_PUSHDOWN |
// int_table_flags(HA_NO_BLOBS | HA_BINLOG_STMT_CAPABLE) HA_CAN_DIRECT_UPDATE_AND_DELETE)
{ { }
}
/** /**
@ -292,25 +291,7 @@ int ha_mcs::close(void)
@details @details
Example of this would be: Example of this would be:
@code @code
for (Field **field=table->field ; *field ; field++)
{
...
}
@endcode @endcode
See ha_tina.cc for an example of extracting all of the data as strings.
ha_berekly.cc has an example of how to store it intact by "packing" it
for ha_berkeley's own native storage type.
See the note for update_row() on auto_increments and timestamps. This
case also applies to write_row().
Called from item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc, and sql_update.cc.
@see
item_sum.cc, item_sum.cc, sql_acl.cc, sql_insert.cc,
sql_insert.cc, sql_select.cc, sql_table.cc, sql_udf.cc and sql_update.cc
*/ */
int ha_mcs::write_row(const uchar* buf) int ha_mcs::write_row(const uchar* buf)
@ -328,14 +309,6 @@ void ha_mcs::start_bulk_insert(ha_rows rows, uint flags)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
int ha_mcs::end_bulk_insert(bool abort)
{
DBUG_ENTER("ha_mcs::end_bulk_insert");
int rc = ha_mcs_impl_end_bulk_insert(abort, table);
DBUG_RETURN(rc);
}
/**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */
int ha_mcs::end_bulk_insert() int ha_mcs::end_bulk_insert()
{ {
DBUG_ENTER("ha_mcs::end_bulk_insert"); DBUG_ENTER("ha_mcs::end_bulk_insert");
@ -351,19 +324,10 @@ int ha_mcs::end_bulk_insert()
clause was used. Consecutive ordering is not guaranteed. clause was used. Consecutive ordering is not guaranteed.
@details @details
Currently new_data will not have an updated auto_increament record, or @code
and updated timestamp field. You can do these for example by doing:
@code
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time();
if (table->next_number_field && record == table->record[0])
update_auto_increment();
@endcode @endcode
Called from sql_select.cc, sql_acl.cc, sql_update.cc, and sql_insert.cc.
@see @see
sql_select.cc, sql_acl.cc, sql_update.cc and sql_insert.cc
*/ */
int ha_mcs::update_row(const uchar* old_data, uchar* new_data) int ha_mcs::update_row(const uchar* old_data, uchar* new_data)
{ {
@ -373,7 +337,45 @@ int ha_mcs::update_row(const uchar* old_data, uchar* new_data)
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
// WIP
/**
@brief
Yes, update_row() does what you expect, it updates a row. old_data will have
the previous row record in it, while new_data will have the newest data in it.
Keep in mind that the server can do updates based on ordering if an ORDER BY
clause was used. Consecutive ordering is not guaranteed.
@details
@code
@endcode
@see
*/
int ha_mcs::direct_update_rows_init(List<Item> *update_fields)
{
DBUG_ENTER("ha_mcs::direct_update_rows_init");
DBUG_RETURN(0);
}
int ha_mcs::direct_update_rows(ha_rows *update_rows)
{
DBUG_ENTER("ha_mcs::direct_update_rows");
int rc = ha_mcs_impl_direct_update_delete_rows(update_rows);
DBUG_RETURN(rc);
}
int ha_mcs::direct_delete_rows_init()
{
DBUG_ENTER("ha_mcs::direct_delete_rows_init");
DBUG_RETURN(0);
}
int ha_mcs::direct_delete_rows(ha_rows *deleted_rows)
{
DBUG_ENTER("ha_mcs::direct_delete_rows");
int rc = ha_mcs_impl_direct_update_delete_rows(deleted_rows);
DBUG_RETURN(rc);
}
/** /**
@brief @brief
This will delete a row. buf will contain a copy of the row to be deleted. This will delete a row. buf will contain a copy of the row to be deleted.

View File

@ -143,12 +143,6 @@ public:
*/ */
void start_bulk_insert(ha_rows rows, uint flags = 0) ; void start_bulk_insert(ha_rows rows, uint flags = 0) ;
/** @brief
We implement this in ha_example.cc. It's not an obligatory method;
skip it and and MySQL will treat it as not implemented.
*/
int end_bulk_insert(bool abort) ;
/**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */ /**@bug 2461 - Overloaded end_bulk_insert. MariaDB uses the abort bool, mysql does not. */
int end_bulk_insert() ; int end_bulk_insert() ;
@ -157,12 +151,16 @@ public:
skip it and and MySQL will treat it as not implemented. skip it and and MySQL will treat it as not implemented.
*/ */
int update_row(const uchar* old_data, uchar* new_data); int update_row(const uchar* old_data, uchar* new_data);
int direct_update_rows_init(List<Item> *update_fields);
int direct_update_rows(ha_rows *update_rows);
/** @brief /** @brief
We implement this in ha_example.cc. It's not an obligatory method; We implement this in ha_example.cc. It's not an obligatory method;
skip it and and MySQL will treat it as not implemented. skip it and and MySQL will treat it as not implemented.
*/ */
int delete_row(const uchar* buf); int delete_row(const uchar* buf);
int direct_delete_rows_init();
int direct_delete_rows(ha_rows *deleted_rows);
/** @brief /** @brief
We implement this in ha_example.cc. It's not an obligatory method; We implement this in ha_example.cc. It's not an obligatory method;

View File

@ -16,11 +16,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */ MA 02110-1301, USA. */
/*
* $Id: ha_mcs_impl.cpp 9642 2013-06-24 14:57:42Z rdempsey $
*/
//#define DEBUG_WALK_COND
#include <my_config.h> #include <my_config.h>
#ifndef _MSC_VER #ifndef _MSC_VER
#include <unistd.h> #include <unistd.h>
@ -41,7 +36,6 @@
#include <cerrno> #include <cerrno>
#include <cstring> #include <cstring>
#include <time.h> #include <time.h>
//#define NDEBUG
#include <cassert> #include <cassert>
#include <vector> #include <vector>
#include <map> #include <map>
@ -61,7 +55,6 @@ using namespace std;
#include <boost/algorithm/string/case_conv.hpp> #include <boost/algorithm/string/case_conv.hpp>
#include <boost/regex.hpp> #include <boost/regex.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
//using namespace boost;
#include "idb_mysql.h" #include "idb_mysql.h"
@ -1184,7 +1177,7 @@ vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, in
return returnVal; return returnVal;
} }
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi) uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, ha_rows *affected_rows)
{ {
if (get_fe_conn_info_ptr() == NULL) if (get_fe_conn_info_ptr() == NULL)
set_fe_conn_info_ptr((void*)new cal_connection_info()); set_fe_conn_info_ptr((void*)new cal_connection_info());
@ -1485,7 +1478,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
} }
else if ( value->type() == Item::NULL_ITEM ) else if ( value->type() == Item::NULL_ITEM )
{ {
// dmlStmt += "NULL";
columnAssignmentPtr->fScalarExpression = ""; columnAssignmentPtr->fScalarExpression = "";
columnAssignmentPtr->fFromCol = false; columnAssignmentPtr->fFromCol = false;
columnAssignmentPtr->fIsNull = true; columnAssignmentPtr->fIsNull = true;
@ -1494,13 +1486,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
{ {
isFromCol = true; isFromCol = true;
columnAssignmentPtr->fFromCol = true; columnAssignmentPtr->fFromCol = true;
// Item_field* setIt = reinterpret_cast<Item_field*> (value); isFromSameTable = false;
// string sectableName = string(setIt->table_name);
// string secschemaName = string(setIt->db_name);
// if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) || (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
// {
isFromSameTable = false;
// }
} }
//@Bug 4449 handle default value //@Bug 4449 handle default value
else if (value->type() == Item::DEFAULT_VALUE_ITEM) else if (value->type() == Item::DEFAULT_VALUE_ITEM)
@ -1554,6 +1540,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
{ {
if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end()) if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end())
{ {
// DRRTUY * That is far from ideal.
columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", ""); columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", "");
struct timeval tv; struct timeval tv;
char buf[64]; char buf[64];
@ -1729,9 +1716,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
return 0; return 0;
} }
//if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
{ {
//updateCP->subType (CalpontSelectExecutionPlan::SINGLEROW_SUBS); //set scalar
updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS); updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS);
//@Bug 2975. //@Bug 2975.
SessionManager sm; SessionManager sm;
@ -1834,6 +1819,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
} }
updateCP->tableList( tbList ); updateCP->tableList( tbList );
// DRRTUY * this is very optimisic assumption
updateCP->overrideLargeSideEstimate( true ); updateCP->overrideLargeSideEstimate( true );
//loop through returnedcols to find out constant columns //loop through returnedcols to find out constant columns
CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols(); CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols();
@ -1847,7 +1833,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
{ {
returnedCols.erase(coliter); returnedCols.erase(coliter);
coliter = returnedCols.begin(); coliter = returnedCols.begin();
//cout << "constant column " << endl;
} }
else else
coliter++; coliter++;
@ -1929,16 +1914,14 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
ptsub->walk(makeUpdateSemiJoin, &tbList[0] ); ptsub->walk(makeUpdateSemiJoin, &tbList[0] );
} }
//cout<< "Plan is " << endl << *updateCP << endl;
if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) ) if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
thd->lex->first_select_lex()->item_list = items; thd->lex->first_select_lex()->item_list = items;
} }
//cout<< "Plan is " << endl << *updateCP << endl;
} }
//updateCP->traceFlags(1);
//cout<< "Plan is " << endl << *updateCP << endl; //cout<< "Plan is " << endl << *updateCP << endl;
//updateCP->traceFlags(1);
pDMLPackage->HasFilter(true); pDMLPackage->HasFilter(true);
pDMLPackage->uuid(updateCP->uuid()); pDMLPackage->uuid(updateCP->uuid());
@ -1947,7 +1930,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan(); boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan();
updateCP->rmParms(ci->rmParms); updateCP->rmParms(ci->rmParms);
updateCP->serialize(*plan); updateCP->serialize(*plan);
//cout << "plan has bytes " << plan->length() << endl;
pDMLPackage->write(bytestream); pDMLPackage->write(bytestream);
delete pDMLPackage; delete pDMLPackage;
@ -2203,8 +2185,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
//@Bug 2540. Set error status instead of warning //@Bug 2540. Set error status instead of warning
thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str()); thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str());
ci->rc = b; ci->rc = b;
thd->set_row_count_func(0); // WIP
thd->get_stmt_da()->set_overwrite_status(true); //thd->get_stmt_da()->set_overwrite_status(true);
//cout << " error status " << ci->rc << endl; //cout << " error status " << ci->rc << endl;
} }
@ -2219,7 +2201,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
} }
else else
{ {
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func()); if (affected_rows)
*affected_rows = dmlRowCount;
} }
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str()); push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str());
@ -2227,19 +2210,11 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
else else
{ {
// if (dmlRowCount != 0) //Bug 5117. Handling self join. // if (dmlRowCount != 0) //Bug 5117. Handling self join.
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func()); if (affected_rows)
*affected_rows = dmlRowCount;
//cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl; //cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl;
} }
// @bug 4027. comment out the following because this will cause mysql
// kernel assertion failure. not sure why this is here in the first place.
// if (thd->derived_tables)
// {
// thd->get_stmt_da()->set_overwrite_status(true);
// thd->get_stmt_da()->set_ok_status( thd, dmlRowCount, 0, "");
// }
// insert query stats // insert query stats
ci->stats.setEndTime(); ci->stats.setEndTime();
@ -2291,6 +2266,16 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name)
return 0; return 0;
} }
int ha_mcs_impl_direct_update_delete_rows(ha_rows *affected_rows)
{
THD* thd = current_thd;
cal_impl_if::gp_walk_info gwi;
gwi.thd = thd;
int rc = doUpdateDelete(thd, gwi, affected_rows);
return rc;
}
int ha_mcs_impl_rnd_init(TABLE* table) int ha_mcs_impl_rnd_init(TABLE* table)
{ {
IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl ); IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
@ -2350,7 +2335,7 @@ int ha_mcs_impl_rnd_init(TABLE* table)
//Update and delete code //Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi); return doUpdateDelete(thd, gwi, NULL);
uint32_t sessionID = tid2sid(thd->thread_id); uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
@ -4080,6 +4065,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
&& (mapiter->second.condInfo && mapiter->second.csep) && (mapiter->second.condInfo && mapiter->second.csep)
&& lock_type == 2) && lock_type == 2)
{ {
// CS ends up processing query with handlers
// table mode // table mode
if (mapiter->second.conn_hndl) if (mapiter->second.conn_hndl)
{ {
@ -4125,6 +4111,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
ci->physTablesList.erase(table); ci->physTablesList.erase(table);
} }
// CS ends up processing query with handlers
if (iter != ci->physTablesList.end() && ci->physTablesList.empty()) if (iter != ci->physTablesList.end() && ci->physTablesList.empty())
{ {
if (!ci->cal_conn_hndl) if (!ci->cal_conn_hndl)
@ -4933,7 +4920,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
//Update and delete code //Update and delete code
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI)) if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
return doUpdateDelete(thd, gwi); return doUpdateDelete(thd, gwi, NULL);
uint32_t sessionID = tid2sid(thd->thread_id); uint32_t sessionID = tid2sid(thd->thread_id);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);

View File

@ -42,6 +42,7 @@ extern int ha_mcs_impl_close_connection (handlerton* hton, THD* thd);
extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table); extern COND* ha_mcs_impl_cond_push(COND* cond, TABLE* table);
extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type); extern int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type);
extern int ha_mcs_impl_update_row(); extern int ha_mcs_impl_update_row();
extern int ha_mcs_impl_direct_update_delete_rows(ha_rows *affected_rows);
extern int ha_mcs_impl_delete_row(); extern int ha_mcs_impl_delete_row();
extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos); extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos);
extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table); extern int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);

View File

@ -1,5 +1,6 @@
/* Copyright (C) 2014 InfiniDB, Inc. /* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation Copyright (C) 2016 MariaDB Corporation
Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@ -16,8 +17,6 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */ MA 02110-1301, USA. */
// $Id: ha_mcs_impl_if.h 9701 2013-07-17 18:51:55Z zzhu $
/** @file */
#ifndef HA_MCS_IMPL_IF_H__ #ifndef HA_MCS_IMPL_IF_H__
#define HA_MCS_IMPL_IF_H__ #define HA_MCS_IMPL_IF_H__
#include <string> #include <string>