You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge pull request #964 from mariadb-corporation/MCOL-3529
MCOL-3529 This patch implements direct_update and direct_delete features
This commit is contained in:
@ -16,11 +16,6 @@
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
/*
|
||||
* $Id: ha_mcs_impl.cpp 9642 2013-06-24 14:57:42Z rdempsey $
|
||||
*/
|
||||
|
||||
//#define DEBUG_WALK_COND
|
||||
#include <my_config.h>
|
||||
#ifndef _MSC_VER
|
||||
#include <unistd.h>
|
||||
@ -41,7 +36,6 @@
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
#include <time.h>
|
||||
//#define NDEBUG
|
||||
#include <cassert>
|
||||
#include <vector>
|
||||
#include <map>
|
||||
@ -61,7 +55,6 @@ using namespace std;
|
||||
#include <boost/algorithm/string/case_conv.hpp>
|
||||
#include <boost/regex.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
//using namespace boost;
|
||||
|
||||
#include "idb_mysql.h"
|
||||
|
||||
@ -1201,7 +1194,7 @@ vector<string> getOnUpdateTimestampColumns(string& schema, string& tableName, in
|
||||
return returnVal;
|
||||
}
|
||||
|
||||
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, ha_rows *affected_rows)
|
||||
{
|
||||
if (get_fe_conn_info_ptr() == NULL)
|
||||
set_fe_conn_info_ptr((void*)new cal_connection_info());
|
||||
@ -1502,7 +1495,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
}
|
||||
else if ( value->type() == Item::NULL_ITEM )
|
||||
{
|
||||
// dmlStmt += "NULL";
|
||||
columnAssignmentPtr->fScalarExpression = "";
|
||||
columnAssignmentPtr->fFromCol = false;
|
||||
columnAssignmentPtr->fIsNull = true;
|
||||
@ -1511,13 +1503,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
{
|
||||
isFromCol = true;
|
||||
columnAssignmentPtr->fFromCol = true;
|
||||
// Item_field* setIt = reinterpret_cast<Item_field*> (value);
|
||||
// string sectableName = string(setIt->table_name);
|
||||
// string secschemaName = string(setIt->db_name);
|
||||
// if ( (strcasecmp(tableName.c_str(), sectableName.c_str()) != 0) || (strcasecmp(schemaName.c_str(), secschemaName.c_str()) != 0))
|
||||
// {
|
||||
isFromSameTable = false;
|
||||
// }
|
||||
isFromSameTable = false;
|
||||
}
|
||||
//@Bug 4449 handle default value
|
||||
else if (value->type() == Item::DEFAULT_VALUE_ITEM)
|
||||
@ -1571,6 +1557,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
{
|
||||
if (timeStampColumnNames.find(onUpdateTimeStampColumns[i]) == timeStampColumnNames.end())
|
||||
{
|
||||
// DRRTUY * That is far from ideal.
|
||||
columnAssignmentPtr = new ColumnAssignment(string(onUpdateTimeStampColumns[i]), "=", "");
|
||||
struct timeval tv;
|
||||
char buf[64];
|
||||
@ -1746,9 +1733,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
return 0;
|
||||
}
|
||||
|
||||
//if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
|
||||
{
|
||||
//updateCP->subType (CalpontSelectExecutionPlan::SINGLEROW_SUBS); //set scalar
|
||||
updateCP->subType (CalpontSelectExecutionPlan::SELECT_SUBS);
|
||||
//@Bug 2975.
|
||||
SessionManager sm;
|
||||
@ -1851,6 +1836,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
}
|
||||
|
||||
updateCP->tableList( tbList );
|
||||
// DRRTUY * this is very optimisic assumption
|
||||
updateCP->overrideLargeSideEstimate( true );
|
||||
//loop through returnedcols to find out constant columns
|
||||
CalpontSelectExecutionPlan::ReturnedColumnList returnedCols = updateCP->returnedCols();
|
||||
@ -1864,7 +1850,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
{
|
||||
returnedCols.erase(coliter);
|
||||
coliter = returnedCols.begin();
|
||||
//cout << "constant column " << endl;
|
||||
}
|
||||
else
|
||||
coliter++;
|
||||
@ -1946,16 +1931,14 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
ptsub->walk(makeUpdateSemiJoin, &tbList[0] );
|
||||
}
|
||||
|
||||
//cout<< "Plan is " << endl << *updateCP << endl;
|
||||
if (( (thd->lex)->sql_command == SQLCOM_UPDATE ) || ( (thd->lex)->sql_command == SQLCOM_UPDATE_MULTI ) )
|
||||
thd->lex->first_select_lex()->item_list = items;
|
||||
}
|
||||
|
||||
//cout<< "Plan is " << endl << *updateCP << endl;
|
||||
}
|
||||
|
||||
//updateCP->traceFlags(1);
|
||||
//cout<< "Plan is " << endl << *updateCP << endl;
|
||||
//updateCP->traceFlags(1);
|
||||
pDMLPackage->HasFilter(true);
|
||||
pDMLPackage->uuid(updateCP->uuid());
|
||||
|
||||
@ -1964,7 +1947,6 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
boost::shared_ptr<messageqcpp::ByteStream> plan = pDMLPackage->get_ExecutionPlan();
|
||||
updateCP->rmParms(ci->rmParms);
|
||||
updateCP->serialize(*plan);
|
||||
//cout << "plan has bytes " << plan->length() << endl;
|
||||
pDMLPackage->write(bytestream);
|
||||
|
||||
delete pDMLPackage;
|
||||
@ -2220,8 +2202,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
//@Bug 2540. Set error status instead of warning
|
||||
thd->raise_error_printf(ER_INTERNAL_ERROR, errorMsg.c_str());
|
||||
ci->rc = b;
|
||||
thd->set_row_count_func(0);
|
||||
thd->get_stmt_da()->set_overwrite_status(true);
|
||||
// WIP
|
||||
//thd->get_stmt_da()->set_overwrite_status(true);
|
||||
//cout << " error status " << ci->rc << endl;
|
||||
}
|
||||
|
||||
@ -2236,7 +2218,8 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
}
|
||||
else
|
||||
{
|
||||
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func());
|
||||
if (affected_rows)
|
||||
*affected_rows = dmlRowCount;
|
||||
}
|
||||
|
||||
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_WARN_DATA_OUT_OF_RANGE, errorMsg.c_str());
|
||||
@ -2244,19 +2227,11 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi)
|
||||
else
|
||||
{
|
||||
// if (dmlRowCount != 0) //Bug 5117. Handling self join.
|
||||
thd->set_row_count_func(dmlRowCount+thd->get_row_count_func());
|
||||
|
||||
|
||||
if (affected_rows)
|
||||
*affected_rows = dmlRowCount;
|
||||
//cout << " error status " << ci->rc << " and rowcount = " << dmlRowCount << endl;
|
||||
}
|
||||
|
||||
// @bug 4027. comment out the following because this will cause mysql
|
||||
// kernel assertion failure. not sure why this is here in the first place.
|
||||
// if (thd->derived_tables)
|
||||
// {
|
||||
// thd->get_stmt_da()->set_overwrite_status(true);
|
||||
// thd->get_stmt_da()->set_ok_status( thd, dmlRowCount, 0, "");
|
||||
// }
|
||||
// insert query stats
|
||||
ci->stats.setEndTime();
|
||||
|
||||
@ -2308,6 +2283,16 @@ int ha_mcs_impl_discover_existence(const char* schema, const char* name)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ha_mcs_impl_direct_update_delete_rows(ha_rows *affected_rows)
|
||||
{
|
||||
THD* thd = current_thd;
|
||||
cal_impl_if::gp_walk_info gwi;
|
||||
gwi.thd = thd;
|
||||
|
||||
int rc = doUpdateDelete(thd, gwi, affected_rows);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int ha_mcs_impl_rnd_init(TABLE* table)
|
||||
{
|
||||
IDEBUG( cout << "rnd_init for table " << table->s->table_name.str << endl );
|
||||
@ -2367,7 +2352,7 @@ int ha_mcs_impl_rnd_init(TABLE* table)
|
||||
|
||||
//Update and delete code
|
||||
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
|
||||
return doUpdateDelete(thd, gwi);
|
||||
return doUpdateDelete(thd, gwi, NULL);
|
||||
|
||||
uint32_t sessionID = tid2sid(thd->thread_id);
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
@ -4059,6 +4044,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
|
||||
// the table mode.
|
||||
if (mapiter != ci->tableMap.end() && mapiter->second.csep && lock_type == 2)
|
||||
{
|
||||
// CS ends up processing query with handlers
|
||||
// table mode
|
||||
if (mapiter->second.conn_hndl)
|
||||
{
|
||||
@ -4104,6 +4090,7 @@ int ha_mcs_impl_external_lock(THD* thd, TABLE* table, int lock_type)
|
||||
ci->physTablesList.erase(table);
|
||||
}
|
||||
|
||||
// CS ends up processing query with handlers
|
||||
if (iter != ci->physTablesList.end() && ci->physTablesList.empty())
|
||||
{
|
||||
if (!ci->cal_conn_hndl)
|
||||
@ -4912,7 +4899,7 @@ int ha_cs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
|
||||
|
||||
//Update and delete code
|
||||
if ( ((thd->lex)->sql_command == SQLCOM_UPDATE) || ((thd->lex)->sql_command == SQLCOM_DELETE) || ((thd->lex)->sql_command == SQLCOM_DELETE_MULTI) || ((thd->lex)->sql_command == SQLCOM_UPDATE_MULTI))
|
||||
return doUpdateDelete(thd, gwi);
|
||||
return doUpdateDelete(thd, gwi, NULL);
|
||||
|
||||
uint32_t sessionID = tid2sid(thd->thread_id);
|
||||
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
|
Reference in New Issue
Block a user