1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-20 09:07:44 +03:00
Gagan Goel d50a0fa2e6 MCOL-5005 Add charset number to system catalog - Part 2.
1. Extend the calpontsys.syscolumn system catalog table
  with a new column, 'charsetnum'.

  'charsetnum' field is set to the 'number' member of the
  'charset_info_st' struct defined in the server in m_ctype.h.

  For CHAR/VARCHAR/TEXT column types, 'charset_info_st' is
  initialized to the charset/collation of the column, which
  is set at the column-level or at the table-level in the DDL.

  For BLOB/VARBINARY binary column types, 'charset_info_st' is
  initialized to my_charset_bin (charsetnum=63).

  For all other column types, charsetnum is set to 0.

  2. Add support for the newly added 'charsetnum' column in the
  automatic system catalog upgrade logic in dbbuilder.

  For existing table definitions, charsetnum for the column is
  defaulted to 0.

  3. Add MTR test case that creates a few table definitions with
  a range of charset/collation combinations and queries the
  calpontsys.syscolumn system catalog table with the charsetnum
  field for the columns in the table DDLs.
2023-08-15 17:21:47 +00:00

4979 lines
138 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: we_ddlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
#include <unistd.h>
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "boost/scoped_ptr.hpp"
using namespace std;
#include "bytestream.h"
using namespace messageqcpp;
#include "we_messages.h"
#include "we_message_handlers.h"
#include "we_ddlcommon.h"
#include "we_ddlcommandproc.h"
#include "ddlpkg.h"
using namespace ddlpackage;
#include <ctime>
#include "dataconvert.h"
using namespace dataconvert;
//#include "we_brm.h"
namespace fs = boost::filesystem;
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
using namespace execplan;
namespace WriteEngine
{
/// Default constructor: seeds the extent/partition settings with built-in
/// defaults and then overrides them from the ColumnStore configuration where
/// a non-empty value is present.
WE_DDLCommandProc::WE_DDLCommandProc()
{
  // Built-in fallbacks used when the configuration has no value.
  extentRows = 0x800000;
  dbrootCnt = 1;
  filesPerColumnPartition = 8;

  config::Config* sysConfig = config::Config::makeConfig();

  string filesPerPartitionText = sysConfig->getConfig("ExtentMap", "FilesPerColumnPartition");
  if (!filesPerPartitionText.empty())
  {
    filesPerColumnPartition = sysConfig->uFromText(filesPerPartitionText);
  }

  // MCOL-4685: the number of extents per segment file is no longer
  // configurable (ExtentsPerSegmentFile); the compiled-in default is always used.
  extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;

  string dbRootCountText = sysConfig->getConfig("SystemConfig", "DBRootCount");
  if (!dbRootCountText.empty())
  {
    dbrootCnt = sysConfig->uFromText(dbRootCountText);
  }
}
/// Copy constructor.
///
/// Copies the scalar extent/partition settings from @p rhs so the new object
/// does not start with indeterminate values. All other members are left
/// default-constructed, exactly as before.
WE_DDLCommandProc::WE_DDLCommandProc(const WE_DDLCommandProc& rhs)
{
  // Assigned in the body (not a mem-initializer list) so the result does not
  // depend on the member declaration order in the header.
  filesPerColumnPartition = rhs.filesPerColumnPartition;
  extentsPerSegmentFile = rhs.extentsPerSegmentFile;
  dbrootCnt = rhs.dbrootCnt;
  extentRows = rhs.extentRows;
}
/// Destructor: nothing to release explicitly; members clean up after themselves.
WE_DDLCommandProc::~WE_DDLCommandProc() = default;
/// Updates the stored "next value" for a column.
///
/// Reads from @p bs, in order: the column OID (32 bit), the new next value
/// (64 bit) and the session id (32 bit).
///
/// @param bs   request payload
/// @param err  set to a fixed message when the write engine reports a failure
/// @return 0 on success, 1 when updateNextValue() failed
uint8_t WE_DDLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
  uint32_t columnOid;
  uint64_t nextVal;
  uint32_t sessionID;
  bs >> columnOid;
  bs >> nextVal;
  bs >> sessionID;

  // Ask the BRM which DBRoot holds system catalog OID 1021.
  // NOTE(review): the lookup result code is not checked.
  BRM::OID_t sysCatOid = 1021;
  uint16_t sysCatDbRoot;
  fDbrm.getSysCatDBRoot(sysCatOid, sysCatDbRoot);

