You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
fix(writeengine) MCOL-4202: use schema name when renaming table and change it's fields in syscat
This commit is contained in:
committed by
Leonid Fedorov
parent
a9ab71e675
commit
97abd9866b
@ -36,7 +36,7 @@ using namespace ddlpackage;
|
||||
#include <ctime>
|
||||
#include "dataconvert.h"
|
||||
using namespace dataconvert;
|
||||
//#include "we_brm.h"
|
||||
// #include "we_brm.h"
|
||||
namespace fs = boost::filesystem;
|
||||
#include "cacheutils.h"
|
||||
#include "IDBDataFile.h"
|
||||
@ -2366,23 +2366,14 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
|
||||
uint8_t WE_DDLCommandProc::updateSystableEntryForSysColumn(int32_t sessionID, uint32_t txnID,
|
||||
const DDLColumn& column, const std::string& value,
|
||||
const std::string& oldValue,
|
||||
execplan::CalpontSystemCatalog::RIDList& roList,
|
||||
std::string& err)
|
||||
{
|
||||
int rc = 0;
|
||||
uint32_t sessionID, tmp32;
|
||||
std::string schema, oldTablename, newTablename;
|
||||
int txnID;
|
||||
|
||||
bs >> sessionID;
|
||||
bs >> tmp32;
|
||||
txnID = tmp32;
|
||||
bs >> schema;
|
||||
bs >> oldTablename;
|
||||
bs >> newTablename;
|
||||
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
tableName.schema = schema;
|
||||
tableName.table = oldTablename;
|
||||
WriteEngine::DctnryStructList dctnryStructList;
|
||||
WriteEngine::DctnryValueList dctnryValueList;
|
||||
WriteEngine::DctColTupleList dctRowList;
|
||||
@ -2395,33 +2386,13 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
|
||||
uint16_t segment;
|
||||
uint32_t partition;
|
||||
|
||||
CalpontSystemCatalog::RIDList roList;
|
||||
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
||||
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
||||
|
||||
try
|
||||
{
|
||||
roList = systemCatalogPtr->columnRIDs(tableName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
err = ex.what();
|
||||
rc = 1;
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Build colStructs for SYSTABLE
|
||||
std::vector<WriteEngine::RID> ridList;
|
||||
WriteEngine::ColValueList colValuesList;
|
||||
WriteEngine::ColTupleList aColList;
|
||||
WriteEngine::ColStructList colStructs;
|
||||
WriteEngine::CSCTypesList cscColTypeList;
|
||||
std::vector<void*> colOldValuesList;
|
||||
tableName.schema = CALPONT_SCHEMA;
|
||||
tableName.table = SYSCOLUMN_TABLE;
|
||||
DDLColumn column;
|
||||
findColumnData(sessionID, tableName, TABLENAME_COL, column);
|
||||
|
||||
WriteEngine::ColStruct colStruct;
|
||||
WriteEngine::DctnryStruct dctnryStruct;
|
||||
WriteEngine::DctnryTuple dictTuple;
|
||||
@ -2491,8 +2462,8 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
|
||||
// It's the same string for each column, so we just need one dictionary struct
|
||||
void* dictTuplePtr = static_cast<void*>(&dictTuple);
|
||||
memset(dictTuplePtr, 0, sizeof(dictTuple));
|
||||
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
|
||||
dictTuple.sigSize = newTablename.length();
|
||||
dictTuple.sigValue = (unsigned char*)value.c_str();
|
||||
dictTuple.sigSize = value.length();
|
||||
dictTuple.isNull = false;
|
||||
dctColList = dictTuple;
|
||||
dctRowList.push_back(dctColList);
|
||||
@ -2553,7 +2524,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
// build the logging message
|
||||
err = "WE: Update failed on: " + tableName.table;
|
||||
err = "WE: Update failed on: " + value;
|
||||
}
|
||||
|
||||
int rc1 = 0;
|
||||
@ -2580,6 +2551,55 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string&
|
||||
if (rc == 0)
|
||||
rc = rc1;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
|
||||
{
|
||||
int rc = 0;
|
||||
uint32_t sessionID, txnID;
|
||||
std::string schema, oldTablename, newTablename, newSchema;
|
||||
|
||||
bs >> sessionID;
|
||||
bs >> txnID;
|
||||
bs >> schema;
|
||||
bs >> oldTablename;
|
||||
bs >> newTablename;
|
||||
bs >> newSchema;
|
||||
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
tableName.schema = schema;
|
||||
tableName.table = oldTablename;
|
||||
|
||||
CalpontSystemCatalog::RIDList roList;
|
||||
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
||||
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
||||
|
||||
try
|
||||
{
|
||||
roList = systemCatalogPtr->columnRIDs(tableName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
err = ex.what();
|
||||
rc = 1;
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Build colStructs for SYSTABLE
|
||||
tableName.schema = CALPONT_SCHEMA;
|
||||
tableName.table = SYSCOLUMN_TABLE;
|
||||
DDLColumn column;
|
||||
findColumnData(sessionID, tableName, TABLENAME_COL, column);
|
||||
rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newTablename, oldTablename, roList, err);
|
||||
|
||||
if (newSchema != schema && rc == NO_ERROR)
|
||||
{
|
||||
findColumnData(sessionID, tableName, SCHEMA_COL, column);
|
||||
rc = updateSystableEntryForSysColumn(sessionID, txnID, column, newSchema, schema, roList, err);
|
||||
}
|
||||
|
||||
systemCatalogPtr->flushCache();
|
||||
purgeFDCache();
|
||||
// if (idbdatafile::IDBPolicy::useHdfs())
|
||||
@ -2771,56 +2791,12 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err)
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
|
||||
uint8_t WE_DDLCommandProc::updateSystableEntryForSysTable(int32_t sessionID, uint32_t txnID,
|
||||
const DDLColumn& column, const std::string& value,
|
||||
const std::string& oldValue,
|
||||
CalpontSystemCatalog::ROPair ropair,
|
||||
std::string& err)
|
||||
{
|
||||
int rc = 0;
|
||||
uint32_t sessionID, tmp32;
|
||||
std::string schema, oldTablename, newTablename;
|
||||
int txnID;
|
||||
|
||||
bs >> sessionID;
|
||||
bs >> tmp32;
|
||||
txnID = tmp32;
|
||||
bs >> schema;
|
||||
bs >> oldTablename;
|
||||
bs >> newTablename;
|
||||
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
tableName.schema = schema;
|
||||
tableName.table = oldTablename;
|
||||
WriteEngine::DctnryStructList dctnryStructList;
|
||||
WriteEngine::DctnryValueList dctnryValueList;
|
||||
WriteEngine::DctColTupleList dctRowList;
|
||||
WriteEngine::DctnryTuple dctColList;
|
||||
|
||||
uint16_t dbRoot = 0;
|
||||
uint16_t segment;
|
||||
uint32_t partition;
|
||||
|
||||
CalpontSystemCatalog::ROPair ropair;
|
||||
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
||||
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
||||
|
||||
try
|
||||
{
|
||||
ropair = systemCatalogPtr->tableRID(tableName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
err = ex.what();
|
||||
rc = 1;
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ropair.objnum < 0)
|
||||
{
|
||||
err = "No such table: " + tableName.table;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// now we have to prepare the various structures for the WE to update the column.
|
||||
|
||||
std::vector<WriteEngine::RID> ridList;
|
||||
WriteEngine::ColValueList colValuesList;
|
||||
WriteEngine::ColTupleList aColList;
|
||||
@ -2828,19 +2804,16 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
|
||||
WriteEngine::CSCTypesList cscColTypeList;
|
||||
std::vector<void*> colOldValuesList;
|
||||
std::map<uint32_t, uint32_t> oids;
|
||||
// std::vector<BRM::OID_t> oidsToFlush;
|
||||
boost::any datavalue;
|
||||
datavalue = newTablename;
|
||||
|
||||
WriteEngine::ColTuple colTuple;
|
||||
|
||||
// Build colStructs for SYSTABLE
|
||||
tableName.schema = CALPONT_SCHEMA;
|
||||
tableName.table = SYSTABLE_TABLE;
|
||||
DDLColumn column;
|
||||
findColumnData(sessionID, tableName, TABLENAME_COL, column);
|
||||
WriteEngine::ColStruct colStruct;
|
||||
WriteEngine::DctnryStruct dctnryStruct;
|
||||
|
||||
WriteEngine::DctnryStructList dctnryStructList;
|
||||
WriteEngine::DctnryValueList dctnryValueList;
|
||||
WriteEngine::DctColTupleList dctRowList;
|
||||
WriteEngine::DctnryTuple dctColList;
|
||||
|
||||
colStruct.dataOid = column.oid;
|
||||
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
|
||||
colStruct.tokenFlag = true;
|
||||
@ -2852,8 +2825,8 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
|
||||
dictStruct.columnOid = column.colType.columnOID;
|
||||
WriteEngine::DctnryTuple dictTuple;
|
||||
dictTuple.isNull = false;
|
||||
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
|
||||
dictTuple.sigSize = newTablename.length();
|
||||
dictTuple.sigValue = (unsigned char*)value.c_str();
|
||||
dictTuple.sigSize = value.length();
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
@ -2899,6 +2872,11 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
|
||||
std::vector<extentInfo> extentsinfo;
|
||||
extentInfo aExtentinfo;
|
||||
CalpontSystemCatalog::OID oid = 1003;
|
||||
|
||||
uint16_t dbRoot = 0;
|
||||
uint16_t segment;
|
||||
uint32_t partition;
|
||||
|
||||
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
|
||||
|
||||
ridList.push_back(ropair.rid);
|
||||
@ -2934,13 +2912,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
|
||||
fWEWrapper.setBulkFlag(false);
|
||||
fWEWrapper.startTransaction(txnID);
|
||||
|
||||
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
|
||||
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
|
||||
int rc =
|
||||
fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
|
||||
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
// build the logging message
|
||||
err = "WE: Update failed on: " + tableName.table;
|
||||
err = "WE: Update failed on: " + oldValue;
|
||||
int rc1 = 0;
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
@ -2993,20 +2972,14 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string&
|
||||
if (rc == 0)
|
||||
rc = rc1;
|
||||
|
||||
systemCatalogPtr->flushCache();
|
||||
purgeFDCache();
|
||||
// if (idbdatafile::IDBPolicy::useHdfs())
|
||||
// cacheutils::flushOIDsFromCache(oidsToFlush);
|
||||
// cout << "rename:syscolumn is updated" << endl;
|
||||
return rc;
|
||||
}
|
||||
|
||||
uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& err)
|
||||
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
|
||||
{
|
||||
int rc = 0;
|
||||
uint32_t sessionID, tmp32;
|
||||
std::string schema, oldTablename, newTablename;
|
||||
int txnID;
|
||||
uint8_t rc;
|
||||
uint32_t sessionID, tmp32, txnID;
|
||||
std::string schema, oldTablename, newTablename, newSchema;
|
||||
|
||||
bs >> sessionID;
|
||||
bs >> tmp32;
|
||||
@ -3014,26 +2987,17 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string&
|
||||
bs >> schema;
|
||||
bs >> oldTablename;
|
||||
bs >> newTablename;
|
||||
bs >> newSchema;
|
||||
|
||||
CalpontSystemCatalog::TableName tableName;
|
||||
tableName.schema = schema;
|
||||
tableName.table = oldTablename;
|
||||
WriteEngine::DctnryStructList dctnryStructList;
|
||||
WriteEngine::DctnryValueList dctnryValueList;
|
||||
WriteEngine::DctColTupleList dctRowList;
|
||||
WriteEngine::DctnryTuple dctColList;
|
||||
|
||||
uint16_t dbRoot = 0;
|
||||
uint16_t segment;
|
||||
uint32_t partition;
|
||||
|
||||
CalpontSystemCatalog::ROPair ropair;
|
||||
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
|
||||
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
|
||||
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
|
||||
|
||||
//@bug 4592 Error handling for syscat call
|
||||
|
||||
try
|
||||
{
|
||||
ropair = systemCatalogPtr->tableRID(tableName);
|
||||
@ -3053,361 +3017,20 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string&
|
||||
|
||||
// now we have to prepare the various structures for the WE to update the column.
|
||||
|
||||
std::vector<WriteEngine::RID> ridList;
|
||||
WriteEngine::ColValueList colValuesList;
|
||||
WriteEngine::ColTupleList aColList;
|
||||
WriteEngine::ColStructList colStructs;
|
||||
WriteEngine::CSCTypesList cscColTypeList;
|
||||
std::vector<void*> colOldValuesList;
|
||||
std::map<uint32_t, uint32_t> oids;
|
||||
// std::vector<BRM::OID_t> oidsToFlush;
|
||||
boost::any datavalue;
|
||||
datavalue = newTablename;
|
||||
|
||||
WriteEngine::ColTuple colTuple;
|
||||
|
||||
// Build colStructs for SYSTABLE
|
||||
tableName.schema = CALPONT_SCHEMA;
|
||||
tableName.table = SYSTABLE_TABLE;
|
||||
DDLColumn column;
|
||||
findColumnData(sessionID, tableName, TABLENAME_COL, column);
|
||||
WriteEngine::ColStruct colStruct;
|
||||
WriteEngine::DctnryStruct dctnryStruct;
|
||||
colStruct.dataOid = column.oid;
|
||||
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
|
||||
colStruct.tokenFlag = true;
|
||||
|
||||
colStruct.colDataType = column.colType.colDataType;
|
||||
// Tokenize the data value
|
||||
WriteEngine::DctnryStruct dictStruct;
|
||||
dictStruct.dctnryOid = column.colType.ddn.dictOID;
|
||||
dictStruct.columnOid = column.colType.columnOID;
|
||||
WriteEngine::DctnryTuple dictTuple;
|
||||
dictTuple.isNull = false;
|
||||
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
|
||||
dictTuple.sigSize = newTablename.length();
|
||||
// int error = NO_ERROR;
|
||||
// if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
|
||||
//{
|
||||
// WErrorCodes ec;
|
||||
// throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
|
||||
//}
|
||||
// WriteEngine::Token aToken = dictTuple.token;
|
||||
rc = updateSystableEntryForSysTable(sessionID, txnID, column, newTablename, oldTablename, ropair, err);
|
||||
|
||||
// colTuple.data = aToken;
|
||||
// cout << "token value for new table name is op:fbo = " << aToken.op <<":" << aToken.fbo << " null flag = "
|
||||
// << (uint32_t)dictTuple.isNull<< endl;
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
if (newSchema != schema && rc == NO_ERROR)
|
||||
{
|
||||
colStruct.fCompressionType = 2;
|
||||
dctnryStruct.fCompressionType = 2;
|
||||
findColumnData(sessionID, tableName, SCHEMA_COL, column);
|
||||
rc = updateSystableEntryForSysTable(sessionID, txnID, column, newSchema, schema, ropair, err);
|
||||
}
|
||||
|
||||
if (colStruct.tokenFlag)
|
||||
{
|
||||
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
|
||||
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
|
||||
dctnryStruct.columnOid = colStruct.dataOid;
|
||||
}
|
||||
else
|
||||
{
|
||||
dctnryStruct.dctnryOid = 0;
|
||||
dctnryStruct.columnOid = colStruct.dataOid;
|
||||
}
|
||||
|
||||
colStructs.push_back(colStruct);
|
||||
dctnryStructList.push_back(dctnryStruct);
|
||||
oids[colStruct.dataOid] = colStruct.dataOid;
|
||||
cscColTypeList.push_back(column.colType);
|
||||
|
||||
// oidsToFlush.push_back(colStruct.dataOid);
|
||||
if (dctnryStruct.dctnryOid > 0)
|
||||
{
|
||||
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
|
||||
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
|
||||
}
|
||||
|
||||
aColList.push_back(colTuple);
|
||||
colValuesList.push_back(aColList);
|
||||
std::vector<WriteEngine::ColStructList> colExtentsStruct;
|
||||
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
|
||||
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
|
||||
|
||||
dctColList = dictTuple;
|
||||
dctRowList.push_back(dctColList);
|
||||
dctnryValueList.push_back(dctRowList);
|
||||
|
||||
// In this case, there's only 1 row, so only one one extent, but keep it generic...
|
||||
std::vector<extentInfo> extentsinfo;
|
||||
extentInfo aExtentinfo;
|
||||
CalpontSystemCatalog::OID oid = 1003;
|
||||
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
|
||||
|
||||
ridList.push_back(ropair.rid);
|
||||
std::vector<WriteEngine::RIDList> ridLists;
|
||||
ridLists.push_back(ridList);
|
||||
aExtentinfo.dbRoot = dbRoot;
|
||||
aExtentinfo.partition = partition;
|
||||
aExtentinfo.segment = segment;
|
||||
|
||||
extentsinfo.push_back(aExtentinfo);
|
||||
|
||||
// build colExtentsStruct
|
||||
for (unsigned i = 0; i < extentsinfo.size(); i++)
|
||||
{
|
||||
for (unsigned j = 0; j < colStructs.size(); j++)
|
||||
{
|
||||
colStructs[j].fColPartition = extentsinfo[i].partition;
|
||||
colStructs[j].fColSegment = extentsinfo[i].segment;
|
||||
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
|
||||
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
|
||||
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
|
||||
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
|
||||
}
|
||||
|
||||
colExtentsStruct.push_back(colStructs);
|
||||
dctnryExtentsStruct.push_back(dctnryStructList);
|
||||
colExtentsColType.push_back(cscColTypeList);
|
||||
}
|
||||
|
||||
// call the write engine to update the row
|
||||
fWEWrapper.setTransId(txnID);
|
||||
fWEWrapper.setIsInsert(false);
|
||||
fWEWrapper.setBulkFlag(false);
|
||||
fWEWrapper.startTransaction(txnID);
|
||||
|
||||
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
|
||||
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
// build the logging message
|
||||
err = "WE: Update failed on: " + tableName.table;
|
||||
int rc1 = 0;
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
|
||||
|
||||
if ((rc == 0) && (rc1 == 0))
|
||||
{
|
||||
rc1 = fWEWrapper.confirmTransaction(txnID);
|
||||
|
||||
if (rc1 == NO_ERROR)
|
||||
rc1 = fWEWrapper.endTransaction(txnID, true);
|
||||
else
|
||||
fWEWrapper.endTransaction(txnID, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
fWEWrapper.endTransaction(txnID, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (rc == 0)
|
||||
rc = rc1;
|
||||
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
}
|
||||
|
||||
// cout << "rename:systable is updated to " << newTablename << " for rid " << ropair.rid << endl;
|
||||
// Update SYSCOLUMN table
|
||||
tableName.schema = schema;
|
||||
tableName.table = oldTablename;
|
||||
dctnryStructList.clear();
|
||||
dctnryValueList.clear();
|
||||
dctRowList.clear();
|
||||
|
||||
CalpontSystemCatalog::RIDList roList;
|
||||
|
||||
try
|
||||
{
|
||||
roList = systemCatalogPtr->columnRIDs(tableName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
err = ex.what();
|
||||
rc = 1;
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Build colStructs for SYSCOLUMN
|
||||
ridList.clear();
|
||||
colValuesList.clear();
|
||||
aColList.clear();
|
||||
colStructs.clear();
|
||||
cscColTypeList.clear();
|
||||
colOldValuesList.clear();
|
||||
oids.clear();
|
||||
tableName.schema = CALPONT_SCHEMA;
|
||||
tableName.table = SYSCOLUMN_TABLE;
|
||||
findColumnData(sessionID, tableName, TABLENAME_COL, column);
|
||||
|
||||
colStruct.dataOid = column.oid;
|
||||
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
|
||||
colStruct.tokenFlag = false;
|
||||
|
||||
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
|
||||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
|
||||
column.colType.precision > 18)) // token
|
||||
{
|
||||
colStruct.colWidth = 8;
|
||||
colStruct.tokenFlag = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
colStruct.colWidth = column.colType.colWidth;
|
||||
}
|
||||
|
||||
colStruct.colDataType = column.colType.colDataType;
|
||||
|
||||
// Tokenize the data value
|
||||
dictStruct.dctnryOid = column.colType.ddn.dictOID;
|
||||
dictStruct.columnOid = column.colType.columnOID;
|
||||
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
|
||||
dictTuple.sigSize = newTablename.length();
|
||||
dictTuple.isNull = false;
|
||||
/*
|
||||
if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
|
||||
{
|
||||
WErrorCodes ec;
|
||||
throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
|
||||
}
|
||||
aToken = dictTuple.token;
|
||||
colTuple.data = aToken; */
|
||||
|
||||
colStruct.colDataType = column.colType.colDataType;
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
colStruct.fCompressionType = 2;
|
||||
dctnryStruct.fCompressionType = 2;
|
||||
}
|
||||
|
||||
if (colStruct.tokenFlag)
|
||||
{
|
||||
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
|
||||
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
|
||||
dctnryStruct.columnOid = colStruct.dataOid;
|
||||
}
|
||||
else
|
||||
{
|
||||
dctnryStruct.dctnryOid = 0;
|
||||
dctnryStruct.columnOid = colStruct.dataOid;
|
||||
}
|
||||
|
||||
oids[colStruct.dataOid] = colStruct.dataOid;
|
||||
|
||||
// oidsToFlush.push_back(colStruct.dataOid);
|
||||
if (dctnryStruct.dctnryOid > 0)
|
||||
{
|
||||
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
|
||||
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
|
||||
}
|
||||
|
||||
colStructs.push_back(colStruct);
|
||||
dctnryStructList.push_back(dctnryStruct);
|
||||
cscColTypeList.push_back(column.colType);
|
||||
|
||||
for (unsigned int i = 0; i < roList.size(); i++)
|
||||
{
|
||||
aColList.push_back(colTuple);
|
||||
}
|
||||
|
||||
colValuesList.push_back(aColList);
|
||||
|
||||
// It's the same string for each column, so we just need one dictionary struct
|
||||
void* dictTuplePtr = static_cast<void*>(&dictTuple);
|
||||
memset(dictTuplePtr, 0, sizeof(dictTuple));
|
||||
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
|
||||
dictTuple.sigSize = newTablename.length();
|
||||
dictTuple.isNull = false;
|
||||
dctColList = dictTuple;
|
||||
dctRowList.push_back(dctColList);
|
||||
dctnryValueList.push_back(dctRowList);
|
||||
extentsinfo.clear();
|
||||
colExtentsStruct.clear();
|
||||
colExtentsColType.clear();
|
||||
dctnryExtentsStruct.clear();
|
||||
oid = 1021;
|
||||
|
||||
for (unsigned int i = 0; i < roList.size(); i++)
|
||||
{
|
||||
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
|
||||
|
||||
aExtentinfo.dbRoot = dbRoot;
|
||||
aExtentinfo.partition = partition;
|
||||
aExtentinfo.segment = segment;
|
||||
|
||||
if (extentsinfo.empty())
|
||||
extentsinfo.push_back(aExtentinfo);
|
||||
else if (extentsinfo.back() != aExtentinfo)
|
||||
extentsinfo.push_back(aExtentinfo);
|
||||
|
||||
ridList.push_back(roList[i].rid);
|
||||
}
|
||||
|
||||
ridLists.clear();
|
||||
ridLists.push_back(ridList);
|
||||
|
||||
// build colExtentsStruct
|
||||
for (unsigned i = 0; i < extentsinfo.size(); i++)
|
||||
{
|
||||
for (unsigned j = 0; j < colStructs.size(); j++)
|
||||
{
|
||||
colStructs[j].fColPartition = extentsinfo[i].partition;
|
||||
colStructs[j].fColSegment = extentsinfo[i].segment;
|
||||
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
|
||||
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
|
||||
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
|
||||
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
|
||||
}
|
||||
|
||||
colExtentsStruct.push_back(colStructs);
|
||||
dctnryExtentsStruct.push_back(dctnryStructList);
|
||||
colExtentsColType.push_back(cscColTypeList);
|
||||
}
|
||||
|
||||
// call the write engine to update the row
|
||||
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
|
||||
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
// build the logging message
|
||||
err = "WE: Update failed on: " + tableName.table;
|
||||
}
|
||||
|
||||
int rc1 = 0;
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
|
||||
|
||||
if ((rc == 0) && (rc1 == 0))
|
||||
{
|
||||
rc1 = fWEWrapper.confirmTransaction(txnID);
|
||||
|
||||
if (rc1 == NO_ERROR)
|
||||
rc1 = fWEWrapper.endTransaction(txnID, true);
|
||||
else
|
||||
fWEWrapper.endTransaction(txnID, false);
|
||||
}
|
||||
else
|
||||
{
|
||||
fWEWrapper.endTransaction(txnID, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (rc == 0)
|
||||
rc = rc1;
|
||||
|
||||
systemCatalogPtr->flushCache();
|
||||
purgeFDCache();
|
||||
// if (idbdatafile::IDBPolicy::useHdfs())
|
||||
|
Reference in New Issue
Block a user