1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

fix(writeengine) MCOL-4202: use schema name when renaming table and change it's fields in syscat

This commit is contained in:
Leonid Fedorov
2023-12-06 21:05:51 +00:00
committed by Leonid Fedorov
parent 74c1a38f2c
commit fadb102712
12 changed files with 214 additions and 657 deletions

View File

@ -36,7 +36,7 @@ using namespace ddlpackage;
#include <ctime>
#include "dataconvert.h"
using namespace dataconvert;
//#include "we_brm.h"
// #include "we_brm.h"
namespace fs = boost::filesystem;
#include "cacheutils.h"
#include "IDBDataFile.h"
@ -2366,23 +2366,14 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
uint8_t WE_DDLCommandProc::updateSystableEntryForSysColumn(int32_t sessionID, uint32_t txnID,
const DDLColumn& column, const std::string& value,
const std::string& oldValue,
execplan::CalpontSystemCatalog::RIDList& roList,
std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
@ -2395,33 +2386,13 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSTABLE
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryTuple dictTuple;
@ -2491,8 +2462,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
// It's the same string for each column, so we just need one dictionary struct
void* dictTuplePtr = static_cast<void*>(&dictTuple);
memset(dictTuplePtr, 0, sizeof(dictTuple));
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.sigValue = (unsigned char*)value.c_str();
dictTuple.sigSize = value.length();
dictTuple.isNull = false;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
@ -2553,7 +2524,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
err = "WE: Update failed on: " + value;
}
int rc1 = 0;
@ -2580,6 +2551,55 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
if (rc == 0)
rc = rc1;
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, txnID;
std::string schema, oldTablename, newTablename, newSchema;
bs >> sessionID;
bs >> txnID;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
bs >> newSchema;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newTablename, oldTablename, roList, err);
if (newSchema != schema && rc == NO_ERROR)
{
findColumnData(sessionID, tableName, SCHEMA_COL, column);
rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newSchema, schema, roList, err);
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
@ -2771,56 +2791,12 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err)
return rc;
}
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
uint8_t WE_DDLCommandProc::updateSystableEntryForSysTable(int32_t sessionID, uint32_t txnID,
const DDLColumn& column, const std::string& value,
const std::string& oldValue,
CalpontSystemCatalog::ROPair ropair,
std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::ROPair ropair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
ropair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
if (ropair.objnum < 0)
{
err = "No such table: " + tableName.table;
return 1;
}
// now we have to prepare the various structures for the WE to update the column.
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
@ -2828,19 +2804,16 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
datavalue = newTablename;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = true;
@ -2852,8 +2825,8 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.sigValue = (unsigned char*)value.c_str();
dictTuple.sigSize = value.length();
if (idbdatafile::IDBPolicy::useHdfs())
{
@ -2899,6 +2872,11 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1003;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
ridList.push_back(ropair.rid);
@ -2934,13 +2912,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
int rc =
fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
err = "WE: Update failed on: " + oldValue;
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
@ -2993,20 +2972,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// cout << "rename:syscolumn is updated" << endl;
return rc;
}
uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& err)
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
uint8_t rc;
uint32_t sessionID, tmp32, txnID;
std::string schema, oldTablename, newTablename, newSchema;
bs >> sessionID;
bs >> tmp32;
@ -3014,26 +2987,17 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string&
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
bs >> newSchema;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::ROPair ropair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
//@bug 4592 Error handling for syscat call
try
{
ropair = systemCatalogPtr->tableRID(tableName);
@ -3053,361 +3017,20 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string&
// now we have to prepare the various structures for the WE to update the column.
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
datavalue = newTablename;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = true;
colStruct.colDataType = column.colType.colDataType;
// Tokenize the data value
WriteEngine::DctnryStruct dictStruct;
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
// int error = NO_ERROR;
// if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
//{
// WErrorCodes ec;
// throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
//}
// WriteEngine::Token aToken = dictTuple.token;
rc = updateSystableEntryForSysTable(sessionID, txnID, column, newTablename, oldTablename, ropair, err);
// colTuple.data = aToken;
// cout << "token value for new table name is op:fbo = " << aToken.op <<":" << aToken.fbo << " null flag = "
// << (uint32_t)dictTuple.isNull<< endl;
if (idbdatafile::IDBPolicy::useHdfs())
if (newSchema != schema && rc == NO_ERROR)
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
findColumnData(sessionID, tableName, SCHEMA_COL, column);
rc = updateSystableEntryForSysTable(sessionID, txnID, column, newSchema, schema, ropair, err);
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
aColList.push_back(colTuple);
colValuesList.push_back(aColList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1003;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
ridList.push_back(ropair.rid);
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
if (rc != 0)
return rc;
}
// cout << "rename:systable is updated to " << newTablename << " for rid " << ropair.rid << endl;
// Update SYSCOLUMN table
tableName.schema = schema;
tableName.table = oldTablename;
dctnryStructList.clear();
dctnryValueList.clear();
dctRowList.clear();
CalpontSystemCatalog::RIDList roList;
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSCOLUMN
ridList.clear();
colValuesList.clear();
aColList.clear();
colStructs.clear();
cscColTypeList.clear();
colOldValuesList.clear();
oids.clear();
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column.colType.colWidth;
}
colStruct.colDataType = column.colType.colDataType;
// Tokenize the data value
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
/*
if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
{
WErrorCodes ec;
throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
}
aToken = dictTuple.token;
colTuple.data = aToken; */
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
// It's the same string for each column, so we just need one dictionary struct
void* dictTuplePtr = static_cast<void*>(&dictTuple);
memset(dictTuplePtr, 0, sizeof(dictTuple));
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
extentsinfo.clear();
colExtentsStruct.clear();
colExtentsColType.clear();
dctnryExtentsStruct.clear();
oid = 1021;
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
ridLists.clear();
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())