  // OIDs handed to flushDataFiles() on the HDFS path.
  std::map<uint32_t, uint32_t> touchedOids;
  touchedOids[columnOid] = columnOid;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    fWEWrapper.startTransaction(sessionID);
  }

  // The session id is passed both as the session and as the transaction id.
  int status = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, sysCatDbRoot);

  if (status != 0)
  {
    err = "Error in WE::updateNextValue";
    status = 1;
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    fWEWrapper.flushDataFiles(status, sessionID, touchedOids);
    fWEWrapper.confirmTransaction(sessionID);
    // Second argument is true only when the update succeeded.
    fWEWrapper.endTransaction(sessionID, status == 0);
  }

  purgeFDCache();
  return status;
}
// Inserts one row describing a table into calpontsys.systable.
//
// Values read from bs, in order: sessionID, txnID, tableOID, tableAUXColumnOID,
// tableWithAutoi and dbroot (each read as a 32-bit value), followed by a
// serialized ddlpackage::TableDef.
//
// bs  - request payload
// err - exception text is appended here when an exception is caught
// Returns 0 on success, 1 when an exception was caught.
uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID, tableOID, tableAUXColumnOID;
uint32_t tableWithAutoi;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> tmp32;
tableOID = tmp32;
bs >> tmp32;
tableAUXColumnOID = tmp32;
bs >> tmp32;
tableWithAutoi = tmp32;
bs >> tmp32;
// The dbroot arrives as 32 bits and is narrowed to 16 bits here.
uint16_t dbroot = tmp32;
ddlpackage::TableDef tableDef;
tableDef.unserialize(bs);
WriteEngine::ColTuple colTuple;
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColTupleList colTuples;
WriteEngine::dictStr dctColTuples;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::ColValueList colValuesList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DictStrList dctnryValueList;
WriteEngine::RIDList ridList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::ROPair sysTableROPair;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
ColumnList columns;
ColumnList::const_iterator column_iterator;
DDLColumn column;
int error = 0;
// The catalog table being written to (not the user table being created).
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSTABLE_TABLE;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
// OIDs of every column/dictionary file touched; passed to flushDataFiles() below.
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
try
{
// The returned pair is not used afterwards; any exception from the lookup is
// handled by the catch blocks below.
sysTableROPair = systemCatalogPtr->tableRID(tableName);
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
column_iterator = columns.begin();
NullString tmpStr;
// Build one value (and one dictionary string) per systable column.
while (column_iterator != columns.end())
{
column = *column_iterator;
boost::to_lower(column.tableColName.column);
// Start every column with a dropped (null) dictionary string; only the
// name columns below assign one.
tmpStr.dropString();
if (TABLENAME_COL == column.tableColName.column)
{
std::string tablename = tableDef.fQualifiedName->fName;
colTuple.data = tablename;
tmpStr.assign(tablename);
}
else if (SCHEMA_COL == column.tableColName.column)
{
std::string schema = tableDef.fQualifiedName->fSchema;
colTuple.data = schema;
tmpStr.assign(schema);
}
else if (OBJECTID_COL == column.tableColName.column)
{
colTuple.data = tableOID;
}
else if (AUXCOLUMNOID_COL == column.tableColName.column)
{
colTuple.data = tableAUXColumnOID;
}
else if (CREATEDATE_COL == column.tableColName.column)
{
// Creation date is today's date in UTC (gmtime_r).
time_t t;
struct tm tmp;
Date aDay;
t = time(NULL);
gmtime_r(&t, &tmp);
aDay.year = tmp.tm_year + 1900;
aDay.month = tmp.tm_mon + 1;
aDay.day = tmp.tm_mday;
// The Date object's storage is reinterpreted as an int and stored as such.
colTuple.data = *(reinterpret_cast<int*>(&aDay));
}
else if (INIT_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (NEXT_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (AUTOINC_COL == column.tableColName.column)
{
colTuple.data = tableWithAutoi;
}
else
{
// Any other catalog column gets the null value for its type.
colTuple.data = column.colType.getNullValueForType();
}
colStruct.dataOid = column.oid;
// Widths above 8 are stored as 8 and the column is flagged as a token
// (dictionary-backed) column.
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbroot;
if (idbdatafile::IDBPolicy::useHdfs())
{
// NOTE(review): compression type 2 is forced on the HDFS path; the meaning
// of the constant is not visible here - confirm.
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
dctnryStruct.fColDbRoot = dbroot;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = column.oid;
}
else
{
// No dictionary for this column; dctnryOid 0 marks that.
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = column.oid;
}
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
// colTuples/dctColTuples are used as one-element scratch lists: push the
// value, copy the list into the per-column result, then pop it again.
colTuples.push_back(colTuple);
dctColTuples.push_back(tmpStr);
colValuesList.push_back(colTuples);
dctnryStructList.push_back(dctnryStruct);
dctnryValueList.push_back(dctColTuples);
colTuples.pop_back();
dctColTuples.pop_back();
++column_iterator;
}
// fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
if (0 != colStructs.size())
{
// MCOL-66 The DBRM can't handle concurrent transactions to sys tables
// TODO: This may be redundant
// Function-local static mutex: the insert and the HDFS flush below run
// under this lock until the end of the enclosing block.
static boost::mutex dbrmMutex;
boost::mutex::scoped_lock lk(dbrmMutex);
// NOTE(review): SYSCOLUMN_BASE is passed although the row goes to systable -
// confirm this is the intended argument.
error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);
if (error != WriteEngine::NO_ERROR)
{
// A failed insert throws before the HDFS flush/endTransaction code below runs.
if (error == ERR_BRM_WR_VB_ENTRY)
{
throw std::runtime_error("WE: Error writing to BRM.");
}
else
{
WErrorCodes ec;
throw std::runtime_error("WE: Error updating calpontsys.systable:" + ec.errorString(error));
}
}
if (idbdatafile::IDBPolicy::useHdfs())
{
// NOTE(review): rc1 is never propagated to rc, so a flush/confirm/end
// failure here does not change the return value.
int rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
}
}
catch (exception& ex)
{
err += ex.what();
rc = 1;
}
catch (...)
{
err += "Unknown exception caught";
rc = 1;
}
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// Reset the wrapper flags regardless of success or failure.
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
return rc;
}
/// Inserts one calpontsys.syscolumn row per column of a table definition.
///
/// Values read from @p bs, in order: sessionID, txnID, the number of column
/// OIDs followed by that many OIDs, the number of dictionary OIDs followed by
/// that many OIDs, an 8-bit alter flag, the starting column position, the
/// dbroot, and finally a serialized ddlpackage::TableDef.
///
/// Column definitions are validated while the rows are built (decimal
/// precision/scale, string/binary lengths). A validation failure, a short OID
/// list or a write-engine failure is reported through @p err.
///
/// @param bs   request payload
/// @param err  exception text is appended here when an exception is caught
/// @return 0 on success, 1 when an exception was caught
uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32, columnSize, dictSize, i;
  uint8_t tmp8;
  int txnID, colpos;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> columnSize;

  // deserialize column Oid and dictionary oid
  vector<uint32_t> coloids;
  vector<uint32_t> dictoids;

  for (i = 0; i < columnSize; ++i)
  {
    bs >> tmp32;
    coloids.push_back(tmp32);
  }

  bs >> dictSize;

  for (i = 0; i < dictSize; ++i)
  {
    bs >> tmp32;
    dictoids.push_back(tmp32);
  }

  // Alter flag: it must still be consumed to keep the stream aligned, but its
  // value does not influence the rows written (the object id is taken from
  // coloids either way).
  bs >> tmp8;
  bs >> tmp32;
  colpos = tmp32;
  bs >> tmp32;
  // The dbroot arrives as 32 bits and is narrowed to 16 bits here.
  uint16_t dbroot = tmp32;

  ddlpackage::TableDef tableDef;
  tableDef.unserialize(bs);

  WriteEngine::ColTuple colTuple;
  WriteEngine::ColStruct colStruct;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DictStrList dctnryValueList;
  CalpontSystemCatalog::TableName tableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  ColumnList columns;
  ColumnList::const_iterator column_iterator;
  DDLColumn column;
  int error = 0;

  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // OIDs of every column/dictionary file touched; passed to flushDataFiles().
  std::map<uint32_t, uint32_t> oids;
  int rc1 = 0;
  ColumnDef* colDefPtr = 0;
  ColumnDefList::const_iterator iter;

  // Position of the first table column; used to register the syscolumn
  // column structures only once (while handling the first table column).
  int startPos = colpos;

  // The catalog table being written to (not the user table being created).
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;

  getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
  unsigned int numCols = columns.size();

  // One value list and one dictionary-string list per syscolumn column; each
  // list receives one entry per column of the table definition.
  std::vector<WriteEngine::ColTupleList> colList(numCols);
  std::vector<WriteEngine::dictStr> dctColList(numCols);

  ColumnDefList tableDefCols = tableDef.fColumns;
  ddlpackage::QualifiedName qualifiedName = *(tableDef.fQualifiedName);
  iter = tableDefCols.begin();
  NullString tmpStr;

  try
  {
    unsigned int col = 0;      // index into coloids
    unsigned int dictcol = 0;  // index into dictoids (advanced only for dictionary columns)

    while (iter != tableDefCols.end())
    {
      colDefPtr = *iter;

      DictOID dictOID = {0, 0, 0, 0, 0};

      int dataType;
      dataType = convertDataType(colDefPtr->fType->fType);

      if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
      {
        if (colDefPtr->fType->fPrecision > 38)  // precision cannot be over 38.
        {
          ostringstream os;
          os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
          throw std::runtime_error(os.str());
        }
        else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
        {
          ostringstream os;
          os << "Syntax error: scale should be less than precision, precision: "
             << colDefPtr->fType->fPrecision << " scale: " << colDefPtr->fType->fScale;
          throw std::runtime_error(os.str());
        }

        colDefPtr->convertDecimal();
      }

      bool hasDict = false;

      if ((dataType == CalpontSystemCatalog::CHAR && colDefPtr->fType->fLength > 8) ||
          (dataType == CalpontSystemCatalog::VARCHAR && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::VARBINARY && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::BLOB && colDefPtr->fType->fLength > 7) ||
          (dataType == CalpontSystemCatalog::TEXT && colDefPtr->fType->fLength > 7))
      {
        // The dictionary OID list comes from the request; refuse to index past
        // its end instead of reading out of bounds.
        if (dictcol >= dictoids.size())
        {
          throw std::runtime_error("WE: fewer dictionary OIDs were received than dictionary columns defined");
        }

        hasDict = true;
        dictOID.compressionType = colDefPtr->fType->fCompressiontype;
        dictOID.colWidth = colDefPtr->fType->fLength;
        dictOID.dictOID = dictoids[dictcol];
        dictcol++;

        //@Bug 2534. Take away the limit of 255 and set the limit to 8000.
        if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
            (dataType != CalpontSystemCatalog::TEXT))
        {
          ostringstream os;
          os << "char, varchar and varbinary length may not exceed 8000 bytes";
          throw std::runtime_error(os.str());
        }
      }
      else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
                dataType == CalpontSystemCatalog::TEXT) &&
               colDefPtr->fType->fLength <= 7)
      {
        ostringstream os;
        os << "varbinary and blob length may not be less than 8";
        throw std::runtime_error(os.str());
      }

      // Walk the syscolumn columns and produce this table column's value for each.
      unsigned int sysColIdx = 0;
      column_iterator = columns.begin();

      while (column_iterator != columns.end())
      {
        column = *column_iterator;
        boost::to_lower(column.tableColName.column);

        // Start from a dropped (null) dictionary string so a value assigned
        // for an earlier catalog column is never carried over.
        tmpStr.dropString();

        if (SCHEMA_COL == column.tableColName.column)
        {
          colTuple.data = qualifiedName.fSchema;
          tmpStr.assign(qualifiedName.fSchema);
        }
        else if (TABLENAME_COL == column.tableColName.column)
        {
          colTuple.data = qualifiedName.fName;
          tmpStr.assign(qualifiedName.fName);
        }
        else if (COLNAME_COL == column.tableColName.column)
        {
          // Column names are stored lower-cased (the definition is modified in place).
          boost::to_lower(colDefPtr->fName);
          colTuple.data = colDefPtr->fName;
          tmpStr.assign(colDefPtr->fName);
        }
        else if (OBJECTID_COL == column.tableColName.column)
        {
          // The column OID list comes from the request; refuse to index past its end.
          if (col >= coloids.size())
          {
            throw std::runtime_error("WE: fewer column OIDs were received than columns defined for the table");
          }

          colTuple.data = coloids[col];
        }
        else if (DATATYPE_COL == column.tableColName.column)
        {
          colTuple.data = dataType;
        }
        else if (COLUMNLEN_COL == column.tableColName.column)
        {
          //@Bug 2089 Disallow zero length char and varch column to be created
          if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
              dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
              dataType == CalpontSystemCatalog::TEXT)
          {
            if (colDefPtr->fType->fLength <= 0)
            {
              ostringstream os;
              os << "char, varchar and varbinary length must be greater than zero";
              throw std::runtime_error(os.str());
            }
          }

          colTuple.data = colDefPtr->fType->fLength;
        }
        else if (COLUMNPOS_COL == column.tableColName.column)
        {
          colTuple.data = colpos;
        }
        else if (DEFAULTVAL_COL == column.tableColName.column)
        {
          if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull)
          {
            tmpStr.assign(colDefPtr->fDefaultValue->fValue);
          }
          else
          {
            tmpStr.dropString();
          }

          colTuple.data = tmpStr;
        }
        else if (NULLABLE_COL == column.tableColName.column)
        {
          // Nullable unless a NOT NULL constraint is present.
          int nullable = 1;
          ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
          ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();

          while (constraint_iter != colConstraints.end())
          {
            ColumnConstraintDef* consDefPtr = *constraint_iter;

            if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
            {
              nullable = 0;
              break;
            }

            ++constraint_iter;
          }

          colTuple.data = nullable;
        }
        else if (SCALE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fScale;
        }
        else if (PRECISION_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fPrecision;
        }
        else if (DICTOID_COL == column.tableColName.column)
        {
          if (hasDict)
          {
            colTuple.data = dictOID.dictOID;
          }
          else
          {
            colTuple.data = column.colType.getNullValueForType();
          }
        }
        else if (LISTOBJID_COL == column.tableColName.column)
        {
          colTuple.data = column.colType.getNullValueForType();
        }
        else if (TREEOBJID_COL == column.tableColName.column)
        {
          colTuple.data = column.colType.getNullValueForType();
        }
        else if (MINVAL_COL == column.tableColName.column)
        {
          // Only the dictionary string is set (to null); colTuple.data keeps
          // whatever the previous catalog column left in it.
          tmpStr.dropString();
        }
        else if (MAXVAL_COL == column.tableColName.column)
        {
          // Same handling as the min-value column above.
          tmpStr.dropString();
        }
        else if (COMPRESSIONTYPE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fCompressiontype;
        }
        else if (AUTOINC_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fAutoincrement;
        }
        else if (NEXTVALUE_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fNextvalue;
        }
        else if (CHARSETNUM_COL == column.tableColName.column)
        {
          colTuple.data = colDefPtr->fType->fCharsetNum;
        }
        else
        {
          // Any other catalog column gets the null value for its type.
          colTuple.data = column.colType.getNullValueForType();
        }

        colStruct.dataOid = column.oid;
        // Widths above 8 are stored as 8 and the column is flagged as a token
        // (dictionary-backed) column.
        colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
        colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
        colStruct.colDataType = column.colType.colDataType;
        colStruct.fColDbRoot = dbroot;
        dctnryStruct.fColDbRoot = dbroot;

        if (idbdatafile::IDBPolicy::useHdfs())
        {
          // NOTE(review): compression type 2 is forced on the HDFS path; the
          // meaning of the constant is not visible here - confirm.
          colStruct.fCompressionType = 2;
          dctnryStruct.fCompressionType = 2;
        }

        if (colStruct.tokenFlag)  // TODO: XXX: this is copied aplenty. NEED TO REFACTOR.
        {
          dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
          dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
          dctnryStruct.columnOid = column.oid;
        }
        else
        {
          // No dictionary for this column; dctnryOid 0 marks that.
          dctnryStruct.dctnryOid = 0;
          dctnryStruct.columnOid = column.oid;
        }

        oids[colStruct.dataOid] = colStruct.dataOid;

        if (dctnryStruct.dctnryOid > 0)
        {
          oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
        }

        // The syscolumn column structures are identical for every row, so
        // they are registered only during the first table column.
        if (colpos == startPos)
        {
          colStructs.push_back(colStruct);
          dctnryStructList.push_back(dctnryStruct);
          cscColTypeList.push_back(column.colType);
        }

        colList[sysColIdx].push_back(colTuple);
        dctColList[sysColIdx].push_back(tmpStr);
        ++sysColIdx;
        ++column_iterator;
      }

      ++colpos;
      col++;
      ++iter;
    }

    fWEWrapper.setTransId(txnID);
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(false);
    fWEWrapper.startTransaction(txnID);

    if (0 != colStructs.size())
    {
      for (unsigned int n = 0; n < numCols; n++)
      {
        colValuesList.push_back(colList[n]);
        dctnryValueList.push_back(dctColList[n]);
      }

      error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
                                             dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);

      // On the HDFS path the files are flushed and the transaction is ended
      // before a failed insert is reported below.
      if (idbdatafile::IDBPolicy::useHdfs())
      {
        rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);

        if ((error == 0) && (rc1 == 0))
        {
          rc1 = fWEWrapper.confirmTransaction(txnID);

          if (rc1 == NO_ERROR)
            rc1 = fWEWrapper.endTransaction(txnID, true);
          else
            fWEWrapper.endTransaction(txnID, false);
        }
        else
        {
          fWEWrapper.endTransaction(txnID, false);
        }
      }

      if (error != WriteEngine::NO_ERROR)
      {
        if (error == ERR_BRM_WR_VB_ENTRY)
        {
          throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
        }
        else
        {
          WErrorCodes ec;
          throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
        }
      }
      else
      {
        // NOTE(review): rc1 is recorded here but never propagated to rc, so a
        // flush/confirm/end failure does not change the return value.
        error = rc1;
      }
    }
  }
  catch (exception& ex)
  {
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err += "Unknown exception caught";
    rc = 1;
  }

  purgeFDCache();
  // Reset the wrapper flags regardless of success or failure.
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}
// Inserts a single row describing one column into calpontsys.syscolumn.
//
// Values read from bs, in order: sessionID, txnID, schema name, table name,
// column OID, dictionary OID, an 8-bit alter flag, the column position, and a
// serialized ddlpackage::ColumnDef.
//
// bs  - request payload
// err - exception text is appended here when an exception is caught
// Returns 1 when an exception was caught; otherwise the value returned by
// fDbrm.getSysCatDBRoot() (see the note at that call).
uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32, coloid, dictoid;
int txnID, startPos;
string schema, tablename;
uint8_t tmp8;
bool isAlter = false;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
bs >> coloid;
bs >> dictoid;
bs >> tmp8; // alterFlag
bs >> tmp32;
startPos = tmp32;
isAlter = (tmp8 != 0);
// Owned by the scoped_ptr for the duration of this call.
boost::scoped_ptr<ddlpackage::ColumnDef> colDefPtr(new ddlpackage::ColumnDef());
colDefPtr->unserialize(bs);
WriteEngine::ColStruct colStruct;
WriteEngine::ColTuple colTuple;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColTupleList colTuples;
WriteEngine::DctColTupleList dctColTuples;
WriteEngine::ColValueList colValuesList;
WriteEngine::RIDList ridList;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::dictStr dctnryTuple;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DictStrList dctnryValueList;
CalpontSystemCatalog::TableName tableName;
ColumnList columns;
ColumnList::const_iterator column_iterator;
DDLColumn column;
int error = 0;
// The catalog table being written to (not the user table being altered/created).
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
getColumnsForTable(sessionID, tableName.schema, tableName.table, columns);
unsigned int numCols = columns.size();
// WriteEngine::ColTupleList colList[numCols];
// ColTupleList is NOT POD, so let's try this:
std::vector<WriteEngine::ColTupleList> colList;
// WriteEngine::dictStr dctColList[numCols];
std::vector<WriteEngine::dictStr> dctColList;
// oids is passed to flushDataFiles(); oidsToFlush is passed to
// cacheutils::flushOIDsFromCache() at the end for ALTER requests on HDFS.
std::map<uint32_t, uint32_t> oids;
std::vector<BRM::OID_t> oidsToFlush;
// colpos = 0;
// One (initially empty) value list and dictionary-string list per syscolumn column.
for (unsigned int ii = 0; ii < numCols; ii++)
{
colList.push_back(WriteEngine::ColTupleList());
dctColList.push_back(WriteEngine::dictStr());
}
try
{
DictOID dictOID = {0, 0, 0, 0, 0};
int dataType = convertDataType(colDefPtr->fType->fType);
if (dataType == CalpontSystemCatalog::DECIMAL || dataType == CalpontSystemCatalog::UDECIMAL)
{
if (colDefPtr->fType->fPrecision > 38) //@Bug 5717 precision cannot be over 38.
{
ostringstream os;
os << "Syntax error: The maximum precision (total number of digits) that can be specified is 38";
throw std::runtime_error(os.str());
}
else if (colDefPtr->fType->fPrecision < colDefPtr->fType->fScale)
{
ostringstream os;
os << "Syntax error: scale should be less than precision, precision: " << colDefPtr->fType->fPrecision
<< " scale: " << colDefPtr->fType->fScale;
throw std::runtime_error(os.str());
}
colDefPtr->convertDecimal();
}
// A non-zero dictionary OID in the request means the column is dictionary-backed.
if (dictoid > 0)
{
dictOID.compressionType = colDefPtr->fType->fCompressiontype;
dictOID.colWidth = colDefPtr->fType->fLength;
dictOID.dictOID = dictoid;
//@Bug 2534. Take away the limit of 255 and set the limit to 8000.
if ((colDefPtr->fType->fLength > 8000) && (dataType != CalpontSystemCatalog::BLOB) &&
(dataType != CalpontSystemCatalog::TEXT))
{
ostringstream os;
os << "char, varchar and varbinary length may not exceed 8000 bytes";
throw std::runtime_error(os.str());
}
}
else if ((dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT) &&
colDefPtr->fType->fLength <= 7)
{
ostringstream os;
os << "varbinary and blob length may not be less than 8";
throw std::runtime_error(os.str());
}
unsigned int i = 0;
uint16_t dbRoot;
BRM::OID_t sysOid = 1021;
// Find out where syscolumn is
// NOTE(review): the result is stored in rc but not checked; unless an
// exception is caught later, this value is what the function returns.
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
column_iterator = columns.begin();
// Produce the new column's value for every syscolumn column.
while (column_iterator != columns.end())
{
// Fresh dictionary string for each catalog column.
NullString tmpStr;
column = *column_iterator;
boost::to_lower(column.tableColName.column);
if (SCHEMA_COL == column.tableColName.column)
{
colTuple.data = schema;
tmpStr.assign(schema);
}
else if (TABLENAME_COL == column.tableColName.column)
{
colTuple.data = tablename;
tmpStr.assign(tablename);
}
else if (COLNAME_COL == column.tableColName.column)
{
// Column names are stored lower-cased (the definition is modified in place).
boost::to_lower(colDefPtr->fName);
colTuple.data = colDefPtr->fName;
tmpStr.assign(colDefPtr->fName);
}
else if (OBJECTID_COL == column.tableColName.column)
{
colTuple.data = coloid;
}
else if (DATATYPE_COL == column.tableColName.column)
{
colTuple.data = dataType;
}
else if (COLUMNLEN_COL == column.tableColName.column)
{
//@Bug 2089 Disallow zero length char and varch column to be created
if (dataType == CalpontSystemCatalog::CHAR || dataType == CalpontSystemCatalog::VARCHAR ||
dataType == CalpontSystemCatalog::VARBINARY || dataType == CalpontSystemCatalog::BLOB ||
dataType == CalpontSystemCatalog::TEXT)
{
if (colDefPtr->fType->fLength <= 0)
{
ostringstream os;
os << "char, varchar and varbinary length must be greater than zero";
throw std::runtime_error(os.str());
}
}
colTuple.data = colDefPtr->fType->fLength;
}
else if (COLUMNPOS_COL == column.tableColName.column)
{
colTuple.data = startPos;
}
else if (DEFAULTVAL_COL == column.tableColName.column)
{
if (colDefPtr->fDefaultValue && !colDefPtr->fDefaultValue->fNull)
{
tmpStr.assign(colDefPtr->fDefaultValue->fValue);
}
else
{
tmpStr.dropString();
// colTuple.data = column.colType.getNullValueForType();
}
colTuple.data = tmpStr;
}
else if (NULLABLE_COL == column.tableColName.column)
{
// Nullable unless a NOT NULL constraint is present.
int nullable = 1;
ColumnConstraintList& colConstraints = colDefPtr->fConstraints;
ColumnConstraintList::const_iterator constraint_iter = colConstraints.begin();
while (constraint_iter != colConstraints.end())
{
ColumnConstraintDef* consDefPtr = *constraint_iter;
if (consDefPtr->fConstraintType == ddlpackage::DDL_NOT_NULL)
{
nullable = 0;
break;
}
++constraint_iter;
}
colTuple.data = nullable;
}
else if (SCALE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fScale;
}
else if (PRECISION_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fPrecision;
}
else if (DICTOID_COL == column.tableColName.column)
{
if (dictoid > 0)
{
colTuple.data = dictOID.dictOID;
}
else
{
colTuple.data = column.colType.getNullValueForType();
}
}
else if (LISTOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (TREEOBJID_COL == column.tableColName.column)
{
colTuple.data = column.colType.getNullValueForType();
}
else if (MINVAL_COL == column.tableColName.column)
{
// Only the dictionary string is set (to null); colTuple.data keeps
// whatever the previous catalog column left in it.
tmpStr.dropString();
}
else if (MAXVAL_COL == column.tableColName.column)
{
// Same handling as the min-value column above.
tmpStr.dropString();
}
else if (COMPRESSIONTYPE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fCompressiontype;
}
else if (AUTOINC_COL == column.tableColName.column)
{
// cout << "autoincrement= " << colDefPtr->fType->fAutoincrement << endl;
colTuple.data = colDefPtr->fType->fAutoincrement;
}
else if (NEXTVALUE_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fNextvalue;
}
else if (CHARSETNUM_COL == column.tableColName.column)
{
colTuple.data = colDefPtr->fType->fCharsetNum;
}
else
{
// Any other catalog column gets the null value for its type.
colTuple.data = column.colType.getNullValueForType();
}
colStruct.dataOid = column.oid;
oids[column.oid] = column.oid;
oidsToFlush.push_back(column.oid);
// Widths above 8 are stored as 8 and the column is flagged as a token
// (dictionary-backed) column.
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
colStruct.fColDbRoot = dbRoot;
colStruct.tokenFlag = column.colType.colWidth > 8 ? true : false;
colStruct.colDataType = column.colType.colDataType;
dctnryStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
// NOTE(review): compression type 2 is forced on the HDFS path; the meaning
// of the constant is not visible here - confirm.
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = column.oid;
}
else
{
// No dictionary for this column; dctnryOid 0 marks that.
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = column.oid;
}
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
colList[i].push_back(colTuple);
// colList.push_back(WriteEngine::ColTupleList());
// colList.back().push_back(colTuple);
dctColList[i].push_back(tmpStr);
// dctColList.push_back(WriteEngine::dictStr());
// dctColList.back().push_back(tmpStr);
++i;
++column_iterator;
}
if (0 != colStructs.size())
{
// FIXME: Is there a cleaner way to do this? Isn't colValuesList the same as colList after this?
for (unsigned int n = 0; n < numCols; n++)
{
colValuesList.push_back(colList[n]);
dctnryValueList.push_back(dctColList[n]);
}
// fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3);
fWEWrapper.setTransId(txnID);
// NOTE(review): the insert flag set here is not reset before this function returns.
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
int rc1 = 0;
error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dctnryValueList, SYSCOLUMN_BASE);
// On the HDFS path the files are flushed and the transaction is ended
// before a failed insert is reported below.
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error != WriteEngine::NO_ERROR)
{
if (error == ERR_BRM_WR_VB_ENTRY)
{
throw std::runtime_error("writeSysColumnMetaData WE: Error writing to BRM.");
}
else
{
WErrorCodes ec;
throw std::runtime_error("WE: Error updating calpontsys.syscolumn. " + ec.errorString(error));
}
}
else
// NOTE(review): rc1 is recorded in error but never propagated to rc.
error = rc1;
}
}
catch (exception& ex)
{
err += ex.what();
rc = 1;
}
catch (...)
{
err += "Unknown exception caught";
rc = 1;
}
purgeFDCache();
// Only ALTER requests on HDFS flush the touched OIDs from the cache.
if (isAlter)
{
if (idbdatafile::IDBPolicy::useHdfs())
cacheutils::flushOIDsFromCache(oidsToFlush);
}
return rc;
}
/// Creates the column and dictionary files listed in the request.
///
/// Values read from @p bs: the transaction id and the number of entries, then
/// for each entry the OID (32 bit), the column data type (8 bit), a dictionary
/// flag (8 bit), the column width (32 bit), the dbroot (16 bit) and the
/// compression type (32 bit). Processing stops at the first entry that fails.
///
/// @param bs   request payload
/// @param err  set to a description of the failure when a file cannot be created
/// @return 0 on success, otherwise the write-engine error code of the failed call
uint8_t WE_DDLCommandProc::createtablefiles(ByteStream& bs, std::string& err)
{
  uint32_t word32;
  uint16_t word16;
  uint8_t byte8;
  uint32_t entryCount;

  bs >> word32;
  int txnID = word32;
  bs >> entryCount;

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  // Every OID seen, including the one whose creation failed; flushed below.
  std::map<uint32_t, uint32_t> seenOids;

  OID dataOid;
  int rc = 0;
  uint32_t entry = 0;

  while (entry < entryCount)
  {
    bs >> word32;
    dataOid = word32;

    bs >> byte8;
    CalpontSystemCatalog::ColDataType colDataType = static_cast<CalpontSystemCatalog::ColDataType>(byte8);

    bs >> byte8;
    bool isDictionary = (byte8 != 0);

    bs >> word32;
    int colWidth = word32;

    bs >> word16;
    uint16_t colDbRoot = word16;

    bs >> word32;
    int compressionType = word32;

    seenOids[dataOid] = dataOid;

    if (isDictionary)
      rc = fWEWrapper.createDctnry(0, dataOid, colWidth, colDbRoot, 0, 0, compressionType);
    else
      rc = fWEWrapper.createColumn(0, dataOid, colDataType, colWidth, colDbRoot, 0, compressionType);

    // Stop on the first failure; dataOid then names the offending file.
    if (rc != 0)
      break;

    ++entry;
  }

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream msg;
    msg << "WE: Error creating column file for oid " << dataOid << "; " << ec.errorString(rc) << endl;
    err = msg.str();
  }

  // The flush is unconditional here (it is not limited to the HDFS case).
  fWEWrapper.flushDataFiles(rc, txnID, seenOids);
  purgeFDCache();
  return rc;
}
/// Commits the transaction whose id is read (as a 32-bit value) from @p bs.
///
/// @param bs   request payload
/// @param err  set to a description of the failure when the commit fails
/// @return 0 on success, otherwise the error code returned by the commit
uint8_t WE_DDLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
  uint32_t tmp32;
  bs >> tmp32;
  int txnID = tmp32;

  int rc = fWEWrapper.commit(txnID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    // Same layout as the rollback messages: "<what> <txn>; <error text>".
    oss << "WE: Error committing transaction " << txnID << "; " << ec.errorString(rc) << endl;
    err = oss.str();
  }

  return rc;
}
/// Rolls back the blocks written by a transaction.
///
/// Values read from @p bs, in order: the session id and the transaction id
/// (both 32 bit).
///
/// @param bs   request payload
/// @param err  set to a description of the failure when the rollback fails
/// @return 0 on success, otherwise the error code returned by the rollback
uint8_t WE_DDLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t rawTxnId;
  bs >> sessionID;
  bs >> rawTxnId;
  int txnID = rawTxnId;

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  int rc = fWEWrapper.rollbackBlocks(txnID, sessionID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream msg;
    msg << "WE: Error rolling back files " << txnID << " for session " << sessionID << "; "
        << ec.errorString(rc) << endl;
    err = msg.str();
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // No specific OIDs are named; an empty map is passed to the flush.
    std::map<uint32_t, uint32_t> noOids;
    fWEWrapper.flushDataFiles(rc, txnID, noOids);
  }

  purgeFDCache();
  // Restore the wrapper flags changed above.
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}
// Roll back the version information of a transaction.
//
// @param bs   serialized request: session id (32 bit), transaction id (32 bit).
// @param err  set to a description of the failure when the rollback fails.
// @return 0 on success, otherwise the code returned by rollbackVersion().
uint8_t WE_DDLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t rawTxn;
  bs >> sessionID;
  bs >> rawTxn;
  int txnID = rawTxn;

  int rc = fWEWrapper.rollbackVersion(txnID, sessionID);

  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream msg;
    msg << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
        << ec.errorString(rc) << endl;
    err = msg.str();
  }

  purgeFDCache();
  return rc;
}
// Delete every SYSCOLUMN row that belongs to one user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name and table name. The row ids to delete come from
// CalpontSystemCatalog::columnRIDs() for that table; the column layout of
// SYSCOLUMN itself comes from getColumnsForTable().
//
// @param bs   serialized request.
// @param err  set to the exception text when an exception is caught.
// @return 1 when an exception is caught; otherwise the deleteRow() code if it
//         failed, else the HDFS flush/confirm/end code (0 off HDFS). When
//         there is nothing to delete the result of getSysCatDBRoot() is
//         returned unchanged.
uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t rawTxn;
  std::string schema;
  std::string tablename;

  bs >> sessionID;
  bs >> rawTxn;
  int txnID = rawTxn;
  bs >> schema;
  bs >> tablename;

  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSCOLUMN_TABLE;

  CalpontSystemCatalog::TableName userTableName;
  userTableName.schema = schema;
  userTableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Locate the DBRoot that holds SYSCOLUMN.
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  int rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);

  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);

  std::map<uint32_t, uint32_t> oids;

  try
  {
    // Row ids of the SYSCOLUMN entries describing the user table's columns.
    CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);
    WriteEngine::RIDList ridList;

    for (const auto& colRid : colRidList)
    {
      ridList.push_back(colRid.rid);
    }

    // One ColStruct per SYSCOLUMN column; widths above 8 are capped at 8.
    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;

    for (const DDLColumn& sysCol : columns)
    {
      colStruct.dataOid = sysCol.oid;
      colStruct.colWidth = (sysCol.colType.colWidth <= 8) ? sysCol.colType.colWidth : 8;
      colStruct.colDataType = sysCol.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
        colStruct.fCompressionType = 2;

      oids[colStruct.dataOid] = colStruct.dataOid;
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(sysCol.colType);
    }

    std::vector<WriteEngine::ColStructList> colExtentsStruct(1, colStructs);
    std::vector<WriteEngine::CSCTypesList> colExtentsColType(1, cscColTypeList);
    std::vector<WriteEngine::RIDList> ridLists(1, ridList);
    std::vector<void*> colValuesList;

    if (!colStructs.empty() && !ridLists[0].empty())
    {
      int deleteRc = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                          ridLists, SYSCOLUMN_BASE);
      int finishRc = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        // Flush first; confirm only when both the delete and the flush
        // succeeded; commit only when the confirm succeeded.
        finishRc = fWEWrapper.flushDataFiles(deleteRc, txnID, oids);
        bool commit = false;

        if (deleteRc == 0 && finishRc == 0)
        {
          finishRc = fWEWrapper.confirmTransaction(txnID);
          commit = (finishRc == NO_ERROR);
        }

        if (commit)
          finishRc = fWEWrapper.endTransaction(txnID, true);
        else
          fWEWrapper.endTransaction(txnID, false);
      }

      rc = (deleteRc == NO_ERROR) ? finishRc : deleteRc;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Delete the single SYSCOLUMN row that describes one column of a user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name, table name and column name. The row id comes from
