1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Gagan Goel 9b6d3c3870 MCOL-5021 Add support for AUX column in the client code calling
CalpontSystemCatalog::columnRIDs().
2022-08-05 14:40:49 -04:00

4970 lines
138 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: we_ddlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
#include <unistd.h>
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "boost/scoped_ptr.hpp"
using namespace std;
#include "bytestream.h"
using namespace messageqcpp;
#include "we_messages.h"
#include "we_message_handlers.h"
#include "we_ddlcommon.h"
#include "we_ddlcommandproc.h"
#include "ddlpkg.h"
using namespace ddlpackage;
#include <ctime>
#include "dataconvert.h"
using namespace dataconvert;
//#include "we_brm.h"
namespace fs = boost::filesystem;
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
using namespace execplan;
namespace WriteEngine
{
WE_DDLCommandProc::WE_DDLCommandProc()
{
filesPerColumnPartition = 8;
extentsPerSegmentFile = 1;
dbrootCnt = 1;
extentRows = 0x800000;
config::Config* cf = config::Config::makeConfig();
string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition");
if (fpc.length() != 0)
filesPerColumnPartition = cf->uFromText(fpc);
// MCOL-4685: remove the option to set more than 2 extents per file (ExtentsPreSegmentFile).
extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;
string dbct = cf->getConfig("SystemConfig", "DBRootCount");
if (dbct.length() != 0)
dbrootCnt = cf->uFromText(dbct);
}
WE_DDLCommandProc::WE_DDLCommandProc(const WE_DDLCommandProc& rhs)
{
}
WE_DDLCommandProc::~WE_DDLCommandProc()
{
}
uint8_t WE_DDLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
uint32_t columnOid, sessionID;
uint64_t nextVal;
int rc = 0;
bs >> columnOid;
bs >> nextVal;
bs >> sessionID;
uint16_t dbRoot;
BRM::OID_t oid = 1021;
fDbrm.getSysCatDBRoot(oid, dbRoot);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
oids[columnOid] = columnOid;
// oidsToFlush.push_back(columnOid);
if (idbdatafile::IDBPolicy::useHdfs())
fWEWrapper.startTransaction(sessionID);
rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);
if (rc != 0)
{
err = "Error in WE::updateNextValue";
rc = 1;
}
if (idbdatafile::IDBPolicy::useHdfs())
{
fWEWrapper.flushDataFiles(rc, sessionID, oids);
fWEWrapper.confirmTransaction(sessionID);
if (rc == 0)
fWEWrapper.endTransaction(sessionID, true);
else
fWEWrapper.endTransaction(sessionID, false);
}
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID, tableOID, tableAUXColumnOID;
uint32_t tableWithAutoi;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> tmp32;
tableOID = tmp32;
bs >> tmp32;
tableAUXColumnOID = tmp32;
bs >> tmp32;
tableWithAutoi = tmp32;
bs >> tmp32;
uint16_t dbroot = tmp32;
ddlpackage::TableDef tableDef;
tableDef.unserialize(bs);
WriteEngine::ColTuple colTuple;
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColTupleList colTuples;
WriteEngine::dictStr dctColTuples;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::ColValueList colValuesList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DictStrList dctnryValueList;
WriteEngine::RIDList ridList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::ROPair sysTableROPair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
ColumnList columns;
ColumnList::const_iterator column_iterator;
DDLColumn column;
int error = 0;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
sysTableROPair = systemCatalogPtr->tableRID(tableName);
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
column_iterator = columns.begin();
std::string tmpStr("");
while (column_iterator != columns.end())
{
column = *column_iterator;
boost::to_lower(column.tableColName.column);
if (TABLENAME_COL == column.tableColName.column)
{
std::string tablename = tableDef.fQualifiedName->fName;
colTuple.data = tablename;
tmpStr = tablename;
}
else if (SCHEMA_COL == column.tableColName.column)
{
std::string schema = tableDef.fQualifiedName->fSchema;
colTuple.data = schema;
tmpStr = schema;
}
else if (OBJECTID_COL == column.tableColName.column)
{
colTuple.data = tableOID;
}
else if (AUXCOLUMNOID_COL == column.tableColName.column)
{
colTuple.data = tableAUXColumnOID;
}
else if (CREATEDATE_COL == column.tableColName.column)
{
time_t t;
struct tm tmp;
Date aDay;
t = time(NULL);
gmtime_r(&t, &tmp);
aDay.year = tmp.tm_year + 1900;
aDay.month = tmp.tm_mon + 1;
aDay.day = tmp.tm_mday;
colTuple.data = *(reinterpret_cast<int*>(&aDay));
}
else if (INIT_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (NEXT_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (AUTOINC_COL == column.tableColName.column)
{
colTuple.data = tableWithAutoi;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbroot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
dctnryStruct.fColDbRoot = dbroot;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = column.oid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = column.oid;
}
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colTuples.push_back(colTuple);
dctColTuples.push_back(tmpStr);
colValuesList.push_back(colTuples);
dctnryStructList.push_back(dctnryStruct);
dctnryValueList.push_back(dctColTuples);
colTuples.pop_back();
dctColTuples.pop_back();
++column_iterator;
}
// fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
if (0 != colStructs.size())
{
// MCOL-66 The DBRM can't handle concurrent transactions to sys tables
// TODO: This may be redundant
static boost::mutex dbrmMutex;
boost::mutex::scoped_lock lk(dbrmMutex);
error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);
if (error != WriteEngine::NO_ERROR)
{
if (error == ERR_BRM_WR_VB_ENTRY)
{
throw std::runtime_error("WE: Error writing to BRM.");
}
else
{
WErrorCodes ec;
throw std::runtime_error("WE: Error updating calpontsys.systable:" + ec.errorString(error));
}
}
if (idbdatafile::IDBPolicy::useHdfs())
{
int rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
}
}
catch (exception& ex)
{
err += ex.what();
rc = 1;
}
catch (...)
{
err += "Unknown exception caught";
rc = 1;
}
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
return rc;
}
uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32, columnSize, dictSize, i;
uint8_t tmp8;
int txnID, colpos;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> columnSize;
// deserialize column Oid and dictionary oid
vector<uint32_t> coloids;
vector<uint32_t> dictoids;
for (i = 0; i < columnSize; ++i)
{
bs >> tmp32;
coloids.push_back(tmp32);
}
bs >> dictSize;
for (i = 0; i < dictSize; ++i)
{
bs >> tmp32;
dictoids.push_back(tmp32);
}
bool alterFlag = 0;
bs >> tmp8;
alterFlag = (tmp8 != 0);
bs >> tmp32;
colpos = tmp32;
bs >> tmp32;
uint16_t dbroot = tmp32;
ddlpackage::TableDef tableDef;
tableDef.unserialize(bs);
WriteEngine::ColTuple colTuple;
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColTupleList colTuples;
WriteEngine::dictStr dctColTuples;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::ColValueList colValuesList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DictStrList dctnryValueList;
WriteEngine::RIDList ridList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::ROPair sysTableROPair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
ColumnList columns;
ColumnList::const_iterator column_iterator;
DDLColumn column;
int error = 0;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
std::map<uint32_t, uint32_t> oids;
int rc1 = 0;
ColumnDef* colDefPtr = 0;
ColumnDefList::const_iterator iter;
int startPos = colpos;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
unsigned int numCols = columns.size();
// WriteEngine::ColTupleList colList[numCols];
// ColTupleList is NOT POD, so let's try this:
std::vector<WriteEngine::ColTupleList> colList;
// WriteEngine::dictStr dctColList[numCols];
std::vector<WriteEngine::dictStr> dctColList;
ColumnDefList tableDefCols = tableDef.fColumns;
ddlpackage::QualifiedName qualifiedName = *(tableDef.fQualifiedName);
iter = tableDefCols.begin();
// colpos = 0;
std::string tmpStr("");
for (unsigned int ii = 0; ii < numCols; ii++)
{
colList.push_back(WriteEngine::ColTupleList());
dctColList.push_back(WriteEngine::dictStr());
}
try
{
unsigned int col = 0;
unsigned int dictcol = 0;
while (iter != tableDefCols.end())
{
colDefPtr = *iter;
DictOID dictOID = {0, 0, 0, 0, 0};
int dataType;
dataType = convertDataType(colDefPtr->fType->fType);
if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
{
if (colDefPtr->fType->fPrecision > 38) // precision cannot be over 38.
{
ostringstream os;
os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
throw std::runtime_error(os.str());
}
else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
{
ostringstream os;
os << "Syntax error: scale should be less than precision, precision: "
<< colDefPtr->fType->fPrecision << " scale: " << colDefPtr->fType->fScale;
throw std::runtime_error(os.str());
}
colDefPtr->convertDecimal();
}
bool hasDict = false;
if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
(dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
(dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7))
{
hasDict = true;
dictOID.compressionType = colDefPtr->fType->fCompressiontype;
dictOID.colWidth = colDefPtr->fType->fLength;
dictOID.dictOID = dictoids[dictcol];
dictcol++;
//@Bug 2534. Take away the limit of 255 and set the limit to 8000.
if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
(dataType != CalpontSystemCatalog::TEXT))
{
ostringstream os;
os << "char, varchar and varbinary length may not exceed 8000 bytes";
throw std::runtime_error(os.str());
}
}
else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT) &&
colDefPtr->fType->fLength <= 7)
{
ostringstream os;
os << "varbinary and blob length may not be less than 8";
throw std::runtime_error(os.str());
}
unsigned int i = 0;
column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
boost::to_lower(column.tableColName.column);
if (SCHEMA_COL == column.tableColName.column)
{
colTuple.data = qualifiedName.fSchema;
tmpStr = qualifiedName.fSchema;
}
else if (TABLENAME_COL == column.tableColName.column)
{
colTuple.data = qualifiedName.fName;
tmpStr = qualifiedName.fName;
}
else if (COLNAME_COL == column.tableColName.column)
{
boost::to_lower(colDefPtr->fName);
colTuple.data = colDefPtr->fName;
tmpStr = colDefPtr->fName;
}
else if (OBJECTID_COL == column.tableColName.column)
{
if (alterFlag)
colTuple.data = coloids[col];
else
colTuple.data = coloids[col];
}
else if (DATATYPE_COL == column.tableColName.column)
{
colTuple.data = dataType;
}
else if (COLUMNLEN_COL == column.tableColName.column)
{
//@Bug 2089 Disallow zero length char and varch column to be created
if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT)
{
if (colDefPtr->fType->fLength <= 0)
{
ostringstream os;
os << "char, varchar and varbinary length must be greater than zero";
throw std::runtime_error(os.str());
}
}
colTuple.data = colDefPtr->fType->fLength;
}
else if (COLUMNPOS_COL == column.tableColName.column)
{
colTuple.data = colpos;
}
else if (DEFAULTVAL_COL == column.tableColName.column)
{
if (colDefPtr->fDefaultValue)
{
colTuple.data = colDefPtr->fDefaultValue->fValue;
tmpStr = colDefPtr->fDefaultValue->fValue;
}
else
{
tmpStr = "";
// colTuple.data = column.colType.getNullValueForType();
}
}
else if (NULLABLE_COL == column.tableColName.column)
{
int nullable = 1;
ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();
while (constraint_iter != colConstraints.end())
{
ColumnConstraintDef* consDefPtr = *constraint_iter;
if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
{
nullable = 0;
break;
}
++constraint_iter;
}
colTuple.data = nullable;
}
else if (SCALE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fScale;
}
else if (PRECISION_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fPrecision;
}
else if (DICTOID_COL == column.tableColName.column)
{
if (hasDict)
{
colTuple.data = dictOID.dictOID;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
}
else if (LISTOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (TREEOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (MINVAL_COL == column.tableColName.column)
{
tmpStr = "";
}
else if (MAXVAL_COL == column.tableColName.column)
{
tmpStr = "";
}
else if (COMPRESSIONTYPE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fCompressiontype;
}
else if (AUTOINC_COL == column.tableColName.column)
{
// cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl;
colTuple.data = colDefPtr->fType->fAutoincrement;
}
else if (NEXTVALUE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fNextvalue;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbroot;
dctnryStruct.fColDbRoot = dbroot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag) // TODO: XXX: this is copied aplenty. NEED TO REFACTOR.
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = column.oid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = column.oid;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
if (colpos == startPos)
{
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
}
colList[i].push_back(colTuple);
// colList.push_back(WriteEngine::ColTupleList());
// colList.back().push_back(colTuple);
dctColList[i].push_back(tmpStr);
// dctColList.push_back(WriteEngine::dictStr());
// dctColList.back().push_back(tmpStr);
++i;
++column_iterator;
}
++colpos;
col++;
++iter;
}
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
if (0 != colStructs.size())
{
for (unsigned int n = 0; n < numCols; n++)
{
colValuesList.push_back(colList[n]);
dctnryValueList.push_back(dctColList[n]);
}
// fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error != WriteEngine::NO_ERROR)
{
if (error == ERR_BRM_WR_VB_ENTRY)
{
throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
}
else
{
WErrorCodes ec;
throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
}
}
else
error = rc1;
}
}
catch (exception& ex)
{
err += ex.what();
rc = 1;
}
catch (...)
{
err += "Unknown exception caught";
rc = 1;
}
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
return rc;
}
uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32, coloid, dictoid;
int txnID, startPos;
string schema, tablename;
uint8_t tmp8;
bool isAlter = false;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> coloid;
bs >> dictoid;
bs >> tmp8; // alterFlag
bs >> tmp32;
startPos = tmp32;
isAlter = (tmp8 != 0);
boost::scoped_ptr<ddlpackage::ColumnDef> colDefPtr(new ddlpackage::ColumnDef());
colDefPtr->unserialize(bs);
WriteEngine::ColStruct colStruct;
WriteEngine::ColTuple colTuple;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColTupleList colTuples;
WriteEngine::DctColTupleList dctColTuples;
WriteEngine::ColValueList colValuesList;
WriteEngine::RIDList ridList;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::dictStr dctnryTuple;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DictStrList dctnryValueList;
CalpontSystemCatalog::TableName tableName;
ColumnList columns;
ColumnList::const_iterator column_iterator;
DDLColumn column;
int error = 0;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
unsigned int numCols = columns.size();
// WriteEngine::ColTupleList colList[numCols];
// ColTupleList is NOT POD, so let's try this:
std::vector<WriteEngine::ColTupleList> colList;
// WriteEngine::dictStr dctColList[numCols];
std::vector<WriteEngine::dictStr> dctColList;
std::map<uint32_t, uint32_t> oids;
std::vector<BRM::OID_t> oidsToFlush;
// colpos = 0;
std::string tmpStr("");
for (unsigned int ii = 0; ii < numCols; ii++)
{
colList.push_back(WriteEngine::ColTupleList());
dctColList.push_back(WriteEngine::dictStr());
}
try
{
DictOID dictOID = {0, 0, 0, 0, 0};
int dataType = convertDataType(colDefPtr->fType->fType);
if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
{
if (colDefPtr->fType->fPrecision > 38) //@Bug 5717 precision cannot be over 38.
{
ostringstream os;
os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
throw std::runtime_error(os.str());
}
else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
{
ostringstream os;
os << "Syntax error: scale should be less than precision, precision: " << colDefPtr->fType->fPrecision
<< " scale: " << colDefPtr->fType->fScale;
throw std::runtime_error(os.str());
}
colDefPtr->convertDecimal();
}
if (dictoid > 0)
{
dictOID.compressionType = colDefPtr->fType->fCompressiontype;
dictOID.colWidth = colDefPtr->fType->fLength;
dictOID.dictOID = dictoid;
//@Bug 2534. Take away the limit of 255 and set the limit to 8000.
if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
(dataType != CalpontSystemCatalog::TEXT))
{
ostringstream os;
os << "char, varchar and varbinary length may not exceed 8000 bytes";
throw std::runtime_error(os.str());
}
}
else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT) &&
colDefPtr->fType->fLength <= 7)
{
ostringstream os;
os << "varbinary and blob length may not be less than 8";
throw std::runtime_error(os.str());
}
unsigned int i = 0;
uint16_t dbRoot;
BRM::OID_t sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
boost::to_lower(column.tableColName.column);
if (SCHEMA_COL == column.tableColName.column)
{
colTuple.data = schema;
tmpStr = schema;
}
else if (TABLENAME_COL == column.tableColName.column)
{
colTuple.data = tablename;
tmpStr = tablename;
}
else if (COLNAME_COL == column.tableColName.column)
{
boost::to_lower(colDefPtr->fName);
colTuple.data = colDefPtr->fName;
tmpStr = colDefPtr->fName;
}
else if (OBJECTID_COL == column.tableColName.column)
{
colTuple.data = coloid;
}
else if (DATATYPE_COL == column.tableColName.column)
{
colTuple.data = dataType;
}
else if (COLUMNLEN_COL == column.tableColName.column)
{
//@Bug 2089 Disallow zero length char and varch column to be created
if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT)
{
if (colDefPtr->fType->fLength <= 0)
{
ostringstream os;
os << "char, varchar and varbinary length must be greater than zero";
throw std::runtime_error(os.str());
}
}
colTuple.data = colDefPtr->fType->fLength;
}
else if (COLUMNPOS_COL == column.tableColName.column)
{
colTuple.data = startPos;
}
else if (DEFAULTVAL_COL == column.tableColName.column)
{
if (colDefPtr->fDefaultValue)
{
colTuple.data = colDefPtr->fDefaultValue->fValue;
tmpStr = colDefPtr->fDefaultValue->fValue;
}
else
{
tmpStr = "";
// colTuple.data = column.colType.getNullValueForType();
}
}
else if (NULLABLE_COL == column.tableColName.column)
{
int nullable = 1;
ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();
while (constraint_iter != colConstraints.end())
{
ColumnConstraintDef* consDefPtr = *constraint_iter;
if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
{
nullable = 0;
break;
}
++constraint_iter;
}
colTuple.data = nullable;
}
else if (SCALE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fScale;
}
else if (PRECISION_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fPrecision;
}
else if (DICTOID_COL == column.tableColName.column)
{
if (dictoid > 0)
{
colTuple.data = dictOID.dictOID;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
}
else if (LISTOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (TREEOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (MINVAL_COL == column.tableColName.column)
{
tmpStr = "";
}
else if (MAXVAL_COL == column.tableColName.column)
{
tmpStr = "";
}
else if (COMPRESSIONTYPE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fCompressiontype;
}
else if (AUTOINC_COL == column.tableColName.column)
{
// cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl;
colTuple.data = colDefPtr->fType->fAutoincrement;
}
else if (NEXTVALUE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fNextvalue;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
colStruct.dataOid = column.oid;
oids[column.oid] = column.oid;
oidsToFlush.push_back(column.oid);
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.fColDbRoot = dbRoot;
colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
colStruct.colDataType = column.colType.colDataType;
dctnryStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = column.oid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = column.oid;
}
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
colList[i].push_back(colTuple);
// colList.push_back(WriteEngine::ColTupleList());
// colList.back().push_back(colTuple);
dctColList[i].push_back(tmpStr);
// dctColList.push_back(WriteEngine::dictStr());
// dctColList.back().push_back(tmpStr);
++i;
++column_iterator;
}
if (0 != colStructs.size())
{
// FIXME: Is there a cleaner way to do this? Isn't colValuesList the same as colList after this?
for (unsigned int n = 0; n < numCols; n++)
{
colValuesList.push_back(colList[n]);
dctnryValueList.push_back(dctColList[n]);
}
// fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
int rc1 = 0;
error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error != WriteEngine::NO_ERROR)
{
if (error == ERR_BRM_WR_VB_ENTRY)
{
throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
}
else
{
WErrorCodes ec;
throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
}
}
else
error = rc1;
}
}
catch (exception& ex)
{
err += ex.what();
rc = 1;
}
catch (...)
{
err += "Unknown exception caught";
rc = 1;
}
purgeFDCache();
if (isAlter)
{
if (idbdatafile::IDBPolicy::useHdfs())
cacheutils::flushOIDsFromCache(oidsToFlush);
}
return rc;
}
uint8_t WE_DDLCommandProc::createtablefiles(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t size, i;
uint16_t tmp16;
uint32_t tmp32;
uint8_t tmp8;
OID dataOid;
int colWidth;
bool tokenFlag;
int txnID;
CalpontSystemCatalog::ColDataType colDataType;
uint16_t colDbRoot;
int compressionType;
bs >> tmp32;
txnID = tmp32;
bs >> size;
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
std::map<uint32_t, uint32_t> oids;
for (i = 0; i < size; ++i)
{
bs >> tmp32;
dataOid = tmp32;
bs >> tmp8;
colDataType = (CalpontSystemCatalog::ColDataType)tmp8;
bs >> tmp8;
tokenFlag = (tmp8 != 0);
bs >> tmp32;
colWidth = tmp32;
bs >> tmp16;
colDbRoot = tmp16;
bs >> tmp32;
compressionType = tmp32;
oids[dataOid] = dataOid;
if (tokenFlag)
{
rc = fWEWrapper.createDctnry(0, dataOid, colWidth, colDbRoot, 0, 0, compressionType);
}
else
{
rc = fWEWrapper.createColumn(0, dataOid, colDataType, colWidth, colDbRoot, 0, compressionType);
}
if (rc != 0)
break;
}
// cout << "creating column file got error code " << rc << endl;
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error creating column file for oid " << dataOid << "; " << ec.errorString(rc) << endl;
err = oss.str();
}
// if (idbdatafile::IDBPolicy::useHdfs())
fWEWrapper.flushDataFiles(rc, txnID, oids);
purgeFDCache();
return rc;
}
uint8_t WE_DDLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tmp32;
int txnID;
bs >> tmp32;
txnID = tmp32;
rc = fWEWrapper.commit(txnID);
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl;
err = oss.str();
}
return rc;
}
uint8_t WE_DDLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error rolling back files " << txnID << " for session " << sessionID << "; "
<< ec.errorString(rc) << endl;
err = oss.str();
}
std::map<uint32_t, uint32_t> oids;
if (idbdatafile::IDBPolicy::useHdfs())
fWEWrapper.flushDataFiles(rc, txnID, oids);
purgeFDCache();
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
return rc;
}
uint8_t WE_DDLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
rc = fWEWrapper.rollbackVersion(txnID, sessionID);
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
<< ec.errorString(rc) << endl;
err = oss.str();
}
purgeFDCache();
return rc;
}
uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID;
string schema, tablename;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
ddlpackage::QualifiedName sysCatalogTableName;
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSCOLUMN_TABLE;
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = schema;
userTableName.table = tablename;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
uint16_t dbRoot;
BRM::OID_t sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.startTransaction(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<void*> colValuesList;
WriteEngine::RIDList ridList;
std::vector<WriteEngine::RIDList> ridLists;
DDLColumn column;
CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin();
while (colrid_iterator != colRidList.end())
{
WriteEngine::RID rid = (*colrid_iterator).rid;
ridList.push_back(rid);
++colrid_iterator;
}
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
if (0 != colStructs.size() && 0 != ridLists[0].size())
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
;
int txnID;
string schema, tablename, columnname;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> columnname;
ddlpackage::QualifiedName sysCatalogTableName;
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSCOLUMN_TABLE;
CalpontSystemCatalog::TableColName tableColName;
tableColName.schema = schema;
tableColName.table = tablename;
tableColName.column = columnname;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
uint16_t dbRoot;
BRM::OID_t sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
CalpontSystemCatalog::ROPair colRO = systemCatalogPtr->columnRID(tableColName);
if (colRO.objnum < 0)
{
err = "Column not found:" + tableColName.table + "." + tableColName.column;
throw std::runtime_error(err);
}
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<void*> colValuesList;
WriteEngine::RIDList ridList;
std::vector<WriteEngine::RIDList> ridLists;
DDLColumn column;
ridList.push_back(colRO.rid);
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
if (0 != colStructs.size() && 0 != ridLists[0].size())
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
;
int txnID;
string schema, tablename;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
WriteEngine::WriteEngineWrapper writeEngine;
ddlpackage::QualifiedName sysCatalogTableName;
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSTABLE_TABLE;
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = schema;
userTableName.table = tablename;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
uint16_t dbRoot;
BRM::OID_t sysOid = 1001;
// Find out where systcolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.startTransaction(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);
if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
{
err = "RowID is not valid ";
throw std::runtime_error(err);
}
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<void*> colValuesList;
WriteEngine::RIDList ridList;
std::vector<WriteEngine::RIDList> ridLists;
DDLColumn column;
ridList.push_back(userTableROPair.rid);
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
if (0 != colStructs.size() && 0 != ridLists[0].size())
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
;
int txnID;
string schema, tablename;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
WriteEngine::WriteEngineWrapper writeEngine;
ddlpackage::QualifiedName sysCatalogTableName;
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSTABLE_TABLE;
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = schema;
userTableName.table = tablename;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<void*> colValuesList;
WriteEngine::RIDList ridList;
std::vector<WriteEngine::RIDList> ridLists;
DDLColumn column;
uint16_t dbRoot;
BRM::OID_t sysOid = 1003;
// Find out where systable is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);
if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
{
err = "RowID is not valid ";
throw std::runtime_error(err);
}
ridList.push_back(userTableROPair.rid);
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
if (rc != 0)
return rc;
// deleting from SYSCOLUMN
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSCOLUMN_TABLE;
sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
try
{
CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);
colStructs.clear();
cscColTypeList.clear();
colExtentsStruct.clear();
colExtentsColType.clear();
colValuesList.clear();
ridList.clear();
ridLists.clear();
oids.clear();
DDLColumn column;
CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin();
while (colrid_iterator != colRidList.end())
{
WriteEngine::RID rid = (*colrid_iterator).rid;
ridList.push_back(rid);
++colrid_iterator;
}
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
if (0 != colStructs.size() && 0 != ridLists[0].size())
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::dropFiles(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t size, i;
uint32_t tmp32;
std::vector<int32_t> dataOids;
bs >> size;
for (i = 0; i < size; ++i)
{
bs >> tmp32;
dataOids.push_back(tmp32);
}
try
{
rc = fWEWrapper.dropFiles(0, dataOids);
}
catch (...)
{
err = "WE: Error removing files ";
rc = 1;
}
purgeFDCache();
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, tablename;
int txnID;
uint8_t tmp8;
bool autoIncrement = false;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> tmp8;
autoIncrement = true;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = tablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSTABLE
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
WriteEngine::ColTuple colTuple;
findColumnData(sessionID, tableName, AUTOINC_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
string s1("y"), s2("n");
boost::any datavalue1 = s1;
boost::any datavalue2 = s2;
if (autoIncrement)
colTuple.data = datavalue1;
else
colTuple.data = datavalue2;
colStruct.colDataType = column.colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1021;
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnID);
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
if (idbdatafile::IDBPolicy::useHdfs())
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, tablename;
int txnID;
uint8_t tmp8;
bool autoIncrement = false;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> tmp8;
autoIncrement = true;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = tablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::ColTuple colTuple;
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
findColumnData(sessionID, tableName, AUTOINC_COL, column);
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
string ystr("y");
string nstr("n");
if (autoIncrement)
colTuple.data = ystr;
else
colTuple.data = nstr;
colStruct.colDataType = column.colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
// get start dbroot for this PM.
// int PMNum = Config::getLocalModuleID();
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
// oam.getDbroots(PMNum);
// dbRoot will be the first dbroot on this pm. dbrootCnt will be how many dbroots on this PM.
CalpontSystemCatalog::OID oid = 1021;
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
std::vector<WriteEngine::RIDList> ridLists;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
if (idbdatafile::IDBPolicy::useHdfs())
{
fWEWrapper.flushDataFiles(rc, txnID, oids);
fWEWrapper.confirmTransaction(txnID);
if (rc == 0)
fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
systemCatalogPtr->flushCache();
purgeFDCache();
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
WriteEngine::ColTuple colTuple;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSTABLE
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column.colType.colWidth;
}
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
// It's the same string for each column, so we just need one dictionary struct
void* dictTuplePtr = static_cast<void*>(&dictTuple);
memset(dictTuplePtr, 0, sizeof(dictTuple));
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
CalpontSystemCatalog::OID oid = 1021;
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// cout << "rename:syscolumn is updated" << endl;
return rc;
}
uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32, autoVal;
std::string schema, tablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> autoVal;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = tablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::ROPair ropair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
ropair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
if (ropair.objnum < 0)
{
err = "No such table: " + tableName.table;
rc = 1;
return rc;
}
// now we have to prepare the various structures for the WE to update the column.
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
datavalue = autoVal;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, AUTOINC_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column.colType.colDataType;
colTuple.data = datavalue;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
dctnryStructList.push_back(dctnryStruct);
aColList.push_back(colTuple);
colValuesList.push_back(aColList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
WriteEngine::DctnryTuple dctnryTuple;
dctColList = dctnryTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1003;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
ridList.push_back(ropair.rid);
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
dctnryExtentsStruct.push_back(dctnryStructList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::ROPair ropair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
try
{
ropair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
if (ropair.objnum < 0)
{
err = "No such table: " + tableName.table;
return 1;
}
// now we have to prepare the various structures for the WE to update the column.
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
datavalue = newTablename;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = true;
colStruct.colDataType = column.colType.colDataType;
// Tokenize the data value
WriteEngine::DctnryStruct dictStruct;
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
aColList.push_back(colTuple);
colValuesList.push_back(aColList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1003;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
ridList.push_back(ropair.rid);
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
if (rc != 0)
return rc;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// cout << "rename:syscolumn is updated" << endl;
return rc;
}
uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::ROPair ropair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
//@bug 4592 Error handling for syscat call
try
{
ropair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
if (ropair.objnum < 0)
{
err = "No such table: " + tableName.table;
return 1;
}
// now we have to prepare the various structures for the WE to update the column.
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
boost::any datavalue;
datavalue = newTablename;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSTABLE
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = true;
colStruct.colDataType = column.colType.colDataType;
// Tokenize the data value
WriteEngine::DctnryStruct dictStruct;
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
// int error = NO_ERROR;
// if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
//{
// WErrorCodes ec;
// throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
//}
// WriteEngine::Token aToken = dictTuple.token;
// colTuple.data = aToken;
// cout << "token value for new table name is op:fbo = " << aToken.op <<":" << aToken.fbo << " null flag = "
// << (uint32_t)dictTuple.isNull<< endl;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
aColList.push_back(colTuple);
colValuesList.push_back(aColList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
CalpontSystemCatalog::OID oid = 1003;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
ridList.push_back(ropair.rid);
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
if (rc != 0)
return rc;
}
// cout << "rename:systable is updated to " << newTablename << " for rid " << ropair.rid << endl;
// Update SYSCOLUMN table
tableName.schema = schema;
tableName.table = oldTablename;
dctnryStructList.clear();
dctnryValueList.clear();
dctRowList.clear();
CalpontSystemCatalog::RIDList roList;
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSCOLUMN
ridList.clear();
colValuesList.clear();
aColList.clear();
colStructs.clear();
cscColTypeList.clear();
colOldValuesList.clear();
oids.clear();
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column.colType.colWidth;
}
colStruct.colDataType = column.colType.colDataType;
// Tokenize the data value
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
/*
if (NO_ERROR != (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple)))
{
WErrorCodes ec;
throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
}
aToken = dictTuple.token;
colTuple.data = aToken; */
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
// It's the same string for each column, so we just need one dictionary struct
void* dictTuplePtr = static_cast<void*>(&dictTuple);
memset(dictTuplePtr, 0, sizeof(dictTuple));
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
extentsinfo.clear();
colExtentsStruct.clear();
colExtentsColType.clear();
dctnryExtentsStruct.clear();
oid = 1021;
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
ridLists.clear();
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// cout << "rename:syscolumn is updated" << endl;
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err)
{
int rc = 0;
int colPos;
string schema, atableName;
uint32_t sessionID, tmp32;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> atableName;
bs >> tmp32;
colPos = tmp32;
WriteEngine::RIDList ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColValueList colOldValuesList;
CalpontSystemCatalog::TableName tableName;
tableName.table = atableName;
tableName.schema = schema;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::RIDList rids;
try
{
rids = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
CalpontSystemCatalog::RIDList::const_iterator rid_iter = rids.begin();
boost::any value;
WriteEngine::ColTupleList colTuples;
CalpontSystemCatalog::ColType columnType;
CalpontSystemCatalog::ROPair colRO;
// cout << "colpos is " << colPos << endl;
try
{
while (rid_iter != rids.end())
{
// look up colType
colRO = *rid_iter;
columnType = systemCatalogPtr->colType(colRO.objnum);
if (columnType.colPosition < colPos)
{
++rid_iter;
continue;
}
ridList.push_back(colRO.rid);
value = columnType.colPosition - 1;
WriteEngine::ColTuple colTuple;
colTuple.data = value;
colTuples.push_back(colTuple);
++rid_iter;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
colValuesList.push_back(colTuples);
uint16_t dbRoot;
BRM::OID_t sysOid = 1021;
// Find out where systable is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
if (colTuples.size() > 0)
{
WriteEngine::ColStructList colStructs;
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::CSCTypesList cscColTypeList;
CalpontSystemCatalog::ColType colType;
// Build column structure for COLUMNPOS_COL
colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS;
colType.colWidth = colStruct.colWidth = 4;
colStruct.tokenFlag = false;
colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
colStructs.push_back(colStruct);
cscColTypeList.push_back(colType);
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
rc = fWEWrapper.updateColumnRecs(txnID, cscColTypeList, colStructs, colValuesList, ridList,
SYSCOLUMN_BASE);
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::fillNewColumn(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tmp32;
uint8_t tmp8;
int txnID;
OID dataOid, dictOid, refColOID;
CalpontSystemCatalog::ColDataType dataType, refColDataType;
bool autoincrement;
int dataWidth, scale, precision, compressionType, refColWidth, refCompressionType;
string defaultValStr;
ColTuple defaultVal;
long timeZone;
bs >> tmp32;
txnID = tmp32;
bs >> tmp32;
dataOid = tmp32;
bs >> tmp32;
dictOid = tmp32;
bs >> tmp8;
dataType = (CalpontSystemCatalog::ColDataType)tmp8;
bs >> tmp8;
autoincrement = (tmp8 != 0);
bs >> tmp32;
dataWidth = tmp32;
bs >> tmp32;
scale = tmp32;
bs >> tmp32;
precision = tmp32;
bs >> defaultValStr;
bs >> tmp8;
compressionType = tmp8;
bs >> tmp32;
refColOID = tmp32;
bs >> tmp8;
refColDataType = (CalpontSystemCatalog::ColDataType)tmp8;
bs >> tmp32;
refColWidth = tmp32;
bs >> tmp8;
refCompressionType = tmp8;
messageqcpp::ByteStream::octbyte timeZoneTemp;
bs >> timeZoneTemp;
timeZone = timeZoneTemp;
// Find the fill in value
bool isNULL = false;
if (defaultValStr == "")
isNULL = true;
CalpontSystemCatalog::ColType colType;
colType.colDataType = static_cast<CalpontSystemCatalog::ColDataType>(dataType);
colType.colWidth = dataWidth;
colType.scale = scale;
colType.precision = precision;
bool pushWarning = false;
defaultVal.data = colType.convertColumnData(defaultValStr, pushWarning, timeZone, isNULL, false, false);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
std::map<uint32_t, uint32_t> oids;
oids[dataOid] = dataOid;
oids[refColOID] = refColOID;
rc = fWEWrapper.fillColumn(txnID, dataOid, colType, defaultVal, refColOID, refColDataType, refColWidth,
refCompressionType, isNULL, compressionType, defaultValStr, dictOid,
autoincrement);
if (rc != 0)
{
WErrorCodes ec;
err = ec.errorString(rc);
}
purgeFDCache();
return rc;
}
uint8_t WE_DDLCommandProc::writeTruncateLog(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tableOid, numOid, tmp32;
bs >> tableOid;
bs >> numOid;
std::vector<uint32_t> oids;
for (uint32_t i = 0; i < numOid; i++)
{
bs >> tmp32;
oids.push_back(tmp32);
}
string prefix;
config::Config* config = config::Config::makeConfig();
prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
err = "Need a valid DBRMRoot entry in Calpont configuation file";
rc = 1;
return rc;
}
uint64_t pos = prefix.find_last_of("/");
std::string DDLLogFileName;
if (pos != string::npos)
{
DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path
}
else
{
err = "Cannot find the dbrm directory for the DDL log file";
rc = 1;
return rc;
}
std::ostringstream oss;
oss << tableOid;
DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));
if (!DDLLogFile)
{
err = "DDL truncate table log file cannot be created";
rc = 1;
return rc;
}
std::ostringstream buf;
for (unsigned i = 0; i < oids.size(); i++)
buf << oids[i] << std::endl;
std::string tmp(buf.str());
DDLLogFile->write(tmp.c_str(), tmp.size());
// DDLLogFile is a scoped_ptr, will be closed after return.
return rc;
}
uint8_t WE_DDLCommandProc::writeDropPartitionLog(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tableOid, numParts, numOid, tmp32;
bs >> tableOid;
std::set<BRM::LogicalPartition> partitionNums;
bs >> numParts;
BRM::LogicalPartition lp;
for (uint32_t i = 0; i < numParts; i++)
{
lp.unserialize(bs);
partitionNums.insert(lp);
}
bs >> numOid;
std::vector<uint32_t> oids;
for (uint32_t i = 0; i < numOid; i++)
{
bs >> tmp32;
oids.push_back(tmp32);
}
string prefix;
config::Config* config = config::Config::makeConfig();
prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
err = "Need a valid DBRMRoot entry in Calpont configuation file";
rc = 1;
return rc;
}
uint64_t pos = prefix.find_last_of("/");
std::string DDLLogFileName;
if (pos != string::npos)
{
DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path
}
else
{
err = "Cannot find the dbrm directory for the DDL drop partitions log file";
rc = 1;
return rc;
}
std::ostringstream oss;
oss << tableOid;
DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str();
boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));
if (!DDLLogFile)
{
err = "DDL drop partitions log file cannot be created";
rc = 1;
return rc;
}
std::ostringstream buf;
// @SN write partition numbers to the log file, separated by space
set<BRM::LogicalPartition>::const_iterator it;
for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
buf << (*it) << endl;
// -1 indicates the end of partition list
BRM::LogicalPartition end(-1, -1, -1);
buf << end << endl;
for (unsigned i = 0; i < oids.size(); i++)
buf << oids[i] << std::endl;
std::string tmp(buf.str());
DDLLogFile->write(tmp.c_str(), tmp.size());
return rc;
}
uint8_t WE_DDLCommandProc::writeDropTableLog(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tableOid, numOid, tmp32;
bs >> tableOid;
bs >> numOid;
std::vector<uint32_t> oids;
for (uint32_t i = 0; i < numOid; i++)
{
bs >> tmp32;
oids.push_back(tmp32);
}
string prefix;
config::Config* config = config::Config::makeConfig();
prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
err = "Need a valid DBRMRoot entry in Calpont configuation file";
rc = 1;
return rc;
}
uint64_t pos = prefix.find_last_of("/");
std::string DDLLogFileName;
if (pos != string::npos)
{
DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path
}
else
{
err = "Cannot find the dbrm directory for the DDL drop partitions log file";
rc = 1;
return rc;
}
std::ostringstream oss;
oss << tableOid;
DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str();
boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));
if (!DDLLogFile)
{
err = "DDL drop table log file cannot be created";
rc = 1;
return rc;
}
std::ostringstream buf;
for (unsigned i = 0; i < oids.size(); i++)
buf << oids[i] << std::endl;
std::string tmp(buf.str());
DDLLogFile->write(tmp.c_str(), tmp.size());
return rc;
}
uint8_t WE_DDLCommandProc::deleteDDLLog(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tableOid, fileType;
bs >> fileType;
bs >> tableOid;
string prefix;
config::Config* config = config::Config::makeConfig();
prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
err = "Need a valid DBRMRoot entry in Calpont configuation file";
rc = 1;
return rc;
}
uint64_t pos = prefix.find_last_of("/");
std::string DDLLogFileName;
if (pos != string::npos)
{
DDLLogFileName = prefix.substr(0, pos + 1); // Get the file path
}
else
{
err = "Cannot find the dbrm directory for the DDL drop partitions log file";
rc = 1;
return rc;
}
std::ostringstream oss;
oss << tableOid;
switch (fileType)
{
case DROPTABLE_LOG:
{
DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str();
break;
}
case DROPPART_LOG:
{
DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str();
break;
}
case TRUNCATE_LOG:
{
DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
break;
}
default: break;
}
IDBPolicy::remove(DDLLogFileName.c_str());
return rc;
}
uint8_t WE_DDLCommandProc::fetchDDLLog(ByteStream& bs, std::string& err)
{
int rc = 0;
// Find the ddl log files under DBRMRoot directory
string prefix, ddlLogDir;
config::Config* config = config::Config::makeConfig();
prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
rc = 1;
err = "Need a valid DBRMRoot entry in Calpont configuation file";
return rc;
}
uint64_t pos = prefix.find_last_of("/");
if (pos != string::npos)
{
ddlLogDir = prefix.substr(0, pos + 1); // Get the file path
}
else
{
rc = 1;
err = "Cannot find the dbrm directory for the DDL log file";
return rc;
}
boost::filesystem::path filePath;
filePath = fs::system_complete(fs::path(ddlLogDir));
if (!fs::exists(filePath))
{
rc = 1;
err = "\nDDL log file path is Not found: ";
return rc;
}
std::vector<string> fileNames;
if (fs::is_directory(filePath))
{
fs::directory_iterator end_iter;
for (fs::directory_iterator dir_itr(filePath); dir_itr != end_iter; ++dir_itr)
{
try
{
if (!fs::is_directory(*dir_itr))
{
#if BOOST_VERSION >= 105200
fileNames.push_back(dir_itr->path().generic_string());
#else
fileNames.push_back(dir_itr->string());
#endif
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
}
CalpontSystemCatalog::OID fileoid;
string tableName;
bs.restart();
for (unsigned i = 0; i < fileNames.size(); i++)
{
pos = fileNames[i].find("DDL_DROPTABLE_Log_");
if (pos != string::npos)
{
// Read the file to get oids
// cout << "Found file " << fileNames[i] << endl;
boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));
if (!ddlLogFile)
continue;
// find the table oid
pos = fileNames[i].find_last_of("_");
string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
char* ep = NULL;
uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
bs << tableOid;
bs << (uint32_t)DROPTABLE_LOG;
std::vector<CalpontSystemCatalog::OID> oidList;
ssize_t fileSize = ddlLogFile->size();
boost::scoped_array<char> buf(new char[fileSize]);
if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
return (uint8_t)ERR_FILE_READ;
std::istringstream strbuf(string(buf.get(), fileSize));
while (strbuf >> fileoid)
oidList.push_back(fileoid);
bs << (uint32_t)oidList.size();
for (unsigned j = 0; j < oidList.size(); j++)
{
bs << (uint32_t)oidList[j];
}
bs << (uint32_t)0;
}
else // Find drop partition log file
{
pos = fileNames[i].find("DDL_DROPPARTITION_Log_");
if (pos != string::npos)
{
boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));
BRM::LogicalPartition partition;
vector<BRM::LogicalPartition> partitionNums;
// find the table oid
pos = fileNames[i].find_last_of("_");
string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
char* ep = NULL;
uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
bs << tableOid;
bs << (uint32_t)DROPPART_LOG;
ssize_t fileSize = ddlLogFile->size();
boost::scoped_array<char> buf(new char[fileSize]);
if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
return (uint8_t)ERR_FILE_READ;
std::istringstream strbuf(string(buf.get(), fileSize));
while (strbuf >> partition)
{
if (partition.dbroot == (uint16_t)-1)
break;
partitionNums.push_back(partition);
}
std::vector<CalpontSystemCatalog::OID> oidPartList;
while (strbuf >> fileoid)
oidPartList.push_back(fileoid);
bs << (uint32_t)oidPartList.size();
for (unsigned j = 0; j < oidPartList.size(); j++)
{
bs << (uint32_t)oidPartList[j];
}
bs << (uint32_t)partitionNums.size();
for (unsigned j = 0; j < partitionNums.size(); j++)
{
partitionNums[j].serialize(bs);
}
}
else // find truncate table log file
{
pos = fileNames[i].find("DDL_TRUNCATETABLE_Log_");
if (pos != string::npos)
{
boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
IDBPolicy::getType(fileNames[i].c_str(), IDBPolicy::WRITEENG), fileNames[i].c_str(), "r", 0));
if (!ddlLogFile)
{
continue;
}
// find the table oid
pos = fileNames[i].find_last_of("_");
string tableOidStr = fileNames[i].substr(pos + 1, fileNames[i].length() - pos - 1);
char* ep = NULL;
uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
bs << tableOid;
bs << (uint32_t)TRUNCATE_LOG;
std::vector<CalpontSystemCatalog::OID> oidList;
ssize_t fileSize = ddlLogFile->size();
boost::scoped_array<char> buf(new char[fileSize]);
if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
return (uint8_t)ERR_FILE_READ;
std::istringstream strbuf(string(buf.get(), fileSize));
while (strbuf >> fileoid)
oidList.push_back(fileoid);
bs << (uint32_t)oidList.size();
for (unsigned j = 0; j < oidList.size(); j++)
{
bs << (uint32_t)oidList[j];
}
bs << (uint32_t)0;
}
}
}
}
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err)
{
// Will update five columns: columnname, defaultvalue, nullable, autoincrement, nextvalue.
int rc = 0;
uint32_t tmp32;
string schema, tableName, colName, defaultvalue;
int txnID;
uint32_t sessionID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tableName;
bs >> colName;
bs >> defaultvalue;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::TableName atableName;
CalpontSystemCatalog::TableColName tableColName;
tableColName.schema = schema;
tableColName.table = tableName;
tableColName.column = colName;
CalpontSystemCatalog::ROPair ropair;
try
{
ropair = systemCatalogPtr->columnRID(tableColName);
if (ropair.objnum < 0)
{
ostringstream oss;
oss << "No such column: " << tableColName;
throw std::runtime_error(oss.str().c_str());
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
catch (...)
{
err = "renameColumn:Unknown exception caught";
rc = 1;
return rc;
}
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
std::vector<WriteEngine::RID> ridList;
ridList.push_back(ropair.rid);
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList1;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSCOLUMN
atableName.schema = CALPONT_SCHEMA;
atableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
findColumnData(sessionID, atableName, DEFAULTVAL_COL, column); // DEFAULTVAL_COL column
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnID);
fWEWrapper.startTransaction(txnID);
// Build DEFAULTVAL_COL structure
WriteEngine::ColTupleList aColList;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column.colType.colWidth;
}
colStruct.colDataType = column.colType.colDataType;
if (colStruct.tokenFlag)
{
WriteEngine::DctnryStruct dictStruct;
dictStruct.dctnryOid = column.colType.ddn.dictOID;
dictStruct.columnOid = column.colType.columnOID;
if (defaultvalue.length() <= 0) // null token
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
}
else
{
WriteEngine::DctnryTuple dictTuple;
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
if (NO_ERROR !=
(error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file
{
WErrorCodes ec;
throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
}
WriteEngine::Token aToken = dictTuple.token;
colTuple.data = aToken;
}
}
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
dctnryStructList.push_back(dctnryStruct);
aColList.push_back(colTuple);
colValuesList.push_back(aColList);
WriteEngine::DctnryTuple dctnryTuple;
if (defaultvalue.length() > 0)
{
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}
else
{
dctnryTuple.isNull = true;
}
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
SYSCOLUMN_BASE))
{
err = "WE: Update failed on: " + atableName.table;
rc = 1;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
// flush syscat cahche
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err)
{
// Will update five columns: columnname, defaultvalue, nullable, autoincrement, nextvalue.
int rc = 0;
uint64_t nextVal;
uint32_t tmp32, nullable;
string schema, tableName, colOldname, autoinc, colNewName, defaultvalue;
int txnID;
uint32_t sessionID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tableName;
bs >> colOldname;
bs >> colNewName;
bs >> autoinc;
bs >> nextVal;
bs >> nullable;
bs >> defaultvalue;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::TableName atableName;
CalpontSystemCatalog::TableColName tableColName;
tableColName.schema = schema;
tableColName.table = tableName;
tableColName.column = colOldname;
CalpontSystemCatalog::ROPair ropair;
try
{
ropair = systemCatalogPtr->columnRID(tableColName);
if (ropair.objnum < 0)
{
ostringstream oss;
oss << "No such column: " << tableColName;
throw std::runtime_error(oss.str().c_str());
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
catch (...)
{
err = "renameColumn:Unknown exception caught";
rc = 1;
return rc;
}
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
std::vector<WriteEngine::RID> ridList;
ridList.push_back(ropair.rid);
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList1;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
boost::any datavalue;
datavalue = colNewName;
WriteEngine::ColTuple colTuple;
// Build colStructs for SYSCOLUMN
atableName.schema = CALPONT_SCHEMA;
atableName.table = SYSCOLUMN_TABLE;
DDLColumn column1, column2, column3, column4, column5;
findColumnData(sessionID, atableName, COLNAME_COL, column1); // COLNAME_COL column
findColumnData(sessionID, atableName, AUTOINC_COL, column2); // AUTOINC_COL column
findColumnData(sessionID, atableName, NEXTVALUE_COL, column3); // NEXTVALUE_COL column
findColumnData(sessionID, atableName, NULLABLE_COL, column4); // NULLABLE_COL column
findColumnData(sessionID, atableName, DEFAULTVAL_COL, column5); // DEFAULTVAL_COL column
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
// Build COLNAME_COL structure
colStruct.dataOid = column1.oid;
colStruct.colWidth = column1.colType.colWidth > 8 ? 8 : column1.colType.colWidth;
colStruct.tokenFlag = false;
if ((column1.colType.colDataType == CalpontSystemCatalog::CHAR && column1.colType.colWidth > 8) ||
(column1.colType.colDataType == CalpontSystemCatalog::VARCHAR && column1.colType.colWidth > 7) ||
(column1.colType.colDataType == CalpontSystemCatalog::VARBINARY && column1.colType.colWidth > 7) ||
(column1.colType.colDataType == CalpontSystemCatalog::BLOB && column1.colType.colWidth > 7) ||
(column1.colType.colDataType == CalpontSystemCatalog::TEXT && column1.colType.colWidth > 7) ||
(column1.colType.colDataType == CalpontSystemCatalog::DECIMAL && column1.colType.precision > 18) ||
(column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column1.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column1.colType.colWidth;
}
colStruct.colDataType = column1.colType.colDataType;
if (colStruct.tokenFlag)
{
WriteEngine::DctnryStruct dictStruct;
if (idbdatafile::IDBPolicy::useHdfs())
{
dictStruct.fCompressionType = 2;
}
dictStruct.dctnryOid = column1.colType.ddn.dictOID;
dictStruct.columnOid = column1.colType.columnOID;
WriteEngine::DctnryTuple dictTuple;
dictTuple.sigValue = (unsigned char*)colNewName.c_str();
dictTuple.sigSize = colNewName.length();
dictTuple.isNull = false;
int error = NO_ERROR;
if (NO_ERROR !=
(error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file
{
WErrorCodes ec;
err = ec.errorString(error);
rc = error;
return rc;
}
WriteEngine::Token aToken = dictTuple.token;
colTuple.data = aToken;
}
else
colTuple.data = datavalue;
colStruct.colDataType = column1.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column1.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column1.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column1.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
dctnryStructList.push_back(dctnryStruct);
aColList1.push_back(colTuple);
colValuesList.push_back(aColList1);
WriteEngine::DctnryTuple dctnryTuple;
boost::to_lower(colNewName);
dctnryTuple.sigValue = (unsigned char*)colNewName.c_str();
dctnryTuple.sigSize = colNewName.length();
dctnryTuple.isNull = false;
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// Build AUTOINC_COL structure
WriteEngine::ColTupleList aColList2;
colStruct.dataOid = column2.oid;
colStruct.colWidth = column2.colType.colWidth > 8 ? 8 : column2.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column2.colType.colDataType;
colTuple.data = autoinc;
colStruct.colDataType = column2.colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column2.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
dctnryStructList.push_back(dctnryStruct);
aColList2.push_back(colTuple);
colValuesList.push_back(aColList2);
dctnryTuple.isNull = true;
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// Build NEXTVALUE_COL structure
WriteEngine::ColTupleList aColList3;
colStruct.dataOid = column3.oid;
colStruct.colWidth = column3.colType.colWidth > 8 ? 8 : column3.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column3.colType.colDataType;
colTuple.data = nextVal;
colStruct.colDataType = column3.colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column3.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
dctnryStructList.push_back(dctnryStruct);
aColList3.push_back(colTuple);
colValuesList.push_back(aColList3);
dctnryTuple.isNull = true;
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// Build NULLABLE_COL structure
WriteEngine::ColTupleList aColList4;
colStruct.dataOid = column4.oid;
colStruct.colWidth = column4.colType.colWidth > 8 ? 8 : column4.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.colDataType = column4.colType.colDataType;
colTuple.data = nullable;
colStruct.colDataType = column4.colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column4.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
dctnryStructList.push_back(dctnryStruct);
aColList4.push_back(colTuple);
colValuesList.push_back(aColList4);
dctnryTuple.isNull = true;
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
// Build DEFAULTVAL_COL structure
WriteEngine::ColTupleList aColList5;
colStruct.dataOid = column5.oid;
colStruct.colWidth = column5.colType.colWidth > 8 ? 8 : column5.colType.colWidth;
colStruct.tokenFlag = false;
if ((column5.colType.colDataType == CalpontSystemCatalog::CHAR && column5.colType.colWidth > 8) ||
(column5.colType.colDataType == CalpontSystemCatalog::VARCHAR && column5.colType.colWidth > 7) ||
(column5.colType.colDataType == CalpontSystemCatalog::VARBINARY && column5.colType.colWidth > 7) ||
(column5.colType.colDataType == CalpontSystemCatalog::BLOB && column5.colType.colWidth > 7) ||
(column5.colType.colDataType == CalpontSystemCatalog::TEXT && column5.colType.colWidth > 7) ||
(column5.colType.colDataType == CalpontSystemCatalog::DECIMAL && column5.colType.precision > 18) ||
(column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column5.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column5.colType.colWidth;
}
colStruct.colDataType = column5.colType.colDataType;
if (colStruct.tokenFlag)
{
WriteEngine::DctnryStruct dictStruct;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dictStruct.fCompressionType = 2;
}
dictStruct.dctnryOid = column5.colType.ddn.dictOID;
dictStruct.columnOid = column5.colType.columnOID;
if (defaultvalue.length() <= 0) // null token
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
}
else
{
WriteEngine::DctnryTuple dictTuple;
dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dictTuple.sigSize = defaultvalue.length();
dictTuple.isNull = false;
int error = NO_ERROR;
if (NO_ERROR !=
(error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false))) // @bug 5572 HDFS tmp file
{
WErrorCodes ec;
throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
}
WriteEngine::Token aToken = dictTuple.token;
colTuple.data = aToken;
}
}
fWEWrapper.flushDataFiles(rc, txnID, oids);
colStruct.colDataType = column5.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column5.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column5.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column5.colType);
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
aColList5.push_back(colTuple);
colValuesList.push_back(aColList5);
if (defaultvalue.length() > 0)
{
dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
dctnryTuple.sigSize = defaultvalue.length();
dctnryTuple.isNull = false;
}
else
{
dctnryTuple.isNull = true;
}
dctColList = dctnryTuple;
dctRowList.clear();
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
// In this case, there's only 1 row, so only one one extent, but keep it generic...
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
extentsinfo.push_back(aExtentinfo);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
SYSCOLUMN_BASE))
{
err = "WE: Update failed on: " + atableName.table;
rc = 1;
}
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (rc == 0)
rc = rc1;
// flush syscat cahche
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DDLCommandProc::dropPartitions(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t size, i;
uint32_t tmp32;
std::vector<OID> dataOids;
std::vector<BRM::PartitionInfo> partitions;
bs >> size;
for (i = 0; i < size; ++i)
{
bs >> tmp32;
dataOids.push_back(tmp32);
}
bs >> size;
BRM::PartitionInfo pi;
for (i = 0; i < size; ++i)
{
pi.unserialize(bs);
partitions.push_back(pi);
}
try
{
rc = fWEWrapper.deletePartitions(dataOids, partitions);
}
catch (...)
{
err = "WE: Error removing files ";
rc = 1;
}
return rc;
}
void WE_DDLCommandProc::purgeFDCache()
{
if (idbdatafile::IDBPolicy::useHdfs())
{
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(SYSCOLUMN_BASE);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
std::vector<BRM::FileInfo> files;
BRM::FileInfo aFile;
vector<BRM::LBID_t> lbidList;
BRM::LBID_t startLbid;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
fDbrm.lookupLocalStartLbid(aFile.oid, aFile.partitionNum, aFile.segmentNum, aIt->hwm, startLbid);
// cout <<"Added to files oid:dbroot:part:seg:compType = " <<
// aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
//<<":"<<aFile.compType <<endl;
aIt++;
}
it++;
}
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
}
TableMetaData::removeTableMetaData(SYSCOLUMN_BASE);
}
} // namespace WriteEngine