View File

@ -32,8 +32,11 @@
#define EXPORT
namespace WriteEngine
{
struct DDLColumn;
class WE_DDLCommandProc
{
public:
@ -43,9 +46,9 @@ class WE_DDLCommandProc
DROPPART_LOG,
TRUNCATE_LOG
};
EXPORT WE_DDLCommandProc();
EXPORT WE_DDLCommandProc(const WE_DDLCommandProc& rhs);
EXPORT ~WE_DDLCommandProc();
WE_DDLCommandProc();
WE_DDLCommandProc(const WE_DDLCommandProc& rhs);
~WE_DDLCommandProc();
/** @brief Update SYSCOLUMN nextval column for the columnoid with nextVal.
*
* Update SYSCOLUMN nextval column for the columnoid with nexValue.
@ -53,35 +56,36 @@ class WE_DDLCommandProc
* @param nextVal (in) The partition number
* @return 0 on success, non-0 on error.
*/
EXPORT uint8_t updateSyscolumnNextval(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t writeSystable(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t writeSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t writeCreateSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t createtablefiles(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t commitVersion(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t deleteSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t deleteSyscolumnRow(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t deleteSystable(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t deleteSystables(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t dropFiles(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnAuto(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnNextvalCol(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnTablename(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSystableAuto(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSystableTablename(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSystablesTablename(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t fillNewColumn(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err);
// EXPORT uint8_t updateSyscolumn(messageqcpp::ByteStream& bs, std::string & err);
EXPORT uint8_t writeTruncateLog(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t writeDropPartitionLog(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t writeDropTableLog(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t deleteDDLLog(messageqcpp::ByteStream& bs, std::string& err);
EXPORT uint8_t fetchDDLLog(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnNextval(messageqcpp::ByteStream& bs, std::string& err);
uint8_t writeSystable(messageqcpp::ByteStream& bs, std::string& err);
uint8_t writeSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
uint8_t writeCreateSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
uint8_t createtablefiles(messageqcpp::ByteStream& bs, std::string& err);
uint8_t commitVersion(messageqcpp::ByteStream& bs, std::string& err);
uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string& err);
uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err);
uint8_t deleteSyscolumn(messageqcpp::ByteStream& bs, std::string& err);
uint8_t deleteSyscolumnRow(messageqcpp::ByteStream& bs, std::string& err);
uint8_t deleteSystable(messageqcpp::ByteStream& bs, std::string& err);
uint8_t deleteSystables(messageqcpp::ByteStream& bs, std::string& err);
uint8_t dropFiles(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnAuto(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnNextvalCol(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSystableAuto(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnTablename(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSystableTablename(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err);
uint8_t fillNewColumn(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err);
uint8_t updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err);
// uint8_t updateSyscolumn(messageqcpp::ByteStream& bs, std::string & err);
uint8_t writeTruncateLog(messageqcpp::ByteStream& bs, std::string& err);
uint8_t writeDropPartitionLog(messageqcpp::ByteStream& bs, std::string& err);
uint8_t writeDropTableLog(messageqcpp::ByteStream& bs, std::string& err);
uint8_t deleteDDLLog(messageqcpp::ByteStream& bs, std::string& err);
uint8_t fetchDDLLog(messageqcpp::ByteStream& bs, std::string& err);
void purgeFDCache();
/** @brief drop a set of partitions
*
@ -90,7 +94,7 @@ class WE_DDLCommandProc
* @param err (out) error message when error occurs
* @return 0 on success, otherwise error.
*/
EXPORT uint8_t dropPartitions(messageqcpp::ByteStream& bs, std::string& err);
uint8_t dropPartitions(messageqcpp::ByteStream& bs, std::string& err);
inline void convertRidToColumn(uint64_t& rid, uint16_t& dbRoot, uint32_t& partition, uint16_t& segment,
const int32_t oid)
{
@ -111,6 +115,24 @@ class WE_DDLCommandProc
rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
}
private:
uint8_t updateSystableEntryForSysTable(int32_t sessionID,
uint32_t txnID,
const DDLColumn& column,
const std::string& value,
const std::string& oldValue,
execplan::CalpontSystemCatalog::ROPair ropair,
std::string& err);
uint8_t updateSystableEntryForSysColumn(int32_t sessionID,
uint32_t txnID,
const DDLColumn& column,
const std::string& value,
const std::string& oldValue,
execplan::CalpontSystemCatalog::RIDList& roList,
std::string& err);
private:
WriteEngineWrapper fWEWrapper;
BRM::DBRM fDbrm;

View File

@ -48,7 +48,6 @@ enum ServerMessages
WE_SVR_UPDATE_SYSCOLUMN_COLPOS,
WE_SVR_UPDATE_SYSCOLUMN_RENAMECOLUMN,
WE_SVR_UPDATE_SYSTABLE_TABLENAME,
WE_SVR_UPDATE_SYSTABLES_TABLENAME,
WE_SVR_DROP_PARTITIONS,
WE_SVR_SINGLE_INSERT,
WE_SVR_BATCH_KEEPALIVE,

View File

@ -325,12 +325,6 @@ void DmlReadThread::operator()()
break;
}
case WE_SVR_UPDATE_SYSTABLES_TABLENAME:
{
rc = fWeDDLprocessor->updateSystablesTablename(ibs, errMsg);
break;
}
case WE_SVR_FILL_COLUMN:
{
rc = fWeDDLprocessor->fillNewColumn(ibs, errMsg);