// CalpontSystemCatalog::columnRID(); a negative objnum in its result is
// reported as "Column not found".
//
// @param bs   serialized request.
// @param err  set to the exception text when an exception is caught.
// @return 1 when an exception is caught; otherwise the deleteRow() code if it
//         failed, else the HDFS flush/confirm/end code (0 off HDFS). When
//         there is nothing to delete the result of getSysCatDBRoot() is
//         returned unchanged.
uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t rawTxn;
  std::string schema;
  std::string tablename;
  std::string columnname;

  bs >> sessionID;
  bs >> rawTxn;
  int txnID = rawTxn;
  bs >> schema;
  bs >> tablename;
  bs >> columnname;

  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSCOLUMN_TABLE;

  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tablename;
  tableColName.column = columnname;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Locate the DBRoot that holds SYSCOLUMN.
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1021;
  int rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  std::map<uint32_t, uint32_t> oids;

  try
  {
    CalpontSystemCatalog::ROPair colRO = systemCatalogPtr->columnRID(tableColName);

    if (colRO.objnum < 0)
    {
      err = "Column not found:" + tableColName.table + "." + tableColName.column;
      throw std::runtime_error(err);
    }

    WriteEngine::RIDList ridList;
    ridList.push_back(colRO.rid);

    // One ColStruct per SYSCOLUMN column; widths above 8 are capped at 8.
    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;

    for (const DDLColumn& sysCol : columns)
    {
      colStruct.dataOid = sysCol.oid;
      colStruct.colWidth = (sysCol.colType.colWidth <= 8) ? sysCol.colType.colWidth : 8;
      colStruct.colDataType = sysCol.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
        colStruct.fCompressionType = 2;

      oids[colStruct.dataOid] = colStruct.dataOid;
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(sysCol.colType);
    }

    std::vector<WriteEngine::ColStructList> colExtentsStruct(1, colStructs);
    std::vector<WriteEngine::CSCTypesList> colExtentsColType(1, cscColTypeList);
    std::vector<WriteEngine::RIDList> ridLists(1, ridList);
    std::vector<void*> colValuesList;

    if (!colStructs.empty() && !ridLists[0].empty())
    {
      int deleteRc = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                          ridLists, SYSCOLUMN_BASE);
      int finishRc = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        // Flush first; confirm only when both the delete and the flush
        // succeeded; commit only when the confirm succeeded.
        finishRc = fWEWrapper.flushDataFiles(deleteRc, txnID, oids);
        bool commit = false;

        if (deleteRc == 0 && finishRc == 0)
        {
          finishRc = fWEWrapper.confirmTransaction(txnID);
          commit = (finishRc == NO_ERROR);
        }

        if (commit)
          finishRc = fWEWrapper.endTransaction(txnID, true);
        else
          fWEWrapper.endTransaction(txnID, false);
      }

      rc = (deleteRc == NO_ERROR) ? finishRc : deleteRc;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Delete the SYSTABLE row that describes one user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name and table name. The row id comes from
// CalpontSystemCatalog::tableRID(); a rid equal to the maximum RID value is
// reported as "RowID is not valid ".
//
// @param bs   serialized request.
// @param err  set to the exception text when an exception is caught.
// @return 1 when an exception is caught; otherwise the deleteRow() code if it
//         failed, else the HDFS flush/confirm/end code (0 off HDFS). When
//         there is nothing to delete the result of getSysCatDBRoot() is
//         returned unchanged.
uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t rawTxn;
  std::string schema;
  std::string tablename;

  bs >> sessionID;
  bs >> rawTxn;
  int txnID = rawTxn;
  bs >> schema;
  bs >> tablename;

  // NOTE(review): this wrapper is never used below (all calls go through
  // fWEWrapper); it is kept so construction/destruction stays unchanged.
  WriteEngine::WriteEngineWrapper writeEngine;

  ddlpackage::QualifiedName sysCatalogTableName;
  sysCatalogTableName.fSchema = CALPONT_SCHEMA;
  sysCatalogTableName.fName = SYSTABLE_TABLE;

  CalpontSystemCatalog::TableName userTableName;
  userTableName.schema = schema;
  userTableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Locate the DBRoot that holds SYSTABLE.
  uint16_t dbRoot;
  BRM::OID_t sysOid = 1001;
  int rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);

  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);

  std::map<uint32_t, uint32_t> oids;

  try
  {
    CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);

    if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
    {
      err = "RowID is not valid ";
      throw std::runtime_error(err);
    }

    WriteEngine::RIDList ridList;
    ridList.push_back(userTableROPair.rid);

    // One ColStruct per SYSTABLE column; widths above 8 are capped at 8.
    ColumnList columns;
    getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);

    WriteEngine::ColStruct colStruct;
    WriteEngine::ColStructList colStructs;
    WriteEngine::CSCTypesList cscColTypeList;

    for (const DDLColumn& sysCol : columns)
    {
      colStruct.dataOid = sysCol.oid;
      colStruct.colWidth = (sysCol.colType.colWidth <= 8) ? sysCol.colType.colWidth : 8;
      colStruct.colDataType = sysCol.colType.colDataType;
      colStruct.fColDbRoot = dbRoot;

      if (idbdatafile::IDBPolicy::useHdfs())
        colStruct.fCompressionType = 2;

      oids[colStruct.dataOid] = colStruct.dataOid;
      colStructs.push_back(colStruct);
      cscColTypeList.push_back(sysCol.colType);
    }

    std::vector<WriteEngine::ColStructList> colExtentsStruct(1, colStructs);
    std::vector<WriteEngine::CSCTypesList> colExtentsColType(1, cscColTypeList);
    std::vector<WriteEngine::RIDList> ridLists(1, ridList);
    std::vector<void*> colValuesList;

    if (!colStructs.empty() && !ridLists[0].empty())
    {
      // NOTE(review): SYSCOLUMN_BASE is passed although the rows belong to
      // SYSTABLE - confirm this is what deleteRow() expects here.
      int deleteRc = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                          ridLists, SYSCOLUMN_BASE);
      int finishRc = 0;

      if (idbdatafile::IDBPolicy::useHdfs())
      {
        // Flush first; confirm only when both the delete and the flush
        // succeeded; commit only when the confirm succeeded.
        finishRc = fWEWrapper.flushDataFiles(deleteRc, txnID, oids);
        bool commit = false;

        if (deleteRc == 0 && finishRc == 0)
        {
          finishRc = fWEWrapper.confirmTransaction(txnID);
          commit = (finishRc == NO_ERROR);
        }

        if (commit)
          finishRc = fWEWrapper.endTransaction(txnID, true);
        else
          fWEWrapper.endTransaction(txnID, false);
      }

      rc = (deleteRc == NO_ERROR) ? finishRc : deleteRc;
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "Unknown exception caught";
    rc = 1;
  }

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Remove a user table from both system catalog tables in one call.
//
// Phase 1 deletes the table's row from SYSTABLE; phase 2 deletes all of the
// table's column rows from SYSCOLUMN. The request carries: session id
// (32 bit), transaction id (32 bit), schema name and table name.
//
// @param bs   serialized request.
// @param err  set to the exception text when an exception is caught.
// @return 0 on success; 1 when an exception is caught; otherwise the code
//         from deleteRow() or from the HDFS flush/confirm/end sequence.
//         If phase 1 fails the function returns immediately, without
//         running phase 2 and without the cache flush/purge at the end.
uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
;
int txnID;
string schema, tablename;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> tablename;
// NOTE(review): writeEngine is never used below; every call goes through fWEWrapper.
WriteEngine::WriteEngineWrapper writeEngine;
ddlpackage::QualifiedName sysCatalogTableName;
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSTABLE_TABLE;
CalpontSystemCatalog::TableName userTableName;
userTableName.schema = schema;
userTableName.table = tablename;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
// These containers are shared by both phases; phase 2 clears them before reuse.
WriteEngine::ColStruct colStruct;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<void*> colValuesList;
WriteEngine::RIDList ridList;
std::vector<WriteEngine::RIDList> ridLists;
DDLColumn column;
uint16_t dbRoot;
BRM::OID_t sysOid = 1003;
// Find out where systable is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
// ---- Phase 1: delete the table's row from SYSTABLE ----
try
{
CalpontSystemCatalog::ROPair userTableROPair = systemCatalogPtr->tableRID(userTableName);
// A rid equal to the maximum RID value is treated as "row not found".
if (userTableROPair.rid == std::numeric_limits<WriteEngine::RID>::max())
{
err = "RowID is not valid ";
throw std::runtime_error(err);
}
ridList.push_back(userTableROPair.rid);
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
// Build one ColStruct per SYSTABLE column; widths above 8 are capped at 8.
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
colStructs.push_back(colStruct);
cscColTypeList.push_back(column.colType);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
// Unlike phase 2, this delete is not guarded by an "anything to delete?" check.
{
// NOTE(review): SYSCOLUMN_BASE is passed although these rows belong to SYSTABLE - confirm.
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
// HDFS only: flush, then confirm only if delete and flush both succeeded,
// then end the transaction (commit only if the confirm succeeded).
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
// A deleteRow() failure takes precedence over the finalization result.
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
// Phase 1 failed: return without touching SYSCOLUMN. Note that this path
// skips systemCatalogPtr->flushCache() and purgeFDCache().
if (rc != 0)
return rc;
// ---- Phase 2: delete the table's column rows from SYSCOLUMN ----
// deleting from SYSCOLUMN
sysCatalogTableName.fSchema = CALPONT_SCHEMA;
sysCatalogTableName.fName = SYSCOLUMN_TABLE;
sysOid = 1021;
// Find out where syscolumn is
rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);
// NOTE(review): on HDFS phase 1 already ended the transaction and
// startTransaction() is not called again here - confirm that is intended.
try
{
CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName);
// Reset everything phase 1 filled in.
colStructs.clear();
cscColTypeList.clear();
colExtentsStruct.clear();
colExtentsColType.clear();
colValuesList.clear();
ridList.clear();
ridLists.clear();
oids.clear();
DDLColumn column;
CalpontSystemCatalog::RIDList::const_iterator colrid_iterator = colRidList.begin();
// Collect the row id of every SYSCOLUMN entry that belongs to the user table.
while (colrid_iterator != colRidList.end())
{
WriteEngine::RID rid = (*colrid_iterator).rid;
ridList.push_back(rid);
++colrid_iterator;
}
ColumnList columns;
getColumnsForTable(sessionID, sysCatalogTableName.fSchema, sysCatalogTableName.fName, columns);
ColumnList::const_iterator column_iterator = columns.begin();
// Build one ColStruct per SYSCOLUMN column; widths above 8 are capped at 8.
while (column_iterator != columns.end())
{
column = *column_iterator;
colStruct.dataOid = column.oid;
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.colDataType = column.colType.colDataType;
colStruct.fColDbRoot = dbRoot;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
}
colStructs.push_back(colStruct);
oids[colStruct.dataOid] = colStruct.dataOid;
cscColTypeList.push_back(column.colType);
// oidsToFlush.push_back(colStruct.dataOid);
++column_iterator;
}
colExtentsStruct.push_back(colStructs);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(ridList);
// Skip the delete when there are no columns or no rows to remove.
if (0 != colStructs.size() && 0 != ridLists[0].size())
{
int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists,
SYSCOLUMN_BASE);
int rc1 = 0;
// Same HDFS flush / confirm / end sequence as in phase 1.
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(error, txnID, oids);
if ((error == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
if (error == NO_ERROR)
rc = rc1;
else
rc = error;
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
catch (...)
{
err = "Unknown exception caught";
rc = 1;
}
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
// Remove the data files of the OIDs listed in the request.
//
// The request carries a 32-bit count followed by that many 32-bit OIDs.
//
// @param bs   serialized request.
// @param err  set to "WE: Error removing files " (followed by the exception
//             text when one is available) if dropFiles() throws.
// @return the code returned by dropFiles(), or 1 when it throws.
uint8_t WE_DDLCommandProc::dropFiles(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t size;
  uint32_t tmp32;
  std::vector<int32_t> dataOids;

  bs >> size;

  for (uint32_t i = 0; i < size; ++i)
  {
    bs >> tmp32;
    dataOids.push_back(tmp32);
  }

  try
  {
    rc = fWEWrapper.dropFiles(0, dataOids);
  }
  catch (const std::exception& ex)
  {
    // Keep the established prefix and append the cause, which the
    // catch-all handler below cannot provide.
    err = "WE: Error removing files ";
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "WE: Error removing files ";
    rc = 1;
  }

  purgeFDCache();
  return rc;
}
// Set the AUTOINC_COL column of SYSCOLUMN for every column row of a user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name, table name and one flag byte. The SYSCOLUMN rows to update are the
// ones returned by CalpontSystemCatalog::columnRIDs() for the user table.
//
// @param bs   serialized request.
// @param err  set to the exception text if the row lookup throws, or to
//             "WE: Update failed on: <table>" if the update fails.
// @return 0 on success; 1 if the row lookup throws; otherwise the code from
//         updateColumnRec(), or - when the update itself succeeded - the code
//         from the HDFS flush/confirm/end sequence.
uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, tablename;
  int txnID;
  uint8_t tmp8;
  bool autoIncrement = false;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> tmp8;
  // NOTE(review): the flag byte read above is discarded and the value is
  // forced to true, so the "n" branch below is never taken. Behaviour is left
  // unchanged here - confirm against the sender whether tmp8 should be used.
  autoIncrement = true;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;
  CalpontSystemCatalog::RIDList roList;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Row ids of the SYSCOLUMN entries that describe the user table's columns.
  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Build colStructs for SYSCOLUMN
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;

  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;

  DDLColumn column;
  WriteEngine::ColTuple colTuple;
  findColumnData(sessionID, tableName, AUTOINC_COL, column);

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
  }

  // The new value is the one-character string "y" or "n".
  string s1("y"), s2("n");
  boost::any datavalue1 = s1;
  boost::any datavalue2 = s2;

  if (autoIncrement)
    colTuple.data = datavalue1;
  else
    colTuple.data = datavalue2;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  // One copy of the new value per row that will be updated.
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1021;

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnID);

  // Resolve each row id to its dbroot/partition/segment and record every
  // change of location as a new extent entry. roList[i].rid is handed to
  // convertRidToColumn() before it is stored, so the call order matters.
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);

    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (idbdatafile::IDBPolicy::useHdfs())
    fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  // HDFS only: flush, confirm only if update and flush both succeeded, then
  // end the transaction (commit only if the confirm succeeded).
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  // Report a failed flush/confirm/end instead of dropping it; an update
  // failure keeps precedence.
  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Update the AUTOINC_COL column of SYSCOLUMN for every column row of a user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name, table name and one flag byte. The SYSCOLUMN rows to update are the
// ones returned by CalpontSystemCatalog::columnRIDs() for the user table.
// The insert and bulk flags of the write engine are reset to false before
// returning.
//
// @param bs   serialized request.
// @param err  set to the exception text if the row lookup throws, or to
//             "WE: Update failed on: <table>" if the update fails.
// @return 0 on success; 1 if the row lookup throws; otherwise the code from
//         updateColumnRec(), or - when the update itself succeeded - the code
//         from the HDFS flush/confirm/end sequence.
uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, tablename;
  int txnID;
  uint8_t tmp8;
  bool autoIncrement = false;

  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tablename;
  bs >> tmp8;
  // NOTE(review): the flag byte read above is discarded and the value is
  // forced to true, so the "n" branch below is never taken. Behaviour is left
  // unchanged here - confirm against the sender whether tmp8 should be used.
  autoIncrement = true;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;
  CalpontSystemCatalog::RIDList roList;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Row ids of the SYSCOLUMN entries that describe the user table's columns.
  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Build colStructs for SYSCOLUMN
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;

  DDLColumn column;
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  WriteEngine::ColTuple colTuple;
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;

  findColumnData(sessionID, tableName, AUTOINC_COL, column);
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  // The new value is the one-character string "y" or "n".
  string ystr("y");
  string nstr("n");

  if (autoIncrement)
    colTuple.data = ystr;
  else
    colTuple.data = nstr;

  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  oids[colStruct.dataOid] = colStruct.dataOid;
  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  // One copy of the new value per row that will be updated.
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);

  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  CalpontSystemCatalog::OID oid = 1021;

  // Resolve each row id to its dbroot/partition/segment and record every
  // change of location as a new extent entry. roList[i].rid is handed to
  // convertRidToColumn() before it is stored, so the call order matters.
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);

    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  std::vector<WriteEngine::RIDList> ridLists;
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  // HDFS only. Previously the transaction was confirmed even after a failed
  // update or flush and every finalization result was ignored. Now: flush,
  // confirm only if update and flush both succeeded, and commit only if the
  // confirm succeeded - the same sequence the other catalog updates use.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  // An update failure keeps precedence over a finalization failure.
  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  return rc;
}
// Rewrite the TABLENAME_COL column of SYSCOLUMN for every column row of a
// renamed user table.
//
// The request carries: session id (32 bit), transaction id (32 bit), schema
// name, old table name and new table name. The rows to update are the ones
// returned by CalpontSystemCatalog::columnRIDs() for the old table name.
//
// @param bs   serialized request.
// @param err  set to the exception text if the row lookup throws, or to
//             "WE: Update failed on: <table>" if the update fails.
// @return 0 on success; 1 if the row lookup throws; otherwise the code from
//         updateColumnRec(), or - when the update itself succeeded - the code
//         from the HDFS flush/confirm/end sequence.
uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
std::string schema, oldTablename, newTablename;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
bs >> schema;
bs >> oldTablename;
bs >> newTablename;
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = oldTablename;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::DctColTupleList dctRowList;
WriteEngine::DctnryTuple dctColList;
// colTuple is never assigned a value: the new name is supplied through the
// dictionary value list built further down.
WriteEngine::ColTuple colTuple;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
uint16_t dbRoot = 0;
uint16_t segment;
uint32_t partition;
CalpontSystemCatalog::RIDList roList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
// Row ids of the SYSCOLUMN entries that belong to the table being renamed.
try
{
roList = systemCatalogPtr->columnRIDs(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Build colStructs for SYSCOLUMN (tableName is repointed at the catalog table below)
std::vector<WriteEngine::RID> ridList;
WriteEngine::ColValueList colValuesList;
WriteEngine::ColTupleList aColList;
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
std::vector<void*> colOldValuesList;
tableName.schema = CALPONT_SCHEMA;
tableName.table = SYSCOLUMN_TABLE;
DDLColumn column;
findColumnData(sessionID, tableName, TABLENAME_COL, column);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryTuple dictTuple;
dictTuple.isNull = false;
colStruct.dataOid = column.oid;
// This width is provisional: both branches of the token check below overwrite it.
colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
colStruct.tokenFlag = false;
// Types/widths listed here are stored as an 8-byte token with tokenFlag set;
// everything else keeps its catalog width.
if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
(column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
(column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
(column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
column.colType.precision > 18)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = column.colType.colWidth;
}
colStruct.colDataType = column.colType.colDataType;
if (idbdatafile::IDBPolicy::useHdfs())
{
colStruct.fCompressionType = 2;
dctnryStruct.fCompressionType = 2;
}
// Only a tokenized column gets a dictionary OID and charset; otherwise the
// dictionary OID is 0.
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
dctnryStruct.columnOid = colStruct.dataOid;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
}
// Remember every OID touched so the HDFS flush below covers the column
// file and, when present, the dictionary file.
oids[colStruct.dataOid] = colStruct.dataOid;
// oidsToFlush.push_back(colStruct.dataOid);
if (dctnryStruct.dctnryOid > 0)
{
oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
// oidsToFlush.push_back(dctnryStruct.dctnryOid);
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(column.colType);
// One (empty) column tuple per row that will be updated.
for (unsigned int i = 0; i < roList.size(); i++)
{
aColList.push_back(colTuple);
}
colValuesList.push_back(aColList);
// It's the same string for each column, so we just need one dictionary struct
// The tuple is zeroed as raw memory first, so the fields set after the
// memset are the only non-zero ones. sigValue points into newTablename,
// which outlives the updateColumnRec() call below.
void* dictTuplePtr = static_cast<void*>(&dictTuple);
memset(dictTuplePtr, 0, sizeof(dictTuple));
dictTuple.sigValue = (unsigned char*)newTablename.c_str();
dictTuple.sigSize = newTablename.length();
dictTuple.isNull = false;
dctColList = dictTuple;
dctRowList.push_back(dctColList);
dctnryValueList.push_back(dctRowList);
CalpontSystemCatalog::OID oid = 1021;
std::vector<extentInfo> extentsinfo;
extentInfo aExtentinfo;
std::vector<WriteEngine::ColStructList> colExtentsStruct;
std::vector<WriteEngine::CSCTypesList> colExtentsColType;
std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
// Resolve each row id to its dbroot/partition/segment and record every
// change of location as a new extent entry.
for (unsigned int i = 0; i < roList.size(); i++)
{
convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
aExtentinfo.dbRoot = dbRoot;
aExtentinfo.partition = partition;
aExtentinfo.segment = segment;
if (extentsinfo.empty())
extentsinfo.push_back(aExtentinfo);
else if (extentsinfo.back() != aExtentinfo)
extentsinfo.push_back(aExtentinfo);
ridList.push_back(roList[i].rid);
}
std::vector<WriteEngine::RIDList> ridLists;
ridLists.push_back(ridList);
// build colExtentsStruct
for (unsigned i = 0; i < extentsinfo.size(); i++)
{
for (unsigned j = 0; j < colStructs.size(); j++)
{
colStructs[j].fColPartition = extentsinfo[i].partition;
colStructs[j].fColSegment = extentsinfo[i].segment;
colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
dctnryStructList[j].fColPartition = extentsinfo[i].partition;
dctnryStructList[j].fColSegment = extentsinfo[i].segment;
dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
}
colExtentsStruct.push_back(colStructs);
dctnryExtentsStruct.push_back(dctnryStructList);
colExtentsColType.push_back(cscColTypeList);
}
// call the write engine to update the row
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(txnID);
rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);
if (rc != NO_ERROR)
{
// build the logging message
err = "WE: Update failed on: " + tableName.table;
}
// HDFS only: flush, confirm only if update and flush both succeeded, then
// end the transaction (commit only if the confirm succeeded).
int rc1 = 0;
if (idbdatafile::IDBPolicy::useHdfs())
{
rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);
if ((rc == 0) && (rc1 == 0))
{
rc1 = fWEWrapper.confirmTransaction(txnID);
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(txnID, true);
else
fWEWrapper.endTransaction(txnID, false);
}
else
{
fWEWrapper.endTransaction(txnID, false);
}
}
// An update failure keeps precedence; otherwise report the finalization result.
if (rc == 0)
rc = rc1;
systemCatalogPtr->flushCache();
purgeFDCache();
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
// cout << "rename:syscolumn is updated" << endl;
return rc;
}
// Writes a new value into the AUTOINC_COL column of one table's SYSTABLE row.
//
// Message layout read from bs, in order: session id, transaction id (32 bit),
// schema name, table name, 32-bit value to store.
//
// @param bs   request payload (consumed)
// @param err  receives a message when the table lookup or the update fails
// @return 0 on success, 1 when the table cannot be resolved, otherwise the
//         return code of the write engine (narrowed to uint8_t)
uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err)
{
  uint32_t sessionID, txnWord, autoVal;
  std::string schema, tablename;

  bs >> sessionID;
  bs >> txnWord;
  bs >> schema;
  bs >> tablename;
  bs >> autoVal;
  const int txnID = txnWord;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = tablename;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Resolve the SYSTABLE row that describes the target table.
  CalpontSystemCatalog::ROPair ropair;

  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  if (ropair.objnum < 0)
  {
    err = "No such table: " + tableName.table;
    return 1;
  }

  // From here on tableName names the catalog table being modified.
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, AUTOINC_COL, column);

  // Column descriptor: the value is stored inline, never tokenized.
  WriteEngine::ColStruct colStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth;

  if (column.colType.colWidth > 8)
    colStruct.colWidth = 8;

  colStruct.tokenFlag = false;
  colStruct.colDataType = column.colType.colDataType;

  // Placeholder dictionary descriptor (no dictionary store for this column).
  WriteEngine::DctnryStruct dctnryStruct;
  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  // The new cell value.
  WriteEngine::ColTuple colTuple;
  colTuple.data = autoVal;

  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::DctnryStructList dctnryStructList;
  std::map<uint32_t, uint32_t> oids;
  colStructs.push_back(colStruct);
  cscColTypeList.push_back(column.colType);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;

  WriteEngine::ColTupleList aColList;
  WriteEngine::ColValueList colValuesList;
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  // One default-constructed dictionary tuple keeps the dictionary value list
  // the same shape as the column value list.
  WriteEngine::DctnryTuple emptyDictTuple;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryValueList dctnryValueList;
  dctRowList.push_back(emptyDictTuple);
  dctnryValueList.push_back(dctRowList);

  // Locate the extent that holds the row.
  // NOTE(review): 1003 is presumably the OID of a SYSTABLE column - confirm.
  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;
  CalpontSystemCatalog::OID oid = 1003;
  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);

  std::vector<WriteEngine::RID> ridList;
  std::vector<WriteEngine::RIDList> ridLists;
  ridList.push_back(ropair.rid);
  ridLists.push_back(ridList);

  // Only one row is touched, hence one extent, but the code stays generic.
  extentInfo aExtentinfo;
  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;
  std::vector<extentInfo> extentsinfo;
  extentsinfo.push_back(aExtentinfo);

  // Stamp each extent's location into a copy of the descriptors.
  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;

  for (unsigned ext = 0; ext < extentsinfo.size(); ext++)
  {
    const extentInfo& where = extentsinfo[ext];

    for (unsigned col = 0; col < colStructs.size(); col++)
    {
      colStructs[col].fColPartition = where.partition;
      colStructs[col].fColSegment = where.segment;
      colStructs[col].fColDbRoot = where.dbRoot;
      dctnryStructList[col].fColPartition = where.partition;
      dctnryStructList[col].fColSegment = where.segment;
      dctnryStructList[col].fColDbRoot = where.dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    colExtentsColType.push_back(cscColTypeList);
    dctnryExtentsStruct.push_back(dctnryStructList);
  }

  // Hand the update to the write engine.
  std::vector<void*> colOldValuesList;
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  int rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                      colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                      SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  // With HDFS storage the data files are flushed and the transaction is
  // closed here: committed only when both the update and the flush succeeded.
  int flushRc = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    flushRc = fWEWrapper.flushDataFiles(rc, txnID, oids);
    const bool commit = (rc == 0) && (flushRc == 0);

    if (commit)
      flushRc = fWEWrapper.confirmTransaction(txnID);

    if (commit && flushRc == NO_ERROR)
      flushRc = fWEWrapper.endTransaction(txnID, true);
    else
      fWEWrapper.endTransaction(txnID, false);
  }

  // The update's own failure takes precedence over a flush/commit failure.
  if (rc == 0)
    rc = flushRc;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Replaces the TABLENAME_COL value of one table's SYSTABLE row (table rename).
//
// Message layout read from bs, in order: session id, transaction id (32 bit),
// schema name, current table name, new table name.
//
// @param bs   request payload (consumed)
// @param err  receives a message when the table lookup or the update fails
// @return 0 on success, 1 when the table cannot be resolved, otherwise the
//         return code of the write engine (narrowed to uint8_t)
uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, oldTablename, newTablename;
  int txnID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> oldTablename;
  bs >> newTablename;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = oldTablename;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  // Zero-initialised so nothing indeterminate is handed to convertRidToColumn.
  uint16_t dbRoot = 0;
  uint16_t segment = 0;
  uint32_t partition = 0;
  CalpontSystemCatalog::ROPair ropair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  // Resolve the SYSTABLE row that describes the table being renamed.
  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  if (ropair.objnum < 0)
  {
    err = "No such table: " + tableName.table;
    return 1;
  }

  // now we have to prepare the various structures for the WE to update the column.
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // colTuple.data is left unset on purpose: the new name travels in the
  // dictionary tuple below. Presumably updateColumnRec tokenizes it - confirm.
  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, TABLENAME_COL, column);

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = true;  // the table name is always treated as a dictionary column here
  colStruct.colDataType = column.colType.colDataType;

  // The new table name, as a dictionary signature.
  WriteEngine::DctnryTuple dictTuple;
  dictTuple.isNull = false;
  dictTuple.sigValue = (unsigned char*)newTablename.c_str();
  dictTuple.sigSize = newTablename.length();

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
  dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
  }

  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  dctRowList.push_back(dictTuple);
  dctnryValueList.push_back(dctRowList);

  // In this case, there's only 1 row, so only one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  // NOTE(review): 1003 is presumably the OID of a SYSTABLE column - confirm.
  CalpontSystemCatalog::OID oid = 1003;
  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
  ridList.push_back(ropair.rid);
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);
  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;
  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;

    // Failed update: with HDFS storage flush what was touched and close the
    // transaction without committing. A commit is never possible on this
    // path, so none is attempted. As before, the catalog and file-descriptor
    // caches are not flushed here.
    // (Assumes NO_ERROR == 0, as the mixed rc checks in this file imply.)
    if (idbdatafile::IDBPolicy::useHdfs())
    {
      fWEWrapper.flushDataFiles(rc, txnID, oids);
      fWEWrapper.endTransaction(txnID, false);
    }

    return rc;
  }

  // Successful update: with HDFS storage flush the data files and commit the
  // transaction; any failure on the way ends it without committing.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Renames a table in both catalog tables within one transaction:
//   phase 1 rewrites TABLENAME_COL of the table's SYSTABLE row,
//   phase 2 rewrites TABLENAME_COL of each of the table's SYSCOLUMN rows.
//
// Message layout read from bs, in order: session id, transaction id (32 bit),
// schema name, current table name, new table name.
//
// @param bs   request payload (consumed)
// @param err  receives a message when a catalog lookup or an update fails
// @return 0 on success, 1 when a catalog lookup fails, otherwise the return
//         code of the write engine (narrowed to uint8_t)
uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID, tmp32;
  std::string schema, oldTablename, newTablename;
  int txnID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> oldTablename;
  bs >> newTablename;

  CalpontSystemCatalog::TableName tableName;
  tableName.schema = schema;
  tableName.table = oldTablename;

  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  // Zero-initialised so nothing indeterminate is handed to convertRidToColumn.
  uint16_t dbRoot = 0;
  uint16_t segment = 0;
  uint32_t partition = 0;
  CalpontSystemCatalog::ROPair ropair;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  //@bug 4592 Error handling for syscat call
  try
  {
    ropair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  if (ropair.objnum < 0)
  {
    err = "No such table: " + tableName.table;
    return 1;
  }

  // ---------------------------------------------------------------- phase 1
  // now we have to prepare the various structures for the WE to update the column.
  std::vector<WriteEngine::RID> ridList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // colTuple.data is left unset on purpose: the new name travels in the
  // dictionary tuple. Presumably updateColumnRec tokenizes it - confirm.
  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSTABLE
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSTABLE_TABLE;
  DDLColumn column;
  findColumnData(sessionID, tableName, TABLENAME_COL, column);

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  colStruct.dataOid = column.oid;
  colStruct.colWidth = column.colType.colWidth > 8 ? 8 : column.colType.colWidth;
  colStruct.tokenFlag = true;  // always treated as a dictionary column in this phase
  colStruct.colDataType = column.colType.colDataType;

  // The new table name, as a dictionary signature.
  WriteEngine::DctnryTuple dictTuple;
  dictTuple.isNull = false;
  dictTuple.sigValue = (unsigned char*)newTablename.c_str();
  dictTuple.sigSize = newTablename.length();

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
  dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
  dctnryStruct.columnOid = colStruct.dataOid;

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
  }

  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  dctColList = dictTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // In this case, there's only 1 row, so only one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;
  // NOTE(review): 1003 is presumably the OID of a SYSTABLE column - confirm.
  CalpontSystemCatalog::OID oid = 1003;
  convertRidToColumn(ropair.rid, dbRoot, partition, segment, oid);
  ridList.push_back(ropair.rid);
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);
  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;
  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;

    // SYSTABLE update failed: SYSCOLUMN is not touched. With HDFS storage
    // flush what was written and close the transaction without committing; a
    // commit is never possible on this path, so none is attempted. As before,
    // the catalog and file-descriptor caches are not flushed here.
    // (Assumes NO_ERROR == 0, as the mixed rc checks in this file imply.)
    if (idbdatafile::IDBPolicy::useHdfs())
    {
      fWEWrapper.flushDataFiles(rc, txnID, oids);
      fWEWrapper.endTransaction(txnID, false);
    }

    return rc;
  }

  // ---------------------------------------------------------------- phase 2
  // Update SYSCOLUMN table
  tableName.schema = schema;
  tableName.table = oldTablename;
  dctnryStructList.clear();
  dctnryValueList.clear();
  dctRowList.clear();
  CalpontSystemCatalog::RIDList roList;

  try
  {
    roList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  // Build colStructs for SYSCOLUMN (the phase 1 containers are reused)
  ridList.clear();
  colValuesList.clear();
  aColList.clear();
  colStructs.clear();
  cscColTypeList.clear();
  colOldValuesList.clear();
  oids.clear();
  tableName.schema = CALPONT_SCHEMA;
  tableName.table = SYSCOLUMN_TABLE;
  findColumnData(sessionID, tableName, TABLENAME_COL, column);
  colStruct.dataOid = column.oid;
  colStruct.tokenFlag = false;

  // Types/widths that are stored through a dictionary use an 8-byte token in
  // the column file; everything else keeps its declared width.
  if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
      (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column.colType.colWidth;
  }

  colStruct.colDataType = column.colType.colDataType;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  oids[colStruct.dataOid] = colStruct.dataOid;

  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
  }

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  cscColTypeList.push_back(column.colType);

  // One (unset) column tuple per SYSCOLUMN row of the table.
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    aColList.push_back(colTuple);
  }

  colValuesList.push_back(aColList);

  // It's the same string for each column, so we just need one dictionary struct.
  // The tuple is wiped first so nothing left in it by phase 1 is carried over.
  void* dictTuplePtr = static_cast<void*>(&dictTuple);
  memset(dictTuplePtr, 0, sizeof(dictTuple));
  dictTuple.sigValue = (unsigned char*)newTablename.c_str();
  dictTuple.sigSize = newTablename.length();
  dictTuple.isNull = false;
  dctColList = dictTuple;
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  extentsinfo.clear();
  colExtentsStruct.clear();
  colExtentsColType.clear();
  dctnryExtentsStruct.clear();
  // NOTE(review): 1021 is presumably the OID of a SYSCOLUMN column - confirm.
  oid = 1021;

  // Collect the row ids and the distinct run of extents they live in
  // (an extent is recorded only when it differs from the previous one).
  for (unsigned int i = 0; i < roList.size(); i++)
  {
    convertRidToColumn(roList[i].rid, dbRoot, partition, segment, oid);
    aExtentinfo.dbRoot = dbRoot;
    aExtentinfo.partition = partition;
    aExtentinfo.segment = segment;

    if (extentsinfo.empty())
      extentsinfo.push_back(aExtentinfo);
    else if (extentsinfo.back() != aExtentinfo)
      extentsinfo.push_back(aExtentinfo);

    ridList.push_back(roList[i].rid);
  }

  ridLists.clear();
  ridLists.push_back(ridList);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the rows (same transaction as phase 1)
  rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList,
                                  ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE);

  if (rc != NO_ERROR)
  {
    // build the logging message
    err = "WE: Update failed on: " + tableName.table;
  }

  // With HDFS storage flush the data files and close the transaction:
  // committed only when both the update and the flush succeeded.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Decrements the stored column position of every column of a table whose
// position is >= a given position (rows of SYSCOLUMN, column COLUMNPOS).
//
// Message layout read from bs, in order: session id, transaction id (32 bit),
// schema name, table name, position threshold (32 bit).
//
// @param bs   request payload (consumed)
// @param err  receives a message when a catalog lookup, the DBRoot lookup or
//             the update fails
// @return 0 on success, 1 when a lookup fails, otherwise the return code of
//         the write engine (narrowed to uint8_t)
uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& bs, std::string& err)
{
  int rc = 0;
  int colPos;
  string schema, atableName;
  uint32_t sessionID, tmp32;
  int txnID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> atableName;
  bs >> tmp32;
  colPos = tmp32;

  WriteEngine::RIDList ridList;
  WriteEngine::ColValueList colValuesList;
  CalpontSystemCatalog::TableName tableName;
  tableName.table = atableName;
  tableName.schema = schema;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::RIDList rids;

  try
  {
    rids = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  WriteEngine::ColTupleList colTuples;
  CalpontSystemCatalog::ColType columnType;

  // Collect the rows to change and their new (position - 1) values.
  try
  {
    for (CalpontSystemCatalog::RIDList::const_iterator rid_iter = rids.begin(); rid_iter != rids.end();
         ++rid_iter)
    {
      // look up colType
      columnType = systemCatalogPtr->colType(rid_iter->objnum);

      // Columns in front of the threshold keep their position.
      if (columnType.colPosition < colPos)
        continue;

      ridList.push_back(rid_iter->rid);
      WriteEngine::ColTuple colTuple;
      colTuple.data = columnType.colPosition - 1;
      colTuples.push_back(colTuple);
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  colValuesList.push_back(colTuples);

  // Find out where syscolumn is.
  // NOTE(review): 1021 is presumably the OID of a SYSCOLUMN column - confirm.
  uint16_t dbRoot = 0;
  BRM::OID_t sysOid = 1021;
  rc = fDbrm.getSysCatDBRoot(sysOid, dbRoot);

  if (rc != 0)
  {
    // Without a valid DBRoot the update below would target an arbitrary
    // location, so stop before a transaction is opened.
    err = "Failed to find the DBRoot of the SYSCOLUMN table";
    return 1;
  }

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);
  std::map<uint32_t, uint32_t> oids;

  if (colTuples.size() > 0)
  {
    WriteEngine::ColStructList colStructs;
    WriteEngine::ColStruct colStruct;
    WriteEngine::CSCTypesList cscColTypeList;
    CalpontSystemCatalog::ColType colType;

    // Build column structure for COLUMNPOS_COL: a 4-byte INT column.
    colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS;
    colType.colWidth = colStruct.colWidth = 4;
    colStruct.tokenFlag = false;
    colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT;
    colStruct.fColDbRoot = dbRoot;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      colStruct.fCompressionType = 2;
    }

    colStructs.push_back(colStruct);
    cscColTypeList.push_back(colType);
    oids[colStruct.dataOid] = colStruct.dataOid;

    rc = fWEWrapper.updateColumnRecs(txnID, cscColTypeList, colStructs, colValuesList, ridList,
                                     SYSCOLUMN_BASE);

    if (rc != NO_ERROR)
    {
      // build the logging message
      err = "WE: Update failed on: " + tableName.table;
    }
  }

  // With HDFS storage flush the data files and close the transaction:
  // committed only when both the update and the flush succeeded.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  systemCatalogPtr->flushCache();
  purgeFDCache();
  return rc;
}
// Fills a newly added column with its default value, via
// WriteEngineWrapper::fillColumn and the attributes of a reference column.
//
// Message layout read from bs, in order: transaction id (32), data OID (32),
// dictionary OID (32), data type (8), autoincrement flag (8), width (32),
// scale (32), precision (32), default value string, compression type (8),
// reference column OID (32), reference data type (8), reference width (32),
// reference compression type (8), time zone (64).
//
// @param bs   request payload (consumed)
// @param err  receives the write engine's error text when the fill fails
// @return 0 on success, otherwise the write engine's return code (narrowed
//         to uint8_t)
uint8_t WE_DDLCommandProc::fillNewColumn(ByteStream& bs, std::string& err)
{
  uint32_t tmp32;
  uint8_t tmp8;
  messageqcpp::ByteStream::octbyte timeZoneTemp;

  bs >> tmp32;
  const int txnID = tmp32;
  bs >> tmp32;
  const OID dataOid = tmp32;
  bs >> tmp32;
  const OID dictOid = tmp32;
  bs >> tmp8;
  const CalpontSystemCatalog::ColDataType dataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp8;
  const bool autoincrement = (tmp8 != 0);
  bs >> tmp32;
  const int dataWidth = tmp32;
  bs >> tmp32;
  const int scale = tmp32;
  bs >> tmp32;
  const int precision = tmp32;
  string defaultValStr;
  bs >> defaultValStr;
  bs >> tmp8;
  const int compressionType = tmp8;
  bs >> tmp32;
  const OID refColOID = tmp32;
  bs >> tmp8;
  const CalpontSystemCatalog::ColDataType refColDataType = (CalpontSystemCatalog::ColDataType)tmp8;
  bs >> tmp32;
  const int refColWidth = tmp32;
  bs >> tmp8;
  const int refCompressionType = tmp8;
  bs >> timeZoneTemp;
  const long timeZone = timeZoneTemp;

  // Find the fill in value: an empty default string means "fill with NULL".
  const bool isNULL = defaultValStr.empty();

  CalpontSystemCatalog::ColType colType;
  colType.colDataType = dataType;
  colType.colWidth = dataWidth;
  colType.scale = scale;
  colType.precision = precision;

  // pushWarning is an output of the conversion that is not acted upon here.
  bool pushWarning = false;
  ColTuple defaultVal;
  defaultVal.data = colType.convertColumnData(defaultValStr, pushWarning, timeZone, isNULL, false, false);

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  int rc = fWEWrapper.fillColumn(txnID, dataOid, colType, defaultVal, refColOID, refColDataType, refColWidth,
                                 refCompressionType, isNULL, compressionType, defaultValStr, dictOid,
                                 autoincrement);

  if (rc != 0)
  {
    WErrorCodes ec;
    err = ec.errorString(rc);
  }

  purgeFDCache();
  return rc;
}
// Writes the truncate-table DDL log file for one table.
//
// Message layout read from bs, in order: table OID (32), OID count (32), then
// that many 32-bit OIDs. The file "DDL_TRUNCATETABLE_Log_<tableOid>" is
// created in the directory part of the SystemConfig/DBRMRoot setting and
// receives one OID per line.
//
// @param bs   request payload (consumed)
// @param err  receives a message on failure
// @return 0 on success, 1 on any failure
uint8_t WE_DDLCommandProc::writeTruncateLog(ByteStream& bs, std::string& err)
{
  uint32_t tableOid, numOid, tmp32;
  bs >> tableOid;
  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  config::Config* config = config::Config::makeConfig();
  const std::string prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return 1;
  }

  // Keep everything up to and including the last '/' of DBRMRoot.
  const std::string::size_type pos = prefix.find_last_of("/");

  if (pos == std::string::npos)
  {
    err = "Cannot find the dbrm directory for the DDL log file";
    return 1;
  }

  std::ostringstream oss;
  oss << tableOid;
  const std::string DDLLogFileName = prefix.substr(0, pos + 1) + "DDL_TRUNCATETABLE_Log_" + oss.str();

  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL truncate table log file cannot be created";
    return 1;
  }

  std::ostringstream buf;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << '\n';

  const std::string tmp(buf.str());

  // A failed or short write leaves an unusable log, so report it instead of
  // returning success.
  if (DDLLogFile->write(tmp.c_str(), tmp.size()) != static_cast<ssize_t>(tmp.size()))
  {
    err = "DDL truncate table log file cannot be written";
    return 1;
  }

  // DDLLogFile is a scoped_ptr, will be closed after return.
  return 0;
}
// Writes the drop-partition DDL log file for one table.
//
// Message layout read from bs, in order: table OID (32), partition count (32),
// that many serialized BRM::LogicalPartition values, OID count (32), then that
// many 32-bit OIDs. The file "DDL_DROPPARTITION_Log_<tableOid>" is created in
// the directory part of the SystemConfig/DBRMRoot setting. It receives the
// partitions (one per line, in std::set order), a terminating (-1, -1, -1)
// partition line, then one OID per line.
//
// @param bs   request payload (consumed)
// @param err  receives a message on failure
// @return 0 on success, 1 on any failure
uint8_t WE_DDLCommandProc::writeDropPartitionLog(ByteStream& bs, std::string& err)
{
  uint32_t tableOid, numParts, numOid, tmp32;
  bs >> tableOid;

  std::set<BRM::LogicalPartition> partitionNums;
  bs >> numParts;
  BRM::LogicalPartition lp;

  for (uint32_t i = 0; i < numParts; i++)
  {
    lp.unserialize(bs);
    partitionNums.insert(lp);
  }

  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  config::Config* config = config::Config::makeConfig();
  const std::string prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return 1;
  }

  // Keep everything up to and including the last '/' of DBRMRoot.
  const std::string::size_type pos = prefix.find_last_of("/");

  if (pos == std::string::npos)
  {
    err = "Cannot find the dbrm directory for the DDL drop partitions log file";
    return 1;
  }

  std::ostringstream oss;
  oss << tableOid;
  const std::string DDLLogFileName = prefix.substr(0, pos + 1) + "DDL_DROPPARTITION_Log_" + oss.str();

  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL drop partitions log file cannot be created";
    return 1;
  }

  std::ostringstream buf;

  // @SN write partition numbers to the log file, one per line
  for (std::set<BRM::LogicalPartition>::const_iterator it = partitionNums.begin();
       it != partitionNums.end(); ++it)
    buf << (*it) << '\n';

  // -1 indicates the end of partition list
  BRM::LogicalPartition end(-1, -1, -1);
  buf << end << '\n';

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << '\n';

  const std::string tmp(buf.str());

  // A failed or short write leaves an unusable log, so report it instead of
  // returning success.
  if (DDLLogFile->write(tmp.c_str(), tmp.size()) != static_cast<ssize_t>(tmp.size()))
  {
    err = "DDL drop partitions log file cannot be written";
    return 1;
  }

  // DDLLogFile is a scoped_ptr, will be closed after return.
  return 0;
}
// Writes the drop-table DDL log file for one table.
//
// Message layout read from bs, in order: table OID (32), OID count (32), then
// that many 32-bit OIDs. The file "DDL_DROPTABLE_Log_<tableOid>" is created in
// the directory part of the SystemConfig/DBRMRoot setting and receives one OID
// per line.
//
// @param bs   request payload (consumed)
// @param err  receives a message on failure
// @return 0 on success, 1 on any failure
uint8_t WE_DDLCommandProc::writeDropTableLog(ByteStream& bs, std::string& err)
{
  uint32_t tableOid, numOid, tmp32;
  bs >> tableOid;
  bs >> numOid;
  std::vector<uint32_t> oids;

  for (uint32_t i = 0; i < numOid; i++)
  {
    bs >> tmp32;
    oids.push_back(tmp32);
  }

  config::Config* config = config::Config::makeConfig();
  const std::string prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return 1;
  }

  // Keep everything up to and including the last '/' of DBRMRoot.
  const std::string::size_type pos = prefix.find_last_of("/");

  if (pos == std::string::npos)
  {
    // Message corrected: it used to name the drop partitions log.
    err = "Cannot find the dbrm directory for the DDL drop table log file";
    return 1;
  }

  std::ostringstream oss;
  oss << tableOid;
  const std::string DDLLogFileName = prefix.substr(0, pos + 1) + "DDL_DROPTABLE_Log_" + oss.str();

  boost::scoped_ptr<idbdatafile::IDBDataFile> DDLLogFile(IDBDataFile::open(
      IDBPolicy::getType(DDLLogFileName.c_str(), IDBPolicy::WRITEENG), DDLLogFileName.c_str(), "w", 0));

  if (!DDLLogFile)
  {
    err = "DDL drop table log file cannot be created";
    return 1;
  }

  std::ostringstream buf;

  for (unsigned i = 0; i < oids.size(); i++)
    buf << oids[i] << '\n';

  const std::string tmp(buf.str());

  // A failed or short write leaves an unusable log, so report it instead of
  // returning success.
  if (DDLLogFile->write(tmp.c_str(), tmp.size()) != static_cast<ssize_t>(tmp.size()))
  {
    err = "DDL drop table log file cannot be written";
    return 1;
  }

  // DDLLogFile is a scoped_ptr, will be closed after return.
  return 0;
}
// Removes one DDL log file of a table.
//
// Message layout read from bs, in order: log file type (32), table OID (32).
// The type selects the file name prefix (DROPTABLE_LOG, DROPPART_LOG or
// TRUNCATE_LOG); the file lives in the directory part of the
// SystemConfig/DBRMRoot setting.
//
// @param bs   request payload (consumed)
// @param err  receives a message on failure
// @return 0 when the removal was requested, 1 on a configuration problem or
//         an unknown log file type
uint8_t WE_DDLCommandProc::deleteDDLLog(ByteStream& bs, std::string& err)
{
  uint32_t tableOid, fileType;
  bs >> fileType;
  bs >> tableOid;

  config::Config* config = config::Config::makeConfig();
  const std::string prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return 1;
  }

  // Keep everything up to and including the last '/' of DBRMRoot.
  const std::string::size_type pos = prefix.find_last_of("/");

  if (pos == std::string::npos)
  {
    // Message corrected: this function handles every DDL log type.
    err = "Cannot find the dbrm directory for the DDL log file";
    return 1;
  }

  std::string DDLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  std::ostringstream oss;
  oss << tableOid;

  switch (fileType)
  {
    case DROPTABLE_LOG:
    {
      DDLLogFileName += "DDL_DROPTABLE_Log_" + oss.str();
      break;
    }

    case DROPPART_LOG:
    {
      DDLLogFileName += "DDL_DROPPARTITION_Log_" + oss.str();
      break;
    }

    case TRUNCATE_LOG:
    {
      DDLLogFileName += "DDL_TRUNCATETABLE_Log_" + oss.str();
      break;
    }

    default:
    {
      // Without a file name DDLLogFileName is still the bare DBRM directory;
      // never hand that path to remove().
      err = "Unknown DDL log file type";
      return 1;
    }
  }

  // The result of remove() is not checked, as before.
  IDBPolicy::remove(DDLLogFileName.c_str());
  return 0;
}
// Collects the contents of every DDL log file found in the DBRM directory into bs.
//
// bs is restarted and then, for each regular file whose path contains
// "DDL_DROPTABLE_Log_", "DDL_DROPPARTITION_Log_" or "DDL_TRUNCATETABLE_Log_" (tested in
// that order) and that can be opened, the following is appended:
//   tableOid (uint32, parsed from the text after the last '_' in the path)
//   log type (uint32: DROPTABLE_LOG, DROPPART_LOG or TRUNCATE_LOG)
//   OID count (uint32) followed by that many uint32 OIDs
//   partition count (uint32) followed by that many serialized BRM::LogicalPartition
// The partition count is always 0 for drop-table and truncate logs. A drop-partition log
// starts with the partition list, terminated by an entry whose dbroot is (uint16_t)-1,
// followed by the OIDs.
//
// Returns 0 on success, 1 (with err set) when the directory cannot be resolved or listed,
// and (uint8_t)ERR_FILE_READ when a log file cannot be read. Files that cannot be opened
// are skipped.
uint8_t WE_DDLCommandProc::fetchDDLLog(ByteStream& bs, std::string& err)
{
  int rc = 0;

  // Find the ddl log files under DBRMRoot directory
  config::Config* config = config::Config::makeConfig();
  string prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    rc = 1;
    err = "Need a valid DBRMRoot entry in Calpont configuation file";
    return rc;
  }

  string::size_type pos = prefix.find_last_of("/");

  if (pos == string::npos)
  {
    rc = 1;
    err = "Cannot find the dbrm directory for the DDL log file";
    return rc;
  }

  string ddlLogDir = prefix.substr(0, pos + 1);  // keep the trailing '/'

  boost::filesystem::path filePath;
  filePath = fs::system_complete(fs::path(ddlLogDir));

  if (!fs::exists(filePath))
  {
    rc = 1;
    err = "\nDDL log file path is Not found: " + ddlLogDir;
    return rc;
  }

  // Gather the paths of all non-directory entries.
  std::vector<string> fileNames;

  if (fs::is_directory(filePath))
  {
    fs::directory_iterator end_iter;

    for (fs::directory_iterator dir_itr(filePath); dir_itr != end_iter; ++dir_itr)
    {
      try
      {
        if (!fs::is_directory(*dir_itr))
        {
#if BOOST_VERSION >= 105200
          fileNames.push_back(dir_itr->path().generic_string());
#else
          fileNames.push_back(dir_itr->string());
#endif
        }
      }
      catch (std::exception& ex)
      {
        err = ex.what();
        rc = 1;
        return rc;
      }
    }
  }

  bs.restart();

  for (unsigned i = 0; i < fileNames.size(); i++)
  {
    const string& fileName = fileNames[i];
    uint32_t logType;
    bool isDropPartition = false;

    if (fileName.find("DDL_DROPTABLE_Log_") != string::npos)
    {
      logType = (uint32_t)DROPTABLE_LOG;
    }
    else if (fileName.find("DDL_DROPPARTITION_Log_") != string::npos)
    {
      logType = (uint32_t)DROPPART_LOG;
      isDropPartition = true;
    }
    else if (fileName.find("DDL_TRUNCATETABLE_Log_") != string::npos)
    {
      logType = (uint32_t)TRUNCATE_LOG;
    }
    else
    {
      continue;  // not a DDL log file
    }

    boost::scoped_ptr<idbdatafile::IDBDataFile> ddlLogFile(IDBDataFile::open(
        IDBPolicy::getType(fileName.c_str(), IDBPolicy::WRITEENG), fileName.c_str(), "r", 0));

    // Every log type skips an unopenable file; dereferencing a null handle would crash.
    if (!ddlLogFile)
      continue;

    // find the table oid
    pos = fileName.find_last_of("_");
    string tableOidStr = fileName.substr(pos + 1, fileName.length() - pos - 1);
    char* ep = NULL;
    uint32_t tableOid = strtoll(tableOidStr.c_str(), &ep, 10);
    bs << tableOid;
    bs << logType;

    ssize_t fileSize = ddlLogFile->size();

    // A negative size must not reach new[].
    if (fileSize < 0)
    {
      err = "Cannot determine the size of DDL log file " + fileName;
      return (uint8_t)ERR_FILE_READ;
    }

    boost::scoped_array<char> buf(new char[fileSize]);

    if (ddlLogFile->read(buf.get(), fileSize) != fileSize)
    {
      err = "Cannot read DDL log file " + fileName;
      return (uint8_t)ERR_FILE_READ;
    }

    std::istringstream strbuf(string(buf.get(), fileSize));

    // Only drop-partition logs carry a leading partition list.
    vector<BRM::LogicalPartition> partitionNums;

    if (isDropPartition)
    {
      BRM::LogicalPartition partition;

      while (strbuf >> partition)
      {
        if (partition.dbroot == (uint16_t)-1)
          break;  // end-of-list marker

        partitionNums.push_back(partition);
      }
    }

    std::vector<CalpontSystemCatalog::OID> oidList;
    CalpontSystemCatalog::OID fileoid;

    while (strbuf >> fileoid)
      oidList.push_back(fileoid);

    bs << (uint32_t)oidList.size();

    for (unsigned j = 0; j < oidList.size(); j++)
    {
      bs << (uint32_t)oidList[j];
    }

    bs << (uint32_t)partitionNums.size();

    for (unsigned j = 0; j < partitionNums.size(); j++)
    {
      partitionNums[j].serialize(bs);
    }
  }

  return rc;
}
// Rewrites the DEFAULTVAL_COL field of the SYSCOLUMN row that describes one column.
//
// Fields are read from bs in this order: sessionID (uint32), txnID (uint32), schema,
// tableName, colName, defaultvalue. An empty defaultvalue is written as a null token / null
// dictionary entry.
//
// Returns 0 on success. On failure returns a non-zero code and fills err where a message is
// available. A failure to tokenize the new default is reported by throwing
// std::runtime_error rather than through the return value.
uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t tmp32;
  string schema, tableName, colName, defaultvalue;
  int txnID;
  uint32_t sessionID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tableName;
  bs >> colName;
  bs >> defaultvalue;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::TableName atableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tableName;
  tableColName.column = colName;
  CalpontSystemCatalog::ROPair ropair;

  // Locate the SYSCOLUMN row for the column.
  try
  {
    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }
  }
  catch (const exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  catch (...)
  {
    err = "updateSyscolumnSetDefault:Unknown exception caught";
    rc = 1;
    return rc;
  }

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector<WriteEngine::RID> ridList;
  ridList.push_back(ropair.rid);
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t> oidsToFlush;
  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSCOLUMN
  atableName.schema = CALPONT_SCHEMA;
  atableName.table = SYSCOLUMN_TABLE;
  DDLColumn column;
  findColumnData(sessionID, atableName, DEFAULTVAL_COL, column);  // DEFAULTVAL_COL column

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnID);
  fWEWrapper.startTransaction(txnID);

  // Build DEFAULTVAL_COL structure
  WriteEngine::ColTupleList aColList;
  colStruct.dataOid = column.oid;
  colStruct.tokenFlag = false;

  // Wide character/binary columns and high-precision decimals are stored as 8-byte tokens.
  if ((column.colType.colDataType == CalpontSystemCatalog::CHAR && column.colType.colWidth > 8) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARCHAR && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::VARBINARY && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::BLOB && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::TEXT && column.colType.colWidth > 7) ||
      (column.colType.colDataType == CalpontSystemCatalog::DECIMAL && column.colType.precision > 18) ||
      (column.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column.colType.colWidth;
  }

  colStruct.colDataType = column.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;
    dictStruct.dctnryOid = column.colType.ddn.dictOID;
    dictStruct.columnOid = column.colType.columnOID;

    if (defaultvalue.length() <= 0)  // null token
    {
      WriteEngine::Token nullToken;
      colTuple.data = nullToken;
    }
    else
    {
      WriteEngine::DctnryTuple dictTuple;
      dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
      dictTuple.sigSize = defaultvalue.length();
      dictTuple.isNull = false;
      int error = NO_ERROR;

      if (NO_ERROR !=
          (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
      {
        // NOTE(review): this escapes as an exception while every other failure here is
        // returned through rc/err; callers must be prepared for both.
        WErrorCodes ec;
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
      }

      WriteEngine::Token aToken = dictTuple.token;
      colTuple.data = aToken;
    }
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList.push_back(colTuple);
  colValuesList.push_back(aColList);

  WriteEngine::DctnryTuple dctnryTuple;

  if (defaultvalue.length() > 0)
  {
    dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
    dctnryTuple.sigSize = defaultvalue.length();
    dctnryTuple.isNull = false;
  }
  else
  {
    dctnryTuple.isNull = true;
  }

  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // In this case, there's only 1 row, so only one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                             colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                             SYSCOLUMN_BASE))
  {
    err = "WE: Update failed on: " + atableName.table;
    rc = 1;
  }

  // Flushing and ending the transaction are only done here when running on HDFS.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  // flush syscat cache
  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //     cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}
// Rewrites five fields of the SYSCOLUMN row that describes one column: columnname,
// autoincrement, nextvalue, nullable and defaultvalue (in that order in the update lists).
//
// Fields are read from bs in this order: sessionID (uint32), txnID (uint32), schema,
// tableName, colOldname, colNewName, autoinc, nextVal (uint64), nullable (uint32),
// defaultvalue. An empty defaultvalue is written as a null token / null dictionary entry.
//
// Returns 0 on success. On failure returns a non-zero code and fills err where a message is
// available. A failure to tokenize the new default value is reported by throwing
// std::runtime_error rather than through the return value.
uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint64_t nextVal;
  uint32_t tmp32, nullable;
  string schema, tableName, colOldname, autoinc, colNewName, defaultvalue;
  int txnID;
  uint32_t sessionID;
  bs >> sessionID;
  bs >> tmp32;
  txnID = tmp32;
  bs >> schema;
  bs >> tableName;
  bs >> colOldname;
  bs >> colNewName;
  bs >> autoinc;
  bs >> nextVal;
  bs >> nullable;
  bs >> defaultvalue;

  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);

  CalpontSystemCatalog::TableName atableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableColName.schema = schema;
  tableColName.table = tableName;
  tableColName.column = colOldname;
  CalpontSystemCatalog::ROPair ropair;

  // Locate the SYSCOLUMN row for the column under its old name.
  try
  {
    ropair = systemCatalogPtr->columnRID(tableColName);

    if (ropair.objnum < 0)
    {
      ostringstream oss;
      oss << "No such column: " << tableColName;
      throw std::runtime_error(oss.str().c_str());
    }
  }
  catch (const exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  catch (...)
  {
    err = "renameColumn:Unknown exception caught";
    rc = 1;
    return rc;
  }

  uint16_t dbRoot = 0;
  uint16_t segment;
  uint32_t partition;

  std::vector<WriteEngine::RID> ridList;
  ridList.push_back(ropair.rid);
  WriteEngine::ColValueList colValuesList;
  WriteEngine::ColTupleList aColList1;
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  std::vector<void*> colOldValuesList;
  std::map<uint32_t, uint32_t> oids;
  // std::vector<BRM::OID_t> oidsToFlush;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::DctnryValueList dctnryValueList;
  WriteEngine::DctColTupleList dctRowList;
  WriteEngine::DctnryTuple dctColList;
  boost::any datavalue;
  datavalue = colNewName;

  WriteEngine::ColTuple colTuple;

  // Build colStructs for SYSCOLUMN
  atableName.schema = CALPONT_SCHEMA;
  atableName.table = SYSCOLUMN_TABLE;
  DDLColumn column1, column2, column3, column4, column5;
  findColumnData(sessionID, atableName, COLNAME_COL, column1);     // COLNAME_COL column
  findColumnData(sessionID, atableName, AUTOINC_COL, column2);     // AUTOINC_COL column
  findColumnData(sessionID, atableName, NEXTVALUE_COL, column3);   // NEXTVALUE_COL column
  findColumnData(sessionID, atableName, NULLABLE_COL, column4);    // NULLABLE_COL column
  findColumnData(sessionID, atableName, DEFAULTVAL_COL, column5);  // DEFAULTVAL_COL column

  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStruct dctnryStruct;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(txnID);

  // Build COLNAME_COL structure
  colStruct.dataOid = column1.oid;
  colStruct.tokenFlag = false;

  // Wide character/binary columns and high-precision decimals are stored as 8-byte tokens.
  if ((column1.colType.colDataType == CalpontSystemCatalog::CHAR && column1.colType.colWidth > 8) ||
      (column1.colType.colDataType == CalpontSystemCatalog::VARCHAR && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::VARBINARY && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::BLOB && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::TEXT && column1.colType.colWidth > 7) ||
      (column1.colType.colDataType == CalpontSystemCatalog::DECIMAL && column1.colType.precision > 18) ||
      (column1.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column1.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column1.colType.colWidth;
  }

  colStruct.colDataType = column1.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      dictStruct.fCompressionType = 2;
    }

    dictStruct.dctnryOid = column1.colType.ddn.dictOID;
    dictStruct.columnOid = column1.colType.columnOID;
    WriteEngine::DctnryTuple dictTuple;
    dictTuple.sigValue = (unsigned char*)colNewName.c_str();
    dictTuple.sigSize = colNewName.length();
    dictTuple.isNull = false;
    int error = NO_ERROR;

    if (NO_ERROR !=
        (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
    {
      WErrorCodes ec;
      err = ec.errorString(error);
      // The return type is uint8_t: handing back the raw int code could wrap to 0 and be
      // read as success, so report a plain failure like every other error path here.
      rc = 1;
      return rc;
    }

    WriteEngine::Token aToken = dictTuple.token;
    colTuple.data = aToken;
  }
  else
    colTuple.data = datavalue;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column1.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column1.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column1.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  dctnryStructList.push_back(dctnryStruct);
  aColList1.push_back(colTuple);
  colValuesList.push_back(aColList1);

  // The dictionary tuple carries the lower-cased name (the token above was built before
  // this conversion).
  WriteEngine::DctnryTuple dctnryTuple;
  boost::to_lower(colNewName);
  dctnryTuple.sigValue = (unsigned char*)colNewName.c_str();
  dctnryTuple.sigSize = colNewName.length();
  dctnryTuple.isNull = false;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build AUTOINC_COL structure (never tokenized, so no dictionary OID is recorded)
  WriteEngine::ColTupleList aColList2;
  colStruct.dataOid = column2.oid;
  colStruct.colWidth = column2.colType.colWidth > 8 ? 8 : column2.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column2.colType.colDataType;
  colTuple.data = autoinc;
  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;
  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column2.colType);
  // oidsToFlush.push_back(colStruct.dataOid);
  dctnryStructList.push_back(dctnryStruct);
  aColList2.push_back(colTuple);
  colValuesList.push_back(aColList2);
  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build NEXTVALUE_COL structure (never tokenized)
  WriteEngine::ColTupleList aColList3;
  colStruct.dataOid = column3.oid;
  colStruct.colWidth = column3.colType.colWidth > 8 ? 8 : column3.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column3.colType.colDataType;
  colTuple.data = nextVal;
  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;
  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column3.colType);
  // oidsToFlush.push_back(colStruct.dataOid);
  dctnryStructList.push_back(dctnryStruct);
  aColList3.push_back(colTuple);
  colValuesList.push_back(aColList3);
  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build NULLABLE_COL structure (never tokenized)
  WriteEngine::ColTupleList aColList4;
  colStruct.dataOid = column4.oid;
  colStruct.colWidth = column4.colType.colWidth > 8 ? 8 : column4.colType.colWidth;
  colStruct.tokenFlag = false;
  colStruct.colDataType = column4.colType.colDataType;
  colTuple.data = nullable;
  dctnryStruct.dctnryOid = 0;
  dctnryStruct.columnOid = colStruct.dataOid;
  colStructs.push_back(colStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column4.colType);
  // oidsToFlush.push_back(colStruct.dataOid);
  dctnryStructList.push_back(dctnryStruct);
  aColList4.push_back(colTuple);
  colValuesList.push_back(aColList4);
  dctnryTuple.isNull = true;
  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  // Build DEFAULTVAL_COL structure
  WriteEngine::ColTupleList aColList5;
  colStruct.dataOid = column5.oid;
  colStruct.tokenFlag = false;

  if ((column5.colType.colDataType == CalpontSystemCatalog::CHAR && column5.colType.colWidth > 8) ||
      (column5.colType.colDataType == CalpontSystemCatalog::VARCHAR && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::VARBINARY && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::BLOB && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::TEXT && column5.colType.colWidth > 7) ||
      (column5.colType.colDataType == CalpontSystemCatalog::DECIMAL && column5.colType.precision > 18) ||
      (column5.colType.colDataType == CalpontSystemCatalog::UDECIMAL &&
       column5.colType.precision > 18))  // token
  {
    colStruct.colWidth = 8;
    colStruct.tokenFlag = true;
  }
  else
  {
    colStruct.colWidth = column5.colType.colWidth;
  }

  colStruct.colDataType = column5.colType.colDataType;

  if (colStruct.tokenFlag)
  {
    WriteEngine::DctnryStruct dictStruct;

    if (idbdatafile::IDBPolicy::useHdfs())
    {
      colStruct.fCompressionType = 2;
      dictStruct.fCompressionType = 2;
    }

    dictStruct.dctnryOid = column5.colType.ddn.dictOID;
    dictStruct.columnOid = column5.colType.columnOID;

    if (defaultvalue.length() <= 0)  // null token
    {
      WriteEngine::Token nullToken;
      colTuple.data = nullToken;
    }
    else
    {
      WriteEngine::DctnryTuple dictTuple;
      dictTuple.sigValue = (unsigned char*)defaultvalue.c_str();
      dictTuple.sigSize = defaultvalue.length();
      dictTuple.isNull = false;
      int error = NO_ERROR;

      if (NO_ERROR !=
          (error = fWEWrapper.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
      {
        // NOTE(review): unlike the COLNAME tokenization above, this failure escapes as an
        // exception; callers must be prepared for both.
        WErrorCodes ec;
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
      }

      WriteEngine::Token aToken = dictTuple.token;
      colTuple.data = aToken;
    }
  }

  // NOTE(review): this flush runs regardless of useHdfs() and its result is ignored;
  // kept exactly as it was - confirm whether it is still needed.
  fWEWrapper.flushDataFiles(rc, txnID, oids);

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    colStruct.fCompressionType = 2;
    dctnryStruct.fCompressionType = 2;
  }

  if (colStruct.tokenFlag)
  {
    dctnryStruct.dctnryOid = column5.colType.ddn.dictOID;
    dctnryStruct.fCharsetNumber = column5.colType.charsetNumber;
    dctnryStruct.columnOid = colStruct.dataOid;
  }
  else
  {
    dctnryStruct.dctnryOid = 0;
    dctnryStruct.columnOid = colStruct.dataOid;
  }

  colStructs.push_back(colStruct);
  dctnryStructList.push_back(dctnryStruct);
  oids[colStruct.dataOid] = colStruct.dataOid;
  cscColTypeList.push_back(column5.colType);

  // oidsToFlush.push_back(colStruct.dataOid);
  if (dctnryStruct.dctnryOid > 0)
  {
    oids[dctnryStruct.dctnryOid] = dctnryStruct.dctnryOid;
    // oidsToFlush.push_back(dctnryStruct.dctnryOid);
  }

  aColList5.push_back(colTuple);
  colValuesList.push_back(aColList5);

  if (defaultvalue.length() > 0)
  {
    dctnryTuple.sigValue = (unsigned char*)defaultvalue.c_str();
    dctnryTuple.sigSize = defaultvalue.length();
    dctnryTuple.isNull = false;
  }
  else
  {
    dctnryTuple.isNull = true;
  }

  dctColList = dctnryTuple;
  dctRowList.clear();
  dctRowList.push_back(dctColList);
  dctnryValueList.push_back(dctRowList);

  std::vector<WriteEngine::ColStructList> colExtentsStruct;
  std::vector<WriteEngine::CSCTypesList> colExtentsColType;
  std::vector<WriteEngine::DctnryStructList> dctnryExtentsStruct;
  std::vector<WriteEngine::RIDList> ridLists;
  ridLists.push_back(ridList);

  // In this case, there's only 1 row, so only one extent, but keep it generic...
  std::vector<extentInfo> extentsinfo;
  extentInfo aExtentinfo;

  convertRidToColumn(ropair.rid, dbRoot, partition, segment, 1021);

  aExtentinfo.dbRoot = dbRoot;
  aExtentinfo.partition = partition;
  aExtentinfo.segment = segment;

  extentsinfo.push_back(aExtentinfo);

  // build colExtentsStruct
  for (unsigned i = 0; i < extentsinfo.size(); i++)
  {
    for (unsigned j = 0; j < colStructs.size(); j++)
    {
      colStructs[j].fColPartition = extentsinfo[i].partition;
      colStructs[j].fColSegment = extentsinfo[i].segment;
      colStructs[j].fColDbRoot = extentsinfo[i].dbRoot;
      dctnryStructList[j].fColPartition = extentsinfo[i].partition;
      dctnryStructList[j].fColSegment = extentsinfo[i].segment;
      dctnryStructList[j].fColDbRoot = extentsinfo[i].dbRoot;
    }

    colExtentsStruct.push_back(colStructs);
    dctnryExtentsStruct.push_back(dctnryStructList);
    colExtentsColType.push_back(cscColTypeList);
  }

  // call the write engine to update the row
  if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList,
                                             colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList,
                                             SYSCOLUMN_BASE))
  {
    err = "WE: Update failed on: " + atableName.table;
    rc = 1;
  }

  // Flushing and ending the transaction are only done here when running on HDFS.
  int rc1 = 0;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    rc1 = fWEWrapper.flushDataFiles(rc, txnID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      rc1 = fWEWrapper.confirmTransaction(txnID);

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(txnID, true);
      else
        fWEWrapper.endTransaction(txnID, false);
    }
    else
    {
      fWEWrapper.endTransaction(txnID, false);
    }
  }

  if (rc == 0)
    rc = rc1;

  // flush syscat cache
  systemCatalogPtr->flushCache();
  purgeFDCache();
  // if (idbdatafile::IDBPolicy::useHdfs())
  //     cacheutils::flushOIDsFromCache(oidsToFlush);
  return rc;
}
// Deletes the given partitions of the given column OIDs through the write engine.
//
// Fields are read from bs in this order: OID count (uint32) followed by that many uint32
// OIDs, then partition count (uint32) followed by that many serialized BRM::PartitionInfo.
//
// Returns 0 on success. On any failure the return value is non-zero and err describes the
// problem.
uint8_t WE_DDLCommandProc::dropPartitions(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t size, i;
  uint32_t tmp32;
  std::vector<OID> dataOids;
  std::vector<BRM::PartitionInfo> partitions;

  bs >> size;

  for (i = 0; i < size; ++i)
  {
    bs >> tmp32;
    dataOids.push_back(tmp32);
  }

  bs >> size;
  BRM::PartitionInfo pi;

  for (i = 0; i < size; ++i)
  {
    pi.unserialize(bs);
    partitions.push_back(pi);
  }

  try
  {
    rc = fWEWrapper.deletePartitions(dataOids, partitions);

    if (rc != 0)
    {
      // Give the caller a message for error-code failures too, not only for exceptions.
      WErrorCodes ec;
      err = "WE: Error removing files " + ec.errorString(rc);

      // The return type is uint8_t; make sure a failure code cannot wrap to 0 (success).
      if (static_cast<uint8_t>(rc) == 0)
        rc = 1;
    }
  }
  catch (const std::exception& ex)
  {
    err = "WE: Error removing files ";
    err += ex.what();
    rc = 1;
  }
  catch (...)
  {
    err = "WE: Error removing files ";
    rc = 1;
  }

  return rc;
}
void WE_DDLCommandProc::purgeFDCache()
{
if (idbdatafile::IDBPolicy::useHdfs())
{
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(SYSCOLUMN_BASE);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
std::vector<BRM::FileInfo> files;
BRM::FileInfo aFile;
vector<BRM::LBID_t> lbidList;
BRM::LBID_t startLbid;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
fDbrm.lookupLocalStartLbid(aFile.oid, aFile.partitionNum, aFile.segmentNum, aIt->hwm, startLbid);
// cout <<"Added to files oid:dbroot:part:seg:compType = " <<
// aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
//<<":"<<aFile.compType <<endl;
aIt++;
}
it++;
}
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
}
TableMetaData::removeTableMetaData(SYSCOLUMN_BASE);
}
} // namespace WriteEngine