1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Gagan Goel 006b92bba2 Revert "This commit fixes an incorrect predicate in the if condition (#2608)"
This reverts commit f4e3022fbdecc25a22ae6eaf072e462aa6695f35.

The commit apparently caused MCOL-5318 and MCOL-5319 which involve the
internal ColumnStore batch insert mechanism passing through the SQL
layer. The code block involved in this change is a predicate checking
for the HWM extent in WriteEngineServer at the end of the batch insert.
This is done in WE_DMLCommandProc::processBatchInsertHwm(). The original
predicate check in this function for the HWM extent is restored until
further investigation.
2023-02-02 08:07:18 -05:00

4986 lines
149 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
#include <unistd.h>
#include "bytestream.h"
using namespace messageqcpp;
#include "we_messages.h"
#include "we_message_handlers.h"
#include "../../dbcon/dmlpackage/dmlpkg.h"
#include "we_dmlcommandproc.h"
using namespace dmlpackage;
#include "dmlpackageprocessor.h"
using namespace dmlpackageprocessor;
#include "dataconvert.h"
using namespace dataconvert;
#include "calpontsystemcatalog.h"
#include "sessionmanager.h"
using namespace execplan;
#include "messagelog.h"
#include "stopwatch.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
#include "brmtypes.h"
using namespace BRM;
#include "we_tablemetadata.h"
#include "we_dbrootextenttracker.h"
#include "we_bulkrollbackmgr.h"
#include "we_define.h"
#include "we_confirmhdfsdbfile.h"
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "checks.h"
#include "columnwidth.h"
using namespace std;
namespace WriteEngine
{
// StopWatch timer;
WE_DMLCommandProc::WE_DMLCommandProc()
{
fIsFirstBatchPm = true;
filesPerColumnPartition = 8;
extentsPerSegmentFile = 1;
dbrootCnt = 1;
extentRows = 0x800000;
config::Config* cf = config::Config::makeConfig();
string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition");
if (fpc.length() != 0)
filesPerColumnPartition = cf->uFromText(fpc);
// MCOL-4685: remove the option to set more than 2 extents per file (ExtentsPreSegmentFile).
extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;
string dbct = cf->getConfig("SystemConfig", "DBRootCount");
if (dbct.length() != 0)
dbrootCnt = cf->uFromText(dbct);
}
WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs)
{
fIsFirstBatchPm = rhs.fIsFirstBatchPm;
fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
}
WE_DMLCommandProc::~WE_DMLCommandProc()
{
dbRootExtTrackerVec.clear();
}
void WE_DMLCommandProc::processAuxCol(const std::vector<std::string>& origVals,
WriteEngine::ColValueList& colValuesList,
WriteEngine::DictStrList& dicStringList)
{
WriteEngine::ColTupleList auxColTuples;
WriteEngine::dictStr auxDicStrings;
for (uint32_t j = 0; j < origVals.size(); j++)
{
WriteEngine::ColTuple auxColTuple;
auxColTuple.data = (uint8_t)1;
auxColTuples.push_back(auxColTuple);
//@Bug 2515. Only pass string values to write engine
auxDicStrings.push_back("");
}
colValuesList.push_back(auxColTuples);
//@Bug 2515. Only pass string values to write engine
dicStringList.push_back(auxDicStrings);
}
uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
err.clear();
InsertDMLPackage insertPkg;
ByteStream::quadbyte tmp32;
bs >> tmp32;
BRM::TxnID txnid;
txnid.valid = true;
txnid.id = tmp32;
bs >> tmp32;
uint32_t dbroot = tmp32;
// cout << "processSingleInsert received bytestream length " << bs.length() << endl;
messageqcpp::ByteStream::byte packageType;
bs >> packageType;
insertPkg.read(bs);
uint32_t sessionId = insertPkg.get_SessionID();
// cout << " processSingleInsert for session " << sessionId << endl;
DMLTable* tablePtr = insertPkg.get_Table();
RowList rows = tablePtr->get_RowList();
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::ColValueList colValuesList;
WriteEngine::DictStrList dicStringList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
tableName.table = tableColName.table = tablePtr->get_TableName();
tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::ROPair tableRoPair;
CalpontSystemCatalog::OID tableAUXColOid;
std::vector<string> colNames;
bool isWarningSet = false;
try
{
tableRoPair = systemCatalogPtr->tableRID(tableName);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
if (rows.size())
{
Row* rowPtr = rows.at(0);
ColumnList columns = rowPtr->get_ColumnList();
unsigned int numcols = rowPtr->get_NumberOfColumns();
cscColTypeList.reserve(numcols + 1);
// WIP
// We presume that DictCols number is low
colStructs.reserve(numcols + 1);
ColumnList::const_iterator column_iterator = columns.begin();
while (column_iterator != columns.end())
{
DMLColumn* columnPtr = *column_iterator;
tableColName.column = columnPtr->get_Name();
// TODO MCOL-641 replace with getColRidsOidsTypes()
CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
WriteEngine::ColStruct colStruct;
colStruct.fColDbRoot = dbroot;
WriteEngine::DctnryStruct dctnryStruct;
dctnryStruct.fColDbRoot = dbroot;
colStruct.dataOid = roPair.objnum;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
// Token
if (isDictCol(colType))
{
// WIP Hardcoded value
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
++column_iterator;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
CalpontSystemCatalog::ColType colType;
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
colType.colWidth = execplan::AUX_COL_WIDTH;
colType.colDataType = execplan::AUX_COL_DATATYPE;
WriteEngine::ColStruct colStruct;
colStruct.fColDbRoot = dbroot;
WriteEngine::DctnryStruct dctnryStruct;
dctnryStruct.fColDbRoot = dbroot;
colStruct.dataOid = tableAUXColOid;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
colStruct.colWidth = colType.colWidth;
colStruct.colDataType = colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
}
std::string tmpStr("");
for (unsigned int i = 0; i < numcols; i++)
{
WriteEngine::ColTupleList colTuples;
WriteEngine::DctColTupleList dctColTuples;
RowList::const_iterator row_iterator = rows.begin();
while (row_iterator != rows.end())
{
Row* rowPtr = *row_iterator;
const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
tableColName.column = columnPtr->get_Name();
// TODO MCOL-641 remove these calls
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
boost::any datavalue;
bool isNULL = false;
bool pushWarning = false;
std::vector<std::string> origVals;
origVals = columnPtr->get_DataVector();
WriteEngine::dictStr dicStrings;
// token
if (isDictCol(colType))
{
for (uint32_t i = 0; i < origVals.size(); i++)
{
tmpStr = origVals[i];
isNULL = columnPtr->get_isnull();
if (isNULL || (tmpStr.length() == 0))
isNULL = true;
else
isNULL = false;
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
tmpStr = colType.defaultValue;
}
}
if (tmpStr.length() > (unsigned int)colType.colWidth)
{
tmpStr = tmpStr.substr(0, colType.colWidth);
if (!pushWarning)
{
pushWarning = true;
isWarningSet = true;
if ((rc != NO_ERROR) && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
colNames.push_back(tableColName.column);
}
}
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
//@Bug 2515. Only pass string values to write engine
dicStringList.push_back(dicStrings);
}
else
{
string x;
std::string indata;
for (uint32_t i = 0; i < origVals.size(); i++)
{
indata = origVals[i];
isNULL = columnPtr->get_isnull();
if (isNULL || (indata.length() == 0))
isNULL = true;
else
isNULL = false;
// check if autoincrement column and value is 0 or null
uint64_t nextVal = 1;
if (colType.autoincrement)
{
try
{
// WIP What if we combine this and previous loop and fail
// after get nextAIValue ?
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
if (colType.autoincrement && (isNULL || (indata.compare("0") == 0)))
{
try
{
bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
ostringstream oss;
oss << nextVal;
indata = oss.str();
isNULL = false;
}
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
indata = colType.defaultValue;
isNULL = false;
}
}
try
{
datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), isNULL,
false, false);
}
catch (exception&)
{
rc = 1;
Message::Args args;
args.add(string("'") + indata + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
//@Bug 1806
if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
{
return rc;
}
if (pushWarning)
{
if (!isWarningSet)
isWarningSet = true;
if (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
colNames.push_back(tableColName.column);
}
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
dicStringList.push_back(dicStrings);
}
// MCOL-5021
if ((i == numcols - 1) && (tableAUXColOid > 3000))
{
processAuxCol(origVals, colValuesList, dicStringList);
}
++row_iterator;
}
}
}
}
catch (exception& ex)
{
rc = 1;
err = ex.what();
return rc;
}
// call the write engine to write the rows
int error = NO_ERROR;
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
fWEWrapper.setTransId(txnid.id);
// For hdfs use only
uint32_t tblOid = tableRoPair.objnum;
// WIP are we saving HDFS?
if (idbdatafile::IDBPolicy::useHdfs())
{
std::vector<Column> columns;
DctnryStructList dctnryList;
CalpontSystemCatalog::ColType colType;
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
Convertor convertor;
dbRootExtTrackerVec.clear();
fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL));
CalpontSystemCatalog::RIDList ridList;
try
{
ridList = systemCatalogPtr->columnRIDs(tableName, true);
std::vector<OID> dctnryStoreOids(ridList.size());
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
bool bFirstExtentOnThisPM = false;
// First gather HWM BRM information for all columns
std::vector<int> colWidths;
for (unsigned i = 0; i < ridList.size(); i++)
{
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
// need handle error
CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
}
for (unsigned i = 0; i < ridList.size(); i++)
{
// Find DBRoot/segment file where we want to start adding rows
colType = systemCatalogPtr->colType(ridList[i].objnum);
boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
DBRootExtentInfo dbRootExtent;
std::string trkErrMsg;
bool bEmptyPM;
if (i == 0)
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
trkErrMsg);
else
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
colDBRootExtentInfo.push_back(dbRootExtent);
Column aColumn;
aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
aColumn.colDataType = colType.colDataType;
aColumn.compressionType = colType.compressionType;
aColumn.dataFile.oid = ridList[i].objnum;
aColumn.dataFile.fPartition = dbRootExtent.fPartition;
aColumn.dataFile.fSegment = dbRootExtent.fSegment;
aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
columns.push_back(aColumn);
if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
{
DctnryStruct aDctnry;
aDctnry.dctnryOid = colType.ddn.dictOID;
aDctnry.fColPartition = dbRootExtent.fPartition;
aDctnry.fColSegment = dbRootExtent.fSegment;
aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
dctnryList.push_back(aDctnry);
}
if (colType.ddn.dictOID > 0)
{
dctnryStoreOids[i] = colType.ddn.dictOID;
}
else
{
dctnryStoreOids[i] = 0;
}
}
fRBMetaWriter->init(tblOid, tableName.table);
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
// cout << "Backing up hwm chunks" << endl;
for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary
{
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
dctnryList[i].fColPartition, dctnryList[i].fColSegment);
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
if (colValuesList[0].size() > 0)
{
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList,
dctnryStructList, dicStringList, tableRoPair.objnum)))
{
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
else
{
rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
}
}
std::map<uint32_t, uint32_t> oids;
std::vector<BRM::OID_t> oidsToFlush;
for (unsigned i = 0; i < colStructs.size(); i++)
{
oids[colStructs[i].dataOid] = colStructs[i].dataOid;
oidsToFlush.push_back(colStructs[i].dataOid);
}
for (unsigned i = 0; i < dctnryStructList.size(); i++)
{
oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid;
oidsToFlush.push_back(dctnryStructList[i].dctnryOid);
}
fWEWrapper.setTransId(txnid.id);
vector<LBID_t> lbidList;
if (idbdatafile::IDBPolicy::useHdfs())
{
// XXX THIS IS WRONG
// save the extent info to mark them invalid, after flush, the meta file will be gone.
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
try
{
auto mapIter = m_txnLBIDMap.find(txnid.id);
if (mapIter != m_txnLBIDMap.end())
{
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
lbidList = spTxnLBIDRec->m_LBIDs;
}
}
catch (...)
{
}
}
// flush files
// @bug5333, up to here, rc may have an error code already, don't overwrite it.
int flushChunksRc = fWEWrapper.flushChunks(0, oids); // why not pass rc to flushChunks?
if (flushChunksRc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream ossErr;
ossErr << "Error flushing chunks for table " << tableName << "; " << ec.errorString(flushChunksRc);
// Append to errmsg in case we already have an error
if (err.length() > 0)
err += "; ";
err += ossErr.str();
if (error == NO_ERROR)
error = flushChunksRc;
if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
rc = 1; // return hardcoded 1 as the above
}
// Confirm HDFS DB file changes "only" if no error up to this point
if (idbdatafile::IDBPolicy::useHdfs())
{
if (error == NO_ERROR)
{
std::string eMsg;
ConfirmHdfsDbFile confirmHdfs;
error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
if (error != NO_ERROR)
{
ostringstream ossErr;
ossErr << "Error confirming changes to table " << tableName << "; " << eMsg;
err = ossErr.str();
rc = 1;
}
else // Perform extra cleanup that is necessary for HDFS
{
std::string eMsg;
ConfirmHdfsDbFile confirmHdfs;
int confirmRc2 = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg);
if (confirmRc2 != NO_ERROR)
{
// Might want to log this error, but don't think we need
// to report as fatal, since all changes were confirmed.
}
// flush PrimProc FD cache
TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid);
ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
std::vector<BRM::FileInfo> files;
BRM::FileInfo aFile;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
aIt++;
}
it++;
}
if (files.size() > 0)
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
cacheutils::flushOIDsFromCache(oidsToFlush);
fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
try
{
BulkRollbackMgr::deleteMetaFile(tblOid);
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
}
} // (error == NO_ERROR) through call to flushChunks()
if (error != NO_ERROR) // rollback
{
string applName("SingleInsert");
fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
BulkRollbackMgr::deleteMetaFile(tblOid);
}
} // extra hdfs steps
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
TableMetaData::removeTableMetaData(tblOid);
if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet)
{
if (rc == NO_ERROR)
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
// Strict mode enabled, so rollback on warning
if (insertPkg.get_isWarnToError())
{
string applName("SingleInsert");
fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
BulkRollbackMgr::deleteMetaFile(tblOid);
}
}
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
return rc;
}
uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tmp32;
int txnID;
bs >> tmp32;
txnID = tmp32;
// cout << "processing commit txnid = " << txnID << endl;
rc = fWEWrapper.commit(txnID);
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl;
err = oss.str();
}
return rc;
}
uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
// cout << "processing rollbackBlocks txnid = " << txnID << endl;
try
{
rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
}
catch (std::exception& ex)
{
rc = 1;
err = ex.what();
}
return rc;
}
uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t sessionID, tmp32;
int txnID;
bs >> sessionID;
bs >> tmp32;
txnID = tmp32;
// cout << "processing rollbackVersion txnid = " << txnID << endl;
rc = fWEWrapper.rollbackVersion(txnID, sessionID);
if (rc != 0)
{
WErrorCodes ec;
ostringstream oss;
oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
<< ec.errorString(rc) << endl;
err = oss.str();
}
return rc;
}
uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId)
{
int rc = 0;
InsertDMLPackage insertPkg;
ByteStream::quadbyte tmp32;
bs >> tmp32;
bs >> PMId;
insertPkg.read(bs);
uint32_t sessionId = insertPkg.get_SessionID();
DMLTable* tablePtr = insertPkg.get_Table();
bool isAutocommitOn = insertPkg.get_isAutocommitOn();
if (idbdatafile::IDBPolicy::useHdfs())
isAutocommitOn = true;
BRM::TxnID txnid;
txnid.id = tmp32;
txnid.valid = true;
RowList rows = tablePtr->get_RowList();
bool isInsertSelect = insertPkg.get_isInsertSelect();
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::ColValueList colValuesList;
WriteEngine::DictStrList dicStringList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
tableName.table = tableColName.table = tablePtr->get_TableName();
tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::OID tableAUXColOid;
CalpontSystemCatalog::RIDList ridList;
CalpontSystemCatalog::DictOIDList dictOids;
try
{
ridList = systemCatalogPtr->columnRIDs(tableName, true);
roPair = systemCatalogPtr->tableRID(tableName);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
uint32_t sizeToAdd = (tableAUXColOid > 3000 ? 1 : 0);
std::vector<OID> dctnryStoreOids(ridList.size() + sizeToAdd);
std::vector<Column> columns;
DctnryStructList dctnryList;
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size() + sizeToAdd);
uint32_t tblOid = roPair.objnum;
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
bool bFirstExtentOnThisPM = false;
Convertor convertor;
if (fIsFirstBatchPm)
{
dbRootExtTrackerVec.clear();
if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
fWEWrapper.setTransId(txnid.id);
try
{
std::vector<CalpontSystemCatalog::ColType> colTypes;
for (unsigned i = 0; i < ridList.size(); i++)
{
CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ridList[i].objnum);
colTypes.push_back(colType);
if (colType.ddn.dictOID > 0)
{
dctnryStoreOids[i] = colType.ddn.dictOID;
}
else
{
dctnryStoreOids[i] = 0;
}
}
// First gather HWM BRM information for all columns
std::vector<int> colWidths;
for (unsigned i = 0; i < ridList.size(); i++)
{
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
// need handle error
colWidths.push_back(convertor.getCorrectRowWidth(colTypes[i].colDataType, colTypes[i].colWidth));
}
// MCOL-5021
if (tableAUXColOid > 3000)
{
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(tableAUXColOid, dbRootHWMInfoColVec[ridList.size()]);
colWidths.push_back(execplan::AUX_COL_WIDTH);
dctnryStoreOids[ridList.size()] = 0;
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = ridList.back().rid + 1;
auxRoPair.objnum = tableAUXColOid;
ridList.push_back(auxRoPair);
}
for (unsigned i = 0; i < ridList.size(); i++)
{
uint32_t objNum = ridList[i].objnum;
// Find DBRoot/segment file where we want to start adding rows
boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
new DBRootExtentTracker(objNum, colWidths, dbRootHWMInfoColVec, i, 0));
dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
DBRootExtentInfo dbRootExtent;
std::string trkErrMsg;
bool bEmptyPM;
if (i == 0)
{
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
trkErrMsg);
}
else
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
colDBRootExtentInfo.push_back(dbRootExtent);
Column aColumn;
if ((i == ridList.size() - 1) && (tableAUXColOid > 3000)) // AUX column
{
aColumn.colWidth = execplan::AUX_COL_WIDTH;
aColumn.colDataType = execplan::AUX_COL_DATATYPE;
// TODO MCOL-5021 compressionType for the AUX column is hard-coded to 2
aColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
}
else
{
const CalpontSystemCatalog::ColType& colType = colTypes[i];
aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
aColumn.colDataType = colType.colDataType;
aColumn.compressionType = colType.compressionType;
if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
{
DctnryStruct aDctnry;
aDctnry.dctnryOid = colType.ddn.dictOID;
aDctnry.fColPartition = dbRootExtent.fPartition;
aDctnry.fColSegment = dbRootExtent.fSegment;
aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
dctnryList.push_back(aDctnry);
}
}
aColumn.dataFile.oid = objNum;
aColumn.dataFile.fPartition = dbRootExtent.fPartition;
aColumn.dataFile.fSegment = dbRootExtent.fSegment;
aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
columns.push_back(aColumn);
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
//@Bug 5996 validate hwm before starts
rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting",
tableAUXColOid > 3000);
if (rc != 0)
{
WErrorCodes ec;
err = ec.errorString(rc);
err += " Check err.log for detailed information.";
fIsFirstBatchPm = false;
rc = 1;
return rc;
}
}
std::vector<BRM::LBIDRange> rangeList;
// use of MetaFile for bulk rollback support
if (fIsFirstBatchPm && isAutocommitOn)
{
// save meta data, version last block for each dbroot at the start of batch insert
try
{
fRBMetaWriter->init(tblOid, tableName.table);
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
if (!bFirstExtentOnThisPM)
{
for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary
{
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
dctnryList[i].fColPartition, dctnryList[i].fColSegment);
}
}
}
catch (WeException& ex) // catch exception to close file, then rethrow
{
rc = 1;
err = ex.what();
}
// Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
// need to be re-visited
if (rc != 0)
return rc;
}
std::vector<string> colNames;
bool isWarningSet = false;
if (rows.size())
{
Row* rowPtr = rows.at(0);
ColumnList columns = rowPtr->get_ColumnList();
ColumnList::const_iterator column_iterator = columns.begin();
try
{
while (column_iterator != columns.end())
{
DMLColumn* columnPtr = *column_iterator;
tableColName.column = columnPtr->get_Name();
CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = roPair.objnum;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
// Token
if (isDictCol(colType))
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
++column_iterator;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
CalpontSystemCatalog::ColType colType;
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
colType.colWidth = execplan::AUX_COL_WIDTH;
colType.colDataType = execplan::AUX_COL_DATATYPE;
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = tableAUXColOid;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
colStruct.colWidth = colType.colWidth;
colStruct.colDataType = colType.colDataType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
unsigned int numcols = rowPtr->get_NumberOfColumns();
std::string tmpStr("");
try
{
for (unsigned int i = 0; i < numcols; i++)
{
WriteEngine::ColTupleList colTuples;
WriteEngine::DctColTupleList dctColTuples;
RowList::const_iterator row_iterator = rows.begin();
bool pushWarning = false;
while (row_iterator != rows.end())
{
Row* rowPtr = *row_iterator;
const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
tableColName.column = columnPtr->get_Name();
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
boost::any datavalue;
bool isNULL = false;
std::vector<std::string> origVals;
origVals = columnPtr->get_DataVector();
WriteEngine::dictStr dicStrings;
// token
if (isDictCol(colType))
{
for (uint32_t i = 0; i < origVals.size(); i++)
{
tmpStr = origVals[i];
if (tmpStr.length() == 0)
isNULL = true;
else
isNULL = false;
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
tmpStr = colType.defaultValue;
}
}
if (tmpStr.length() > (unsigned int)colType.colWidth)
{
tmpStr = tmpStr.substr(0, colType.colWidth);
if (!pushWarning)
pushWarning = true;
}
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
//@Bug 2515. Only pass string values to write engine
dicStringList.push_back(dicStrings);
}
else
{
string x;
std::string indata;
// scan once to check how many autoincrement value needed
uint32_t nextValNeeded = 0;
uint64_t nextVal = 1;
if (colType.autoincrement)
{
try
{
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
for (uint32_t i = 0; i < origVals.size(); i++)
{
indata = origVals[i];
if (indata.length() == 0)
isNULL = true;
else
isNULL = false;
if (isNULL || (indata.compare("0") == 0))
nextValNeeded++;
}
}
if (nextValNeeded > 0) // reserve next value
{
try
{
bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
for (uint32_t i = 0; i < origVals.size(); i++)
{
indata = origVals[i];
if (indata.length() == 0)
isNULL = true;
else
isNULL = false;
// check if autoincrement column and value is 0 or null
if (colType.autoincrement && (isNULL || (indata.compare("0") == 0)))
{
ostringstream oss;
oss << nextVal++;
indata = oss.str();
isNULL = false;
}
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
indata = colType.defaultValue;
isNULL = false;
}
}
try
{
datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), isNULL,
false, false);
}
catch (exception&)
{
rc = 1;
Message::Args args;
args.add(string("'") + indata + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
//@Bug 1806
if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
{
return rc;
}
if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
dicStringList.push_back(dicStrings);
}
// MCOL-5021
if ((i == numcols - 1) && (tableAUXColOid > 3000))
{
processAuxCol(origVals, colValuesList, dicStringList);
}
++row_iterator;
}
if (pushWarning)
{
colNames.push_back(tableColName.column);
isWarningSet = true;
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
// call the write engine to write the rows
int error = NO_ERROR;
if (colValuesList.size() > 0)
{
if (colValuesList[0].size() > 0)
{
/* Begin-Disable use of MetaFile for bulk rollback support;
Use alternate call below that passes 0 ptr for RBMetaWriter
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList,
dctnryStructList, dicStringList, dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM,
isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm))) End-Disable use of MetaFile for bulk rollback
support
*/
if (NO_ERROR != (error = fWEWrapper.insertColumnRecs(
txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList,
dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect,
isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
{
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
else
{
rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
}
}
}
if (fIsFirstBatchPm && isAutocommitOn)
{
// fWEWrapper.writeVBEnd(txnid.id, rangeList);
fIsFirstBatchPm = false;
}
else if (fIsFirstBatchPm)
{
fIsFirstBatchPm = false;
}
if (isWarningSet && (rc == NO_ERROR))
{
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
// Strict mode enabled, so rollback on warning
if (insertPkg.get_isWarnToError())
{
string applName("BatchInsert");
fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
BulkRollbackMgr::deleteMetaFile(tblOid);
}
}
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
return rc;
}
uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId)
{
int rc = 0;
// cout << "processBatchInsert received bytestream length " << bs.length() << endl;
ByteStream::quadbyte tmp32;
ByteStream::byte tmp8;
bs >> tmp32;
// cout << "processBatchInsert got transaction id " << tmp32 << endl;
bs >> PMId;
// cout << "processBatchInsert gor PMId " << PMId << endl;
uint32_t sessionId;
bs >> sessionId;
// cout << " processBatchInsert for session " << sessionId << endl;
bool isAutocommitOn;
bs >> tmp8;
isAutocommitOn = tmp8;
if (idbdatafile::IDBPolicy::useHdfs())
isAutocommitOn = true;
// cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
BRM::TxnID txnid;
txnid.id = tmp32;
txnid.valid = true;
bool isInsertSelect;
bs >> tmp8;
// For insert select, skip the hwm block and start inserting from the next block
// to avoid self insert issue.
// For batch insert: if not first batch, use the saved last rid to start adding rows.
isInsertSelect = tmp8;
WriteEngine::ColStructList colStructs;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
std::vector<uint64_t> colValuesList;
WriteEngine::DictStrList dicStringList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
bs >> tableColName.table;
bs >> tableColName.schema;
tableName.table = tableColName.table;
tableName.schema = tableColName.schema;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::RIDList ridList;
CalpontSystemCatalog::DictOIDList dictOids;
try
{
ridList = systemCatalogPtr->columnRIDs(tableName, true);
roPair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
std::vector<OID> dctnryStoreOids(ridList.size());
std::vector<Column> columns;
DctnryStructList dctnryList;
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
uint32_t tblOid = roPair.objnum;
CalpontSystemCatalog::ColType colType;
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
bool bFirstExtentOnThisPM = false;
Convertor convertor;
if (fIsFirstBatchPm)
{
dbRootExtTrackerVec.clear();
if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
fWEWrapper.setTransId(txnid.id);
try
{
// First gather HWM BRM information for all columns
std::vector<int> colWidths;
for (unsigned i = 0; i < ridList.size(); i++)
{
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
// need handle error
CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
}
for (unsigned i = 0; i < ridList.size(); i++)
{
// Find DBRoot/segment file where we want to start adding rows
colType = systemCatalogPtr->colType(ridList[i].objnum);
boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
DBRootExtentInfo dbRootExtent;
std::string trkErrMsg;
bool bEmptyPM;
if (i == 0)
{
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
trkErrMsg);
/* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " <<
(int)bFirstExtentOnThisPM << " oid:dbroot:hwm = " << ridList[i].objnum <<
":"<<dbRootExtent.fDbRoot << ":"
<<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
}
else
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
colDBRootExtentInfo.push_back(dbRootExtent);
Column aColumn;
aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
aColumn.colDataType = colType.colDataType;
aColumn.compressionType = colType.compressionType;
aColumn.dataFile.oid = ridList[i].objnum;
aColumn.dataFile.fPartition = dbRootExtent.fPartition;
aColumn.dataFile.fSegment = dbRootExtent.fSegment;
aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
columns.push_back(aColumn);
if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
{
DctnryStruct aDctnry;
aDctnry.dctnryOid = colType.ddn.dictOID;
aDctnry.fColPartition = dbRootExtent.fPartition;
aDctnry.fColSegment = dbRootExtent.fSegment;
aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
dctnryList.push_back(aDctnry);
}
if (colType.ddn.dictOID > 0)
{
dctnryStoreOids[i] = colType.ddn.dictOID;
}
else
{
dctnryStoreOids[i] = 0;
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// @Bug 5996 validate hwm before starts
// TODO MCOL-5021 hasAuxCol is hardcoded to false; add support here.
rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting", false);
if (rc != 0)
{
WErrorCodes ec;
err = ec.errorString(rc);
err += " Check err.log for detailed information.";
fIsFirstBatchPm = false;
rc = 1;
return rc;
}
}
std::vector<BRM::LBIDRange> rangeList;
// use of MetaFile for bulk rollback support
if (fIsFirstBatchPm && isAutocommitOn)
{
// save meta data, version last block for each dbroot at the start of batch insert
try
{
fRBMetaWriter->init(tblOid, tableName.table);
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
// cout << "Saved meta files" << endl;
if (!bFirstExtentOnThisPM)
{
// cout << "Backing up hwm chunks" << endl;
for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary
{
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
dctnryList[i].fColPartition, dctnryList[i].fColSegment);
}
}
}
catch (WeException& ex) // catch exception to close file, then rethrow
{
rc = 1;
err = ex.what();
}
// Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
// need to be re-visited
if (rc != 0)
return rc;
}
std::vector<string> colNames;
bool isWarningSet = false;
uint32_t columnCount;
bs >> columnCount;
if (columnCount)
{
try
{
for (uint32_t current_column = 0; current_column < columnCount; current_column++)
{
uint32_t tmp32;
std::string colName;
bs >> tmp32;
bs >> colName;
colNames.push_back(colName);
CalpontSystemCatalog::OID oid = tmp32;
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = oid;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
// Token
if (isDictCol(colType))
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
std::string tmpStr("");
uint32_t valuesPerColumn;
bs >> valuesPerColumn;
colValuesList.reserve(columnCount * valuesPerColumn);
try
{
bool pushWarning = false;
for (uint32_t j = 0; j < columnCount; j++)
{
WriteEngine::DctColTupleList dctColTuples;
tableColName.column = colNames[j];
CalpontSystemCatalog::OID oid = colStructs[j].dataOid;
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
bool isNULL = false;
WriteEngine::dictStr dicStrings;
// token
if (isDictCol(colType))
{
for (uint32_t i = 0; i < valuesPerColumn; i++)
{
bs >> tmp8;
isNULL = tmp8;
bs >> tmpStr;
if (tmpStr.length() == 0)
isNULL = true;
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
tmpStr = colType.defaultValue;
}
}
if (tmpStr.length() > (unsigned int)colType.colWidth)
{
tmpStr = tmpStr.substr(0, colType.colWidth);
if (!pushWarning)
pushWarning = true;
}
colValuesList.push_back(0);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
//@Bug 2515. Only pass string values to write engine
dicStringList.push_back(dicStrings);
}
else
{
string x;
// scan once to check how many autoincrement value needed
uint32_t nextValNeeded = 0;
uint64_t nextVal = 1;
if (colType.autoincrement)
{
try
{
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
for (uint32_t i = 0; i < valuesPerColumn; i++)
{
bs >> tmp8;
isNULL = tmp8;
uint8_t val8;
uint16_t val16;
uint32_t val32;
uint64_t val64;
uint64_t colValue;
float valF;
double valD;
std::string valStr;
bool valZero = false; // Needed for autoinc check
switch (colType.colDataType)
{
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::UTINYINT:
bs >> val8;
if (val8 == 0)
valZero = true;
colValue = val8;
break;
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::USMALLINT:
bs >> val16;
if (val16 == 0)
valZero = true;
colValue = val16;
break;
case execplan::CalpontSystemCatalog::DATE:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
bs >> val32;
if (val32 == 0)
valZero = true;
colValue = val32;
break;
case execplan::CalpontSystemCatalog::BIGINT:
case execplan::CalpontSystemCatalog::DATETIME:
case execplan::CalpontSystemCatalog::TIME:
case execplan::CalpontSystemCatalog::TIMESTAMP:
case execplan::CalpontSystemCatalog::UBIGINT:
bs >> val64;
if (val64 == 0)
valZero = true;
colValue = val64;
break;
case execplan::CalpontSystemCatalog::DECIMAL:
switch (colType.colWidth)
{
case 1:
{
bs >> val8;
colValue = val8;
break;
}
case 2:
{
bs >> val16;
colValue = val16;
break;
}
case 4:
{
bs >> val32;
colValue = val32;
break;
}
default:
{
bs >> val64;
colValue = val64;
break;
}
}
break;
case execplan::CalpontSystemCatalog::UDECIMAL:
// UDECIMAL numbers may not be negative
if (colType.colWidth == 1)
{
bs >> val8;
// FIXME: IDK what would it mean if valN are unsigned
if (utils::is_negative(val8) && val8 != joblist::TINYINTEMPTYROW &&
val8 != joblist::TINYINTNULL)
{
val8 = 0;
pushWarning = true;
}
colValue = val8;
}
else if (colType.colWidth == 2)
{
bs >> val16;
if (utils::is_negative(val16) && val16 != joblist::SMALLINTEMPTYROW &&
val16 != joblist::SMALLINTNULL)
{
val16 = 0;
pushWarning = true;
}
colValue = val16;
}
else if (colType.colWidth == 4)
{
bs >> val32;
if (utils::is_negative(val32) && val32 != joblist::INTEMPTYROW && val32 != joblist::INTNULL)
{
val32 = 0;
pushWarning = true;
}
colValue = val32;
}
else if (colType.colWidth == 8)
{
bs >> val64;
if (utils::is_negative(val64) && val64 != joblist::BIGINTEMPTYROW &&
val64 != joblist::BIGINTNULL)
{
val64 = 0;
pushWarning = true;
}
colValue = val64;
}
break;
case execplan::CalpontSystemCatalog::DOUBLE:
bs >> val64;
colValue = val64;
break;
case execplan::CalpontSystemCatalog::UDOUBLE:
bs >> val64;
memcpy(&valD, &val64, 8);
if (valD < 0.0 && valD != static_cast<double>(joblist::DOUBLEEMPTYROW) &&
valD != static_cast<double>(joblist::DOUBLENULL))
{
valD = 0.0;
pushWarning = true;
}
colValue = val64;
break;
case execplan::CalpontSystemCatalog::FLOAT:
bs >> val32;
colValue = val32;
break;
case execplan::CalpontSystemCatalog::UFLOAT:
bs >> val32;
memcpy(&valF, &val32, 4);
if (valF < 0.0 && valF != static_cast<float>(joblist::FLOATEMPTYROW) &&
valF != static_cast<float>(joblist::FLOATNULL))
{
valF = 0.0;
pushWarning = true;
}
colValue = val32;
break;
case execplan::CalpontSystemCatalog::CHAR:
case execplan::CalpontSystemCatalog::VARCHAR:
case execplan::CalpontSystemCatalog::TEXT:
case execplan::CalpontSystemCatalog::BLOB:
bs >> valStr;
if (valStr.length() > (unsigned int)colType.colWidth)
{
valStr = valStr.substr(0, colType.colWidth);
pushWarning = true;
}
else
{
if ((unsigned int)colType.colWidth > valStr.length())
{
// Pad null character to the string
valStr.resize(colType.colWidth, 0);
}
}
// FIXME: colValue is uint64_t (8 bytes)
memcpy(&colValue, valStr.c_str(), valStr.length());
break;
default:
rc = 1;
err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
break;
}
// check if autoincrement column and value is 0 or null
if (colType.autoincrement && (isNULL || valZero))
{
ostringstream oss;
oss << nextVal++;
isNULL = false;
try
{
nextValNeeded++;
bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
colValue = nextVal;
}
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
memcpy(&colValue, colType.defaultValue.c_str(), colType.defaultValue.length());
isNULL = false;
}
}
//@Bug 1806
if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
{
return rc;
}
if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
colValuesList.push_back(colValue);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(valStr);
}
dicStringList.push_back(dicStrings);
}
if (pushWarning)
{
colNames.push_back(tableColName.column);
isWarningSet = true;
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
// call the write engine to write the rows
int error = NO_ERROR;
// fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
// cout << "Batch inserting a row with transaction id " << txnid.id << endl;
if (colValuesList.size() > 0)
{
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRecsBinary(
txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0,
bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
{
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
else
{
rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
}
}
if (fIsFirstBatchPm && isAutocommitOn)
{
// fWEWrapper.writeVBEnd(txnid.id, rangeList);
fIsFirstBatchPm = false;
}
else if (fIsFirstBatchPm)
{
fIsFirstBatchPm = false;
}
if (isWarningSet && (rc == NO_ERROR))
{
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
// cout << "Got warning" << endl;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
}
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
return rc;
}
uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
// need to commit the versioned blocks, set hwm, update casual partition, send back to DMLProc to set them
// cout << " in commiting autocommit on batch insert " << endl;
uint32_t tmp32, tableOid, sessionId;
int txnID;
bs >> tmp32;
txnID = tmp32;
bs >> tmp32;
tableOid = tmp32;
bs >> tmp32;
sessionId = tmp32;
BRM::DBRM dbrm;
std::vector<BRM::BulkSetHWMArg> setHWMArgs;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
BulkSetHWMArg aArg;
std::vector<BRM::FileInfo> files;
BRM::FileInfo aFile;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aArg.oid = it->first;
aFile.oid = it->first;
// cout << "OID:" << aArg.oid;
while (aIt != (it->second).end())
{
aArg.partNum = aIt->partNum;
aArg.segNum = aIt->segNum;
aArg.hwm = aIt->hwm;
if (!aIt->isDict)
setHWMArgs.push_back(aArg);
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
// cout <<"Added to files oid:dbroot:part:seg:compType = " <<
// aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
//<<":"<<aFile.compType <<endl;
files.push_back(aFile);
aIt++;
}
it++;
}
bs.restart();
// cout << " serialized setHWMArgs size = " << setHWMArgs.size() << endl;
serializeInlineVector(bs, setHWMArgs);
// flush files
// cout << "flush files when autocommit on" << endl;
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
fWEWrapper.setTransId(txnID);
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fIsFirstBatchPm = true;
if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
TableMetaData::removeTableMetaData(tableOid);
// MCOL-1160 For API bulk insert flush the PrimProc cached dictionary
// blocks tounched
std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
mapIter = fWEWrapper.getDictMap().find(txnID);
if (mapIter != fWEWrapper.getDictMap().end())
{
std::set<BRM::LBID_t>::iterator lbidIter;
std::vector<BRM::LBID_t> dictFlushBlks;
cerr << "API Flushing blocks: ";
for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++)
{
cerr << *lbidIter << ", ";
dictFlushBlks.push_back((*lbidIter));
}
cerr << endl;
cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
fWEWrapper.getDictMap().erase(txnID);
}
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
return rc;
}
uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t tmp8, rc = 0;
err.clear();
// set hwm for autocommit off
uint32_t tmp32, tableOid;
int txnID;
bool isAutoCommitOn;
bs >> tmp32;
txnID = tmp32;
bs >> tmp8;
isAutoCommitOn = (tmp8 != 0);
bs >> tmp32;
tableOid = tmp32;
bs >> tmp8;
// cout << "processBatchInsertHwm: tranid:isAutoCommitOn = " <<txnID <<":"<< (int)isAutoCommitOn << endl;
std::vector<BRM::FileInfo> files;
std::vector<BRM::OID_t> oidsToFlush;
BRM::FileInfo aFile;
// BRM::FileInfo curFile;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
CalpontSystemCatalog::RIDList ridList;
CalpontSystemCatalog::ROPair roPair;
//@Bug 5996. Validate hwm before set them
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(0);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::OID tableAUXColOid;
try
{
CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
ridList = systemCatalogPtr->columnRIDs(tableName);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
TableMetaData::removeTableMetaData(tableOid);
fIsFirstBatchPm = true;
// cout << "flush files when autocommit off" << endl;
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
return rc;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
bool hasAUXCol = (tableAUXColOid > 3000);
if (hasAUXCol)
{
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = ridList.back().rid + 1;
auxRoPair.objnum = tableAUXColOid;
ridList.push_back(auxRoPair);
}
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
DBRootExtentInfo aExtentInfo;
std::vector<DBRootExtentInfo> auxColDBRootExtentInfo;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
oidsToFlush.push_back(aFile.oid);
roPair.objnum = aFile.oid;
aExtentInfo.fPartition = 0;
aExtentInfo.fDbRoot = 0;
aExtentInfo.fSegment = 0;
aExtentInfo.fLocalHwm = 0;
bool isDict = false;
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
if (!aIt->isDict)
{
if ((aIt->partNum > aExtentInfo.fPartition) ||
((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum > aExtentInfo.fSegment)) ||
((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum == aExtentInfo.fSegment) &&
(aIt->segNum > aExtentInfo.fLocalHwm)))
{
aExtentInfo.fPartition = aIt->partNum;
aExtentInfo.fDbRoot = aIt->dbRoot;
aExtentInfo.fSegment = aIt->segNum;
aExtentInfo.fLocalHwm = aIt->hwm;
}
}
else
{
isDict = true;
}
aIt++;
}
if (!isDict)
{
if (hasAUXCol && (((uint32_t)tableAUXColOid) == it->first))
{
auxColDBRootExtentInfo.push_back(aExtentInfo);
}
else
{
colDBRootExtentInfo.push_back(aExtentInfo);
}
}
it++;
}
if (hasAUXCol)
{
colDBRootExtentInfo.insert(colDBRootExtentInfo.end(), auxColDBRootExtentInfo.begin(),
auxColDBRootExtentInfo.end());
}
rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending",
hasAUXCol);
if (rc != 0)
{
WErrorCodes ec;
err = ec.errorString(rc);
err += " Check err.log for detailed information.";
TableMetaData::removeTableMetaData(tableOid);
fIsFirstBatchPm = true;
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
rc = 1;
return rc;
}
try
{
if (isAutoCommitOn)
{
bs.restart();
if (fWEWrapper.getIsInsert())
{
// @bug5333, up to here, rc == 0, but flushchunk may fail.
rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
}
if (tmp8 != 0)
TableMetaData::removeTableMetaData(tableOid);
return rc; // will set hwm with version commit.
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// Handle case where isAutoCommitOn is false
BRM::DBRM dbrm;
// cout << " In processBatchInsertHwm setting hwm" << endl;
std::vector<BRM::BulkSetHWMArg> setHWMArgs;
it = colsExtsInfoMap.begin();
BulkSetHWMArg aArg;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aArg.oid = it->first;
// cout << "for oid " << aArg.oid << endl;
while (aIt != (it->second).end())
{
aArg.partNum = aIt->partNum;
aArg.segNum = aIt->segNum;
aArg.hwm = aIt->hwm;
//@Bug 6029 dictionary store files already set hwm.
if (!aIt->isDict)
setHWMArgs.push_back(aArg);
aIt++;
}
it++;
}
TableMetaData::removeTableMetaData(tableOid);
fIsFirstBatchPm = true;
// cout << "flush files when autocommit off" << endl;
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
bs.restart();
try
{
serializeInlineVector(bs, setHWMArgs);
}
catch (exception& ex)
{
// Append to errmsg in case we already have an error
if (err.length() > 0)
err += "; ";
err += ex.what();
rc = 1;
return rc;
}
// cout << "flush is called for transaction " << txnID << endl;
return rc;
}
//------------------------------------------------------------------------------
// Flush chunks for the specified table and transaction.
// Also confirms changes to DB files (for hdfs).
// files vector represents list of files to be purged from PrimProc cache.
// oid2ToFlush represents list of oids to be flushed from PrimProc cache.
// Afterwords, the following attributes are reset as follows:
// fWEWrapper.setIsInsert(false);
// fWEWrapper.setBulkFlag(false);
// fIsFirstBatchPm = true;
// returns 0 for success; returns 1 if error occurs
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int txnID,
const std::vector<BRM::FileInfo>& files,
const std::vector<BRM::OID_t>& oidsToFlush,
std::string& err)
{
uint8_t rc = 0;
std::map<uint32_t, uint32_t> oids;
CalpontSystemCatalog::TableName aTableName;
CalpontSystemCatalog::RIDList ridList;
CalpontSystemCatalog::DictOIDList dictOids;
CalpontSystemCatalog::OID tableAUXColOid;
try
{
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(txnID);
aTableName = systemCatalogPtr->tableName(tblOid);
ridList = systemCatalogPtr->columnRIDs(aTableName, true);
dictOids = systemCatalogPtr->dictOIDs(aTableName);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName);
}
catch (exception& ex)
{
std::ostringstream ossErr;
ossErr << "System Catalog error for table OID " << tblOid;
// Include tbl name in msg unless exception occurred before we got it
if (aTableName.table.length() > 0)
ossErr << '(' << aTableName << ')';
ossErr << "; " << ex.what();
err = ossErr.str();
rc = 1;
return rc;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = ridList.back().rid + 1;
auxRoPair.objnum = tableAUXColOid;
ridList.push_back(auxRoPair);
}
for (unsigned i = 0; i < ridList.size(); i++)
{
oids[ridList[i].objnum] = ridList[i].objnum;
}
for (unsigned i = 0; i < dictOids.size(); i++)
{
oids[dictOids[i].dictOID] = dictOids[i].dictOID;
}
fWEWrapper.setTransId(txnID);
// @bug5333, up to here, rc == 0, but flushchunk may fail.
rc = fWEWrapper.flushChunks(0, oids);
if (rc == NO_ERROR)
{
// Confirm changes to db files "only" if no error up to this point
if (idbdatafile::IDBPolicy::useHdfs())
{
std::string eMsg;
ConfirmHdfsDbFile confirmHdfs;
int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
if (confirmDbRc == NO_ERROR)
{
int endDbRc = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg);
if (endDbRc != NO_ERROR)
{
// Might want to log this error, but don't think we
// need to report as fatal, as all changes were confirmed.
}
if (files.size() > 0)
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
cacheutils::flushOIDsFromCache(oidsToFlush);
}
else
{
ostringstream ossErr;
ossErr << "Error confirming changes to table " << aTableName << "; " << eMsg;
err = ossErr.str();
rc = 1; // reset to 1
}
}
}
else // flushChunks error
{
WErrorCodes ec;
std::ostringstream ossErr;
ossErr << "Error flushing chunks for table " << aTableName << "; " << ec.errorString(rc);
err = ossErr.str();
rc = 1; // reset to 1
}
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fIsFirstBatchPm = true;
return rc;
}
uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
// commit all versioned blocks, set hwm, update casual partition
return rc;
}
uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
uint32_t tmp32, tableOid, sessionID;
uint64_t lockID;
bs >> sessionID;
bs >> lockID;
bs >> tmp32;
tableOid = tmp32;
// Bulkrollback
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::TableName aTableName;
try
{
aTableName = systemCatalogPtr->tableName(tableOid);
}
catch (...)
{
err = std::string("No such table for oid ") + std::to_string(tableOid);
rc = 1;
return rc;
}
string table = aTableName.schema + "." + aTableName.table;
string applName("BatchInsert");
rc = fWEWrapper.bulkRollback(tableOid, lockID, table, applName, false, err);
fIsFirstBatchPm = true;
TableMetaData::removeTableMetaData(tableOid);
return rc;
}
uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
// Rollbacked all versioned blocks
return rc;
}
uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
uint8_t rc = 0;
uint32_t tmp32, sessionID;
TxnID txnId;
bs >> PMId;
bs >> tmp32;
txnId = tmp32;
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnId);
if (!rowGroups[txnId]) // meta data
{
rowGroups[txnId] = new rowgroup::RowGroup();
rowGroups[txnId]->deserialize(bs);
uint8_t pkgType;
bs >> pkgType;
cpackages[txnId].read(bs);
rc = fWEWrapper.startTransaction(txnId);
if (rc != NO_ERROR)
{
WErrorCodes ec;
err = ec.errorString(rc);
}
return rc;
}
bool pushWarning = false;
rowgroup::RGData rgData;
rgData.deserialize(bs);
rowGroups[txnId]->setData(&rgData);
// rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
// get rows and values
rowgroup::Row row;
rowGroups[txnId]->initRow(&row);
string value("");
uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
uint32_t columnsSelected = rowGroups[txnId]->getColumnCount();
std::vector<execplan::CalpontSystemCatalog::ColDataType> fetchColTypes = rowGroups[txnId]->getColTypes();
std::vector<uint32_t> fetchColScales = rowGroups[txnId]->getScale();
std::vector<uint32_t> fetchColColwidths;
for (uint32_t i = 0; i < columnsSelected; i++)
{
fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i));
}
WriteEngine::ColTupleList aColList;
WriteEngine::ColTuple colTuple;
WriteEngine::ColStructList colStructList;
WriteEngine::ColStruct colStruct;
WriteEngine::ColValueList colValueList;
WriteEngine::RIDList rowIDLists;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryValueList dctnryValueList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
DMLTable* tablePtr = cpackages[txnId].get_Table();
RowList rows = tablePtr->get_RowList();
dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList();
tableColName.table = tableName.table = tablePtr->get_TableName();
tableColName.schema = tableName.schema = tablePtr->get_SchemaName();
tableColName.column = columnsUpdated[0]->get_Name();
sessionID = cpackages[txnId].get_SessionID();
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::OID oid = 0;
CalpontSystemCatalog::ROPair tableRO;
long timeZone = cpackages[txnId].get_TimeZone();
try
{
tableRO = systemCatalogPtr->tableRID(tableName);
oid = systemCatalogPtr->lookupOID(tableColName);
}
catch (std::exception& ex)
{
rc = 1;
ostringstream oss;
oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema
<< "." + tableColName.table << "." << tableColName.column;
err = oss.str();
}
catch (...)
{
rc = 1;
ostringstream oss;
oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table
<< "." << tableColName.column;
err = oss.str();
}
if (rc != 0)
return rc;
rowGroups[txnId]->getRow(0, &row);
CalpontSystemCatalog::RID rid = row.getRid();
uint16_t dbRoot, segment, blockNum;
uint32_t partition;
uint8_t extentNum;
// Get the file information from rowgroup
dbRoot = rowGroups[txnId]->getDBRoot();
rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
colStruct.fColPartition = partition;
colStruct.fColSegment = segment;
colStruct.fColDbRoot = dbRoot;
dctnryStruct.fColPartition = partition;
dctnryStruct.fColSegment = segment;
dctnryStruct.fColDbRoot = dbRoot;
TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum);
// Build to be updated column structure and values
int error = 0;
unsigned fetchColPos = 0;
bool ridsFetched = false;
bool isNull = false;
boost::any datavalue;
int64_t intColVal = 0;
// timer.start("fetch values");
std::vector<string> colNames;
// for query stats
boost::scoped_array<CalpontSystemCatalog::ColType> colTypes(
new CalpontSystemCatalog::ColType[columnsUpdated.size()]);
boost::scoped_array<int> preBlkNums(new int[columnsUpdated.size()]);
boost::scoped_array<OID> oids(new OID[columnsUpdated.size()]);
BRMWrapper::setUseVb(true);
for (unsigned int j = 0; j < columnsUpdated.size(); j++)
{
// timer.start("lookupsyscat");
tableColName.column = columnsUpdated[j]->get_Name();
try
{
oids[j] = systemCatalogPtr->lookupOID(tableColName);
colTypes[j] = systemCatalogPtr->colType(oids[j]);
}
catch (std::exception& ex)
{
rc = 1;
ostringstream oss;
oss << "colType got exception " << ex.what() << " with column oid " << oid;
err = oss.str();
}
catch (...)
{
rc = 1;
ostringstream oss;
oss << "colType got unknown exception with column oid " << oid;
err = oss.str();
}
if (rc != 0)
return rc;
preBlkNums[j] = -1;
}
for (unsigned int j = 0; j < columnsUpdated.size(); j++)
{
WriteEngine::ColTupleList colTupleList;
CalpontSystemCatalog::ColType colType = colTypes[j];
oid = oids[j];
colStruct.dataOid = oid;
colStruct.colDataType = colType.colDataType;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
tableColName.column = columnsUpdated[j]->get_Name();
if (!ridsFetched)
{
// querystats
uint64_t relativeRID = 0;
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
rid = row.getRid();
relativeRID = rid - rowGroups[txnId]->getBaseRid();
rid = relativeRID;
convertToRelativeRid(rid, extentNum, blockNum);
rowIDLists.push_back(rid);
uint32_t colWidth = colTypes[j].colWidth;
if (colWidth > 8 && !(colTypes[j].colDataType == CalpontSystemCatalog::DECIMAL ||
colTypes[j].colDataType == CalpontSystemCatalog::UDECIMAL))
{
colWidth = 8;
}
else if (colWidth >= datatypes::MAXDECIMALWIDTH)
{
colWidth = datatypes::MAXDECIMALWIDTH;
}
int rrid = (int)relativeRID / (BYTE_PER_BLOCK / colWidth);
// populate stats.blocksChanged
if (rrid > preBlkNums[j])
{
preBlkNums[j] = rrid;
blocksChanged++;
}
}
ridsFetched = true;
}
bool pushWarn = false;
bool nameNeeded = false;
if (isDictCol(colType))
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
dctnryStruct.colWidth = colType.colWidth;
if (NO_ERROR != (error = fWEWrapper.openDctnry(txnId, dctnryStruct, false))) // @bug 5572 HDFS tmp file
{
WErrorCodes ec;
err = ec.errorString(error);
rc = error;
return rc;
}
ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) &&
(it->segNum == dctnryStruct.fColSegment))
break;
it++;
}
if (it == aColExtsInfo.end()) // add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot = dctnryStruct.fColDbRoot;
aExt.partNum = dctnryStruct.fColPartition;
aExt.segNum = dctnryStruct.fColSegment;
aExt.compType = dctnryStruct.fCompressionType;
aExt.isDict = true;
aColExtsInfo.push_back(aExt);
}
aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo);
if (columnsUpdated[j]->get_isFromCol())
{
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
if (row.isNullValue(fetchColPos))
{
if ((colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (colType.defaultValue.length() > 0)
{
value = colType.defaultValue;
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
return false;
}
colTuple.data = dctTuple.token;
colTupleList.push_back(colTuple);
}
else
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
colTupleList.push_back(colTuple);
}
continue;
}
switch (fetchColTypes[fetchColPos])
{
case CalpontSystemCatalog::DATE:
{
intColVal = row.getUintField<4>(fetchColPos);
value = DataConvert::dateToString(intColVal);
break;
}
case CalpontSystemCatalog::DATETIME:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::datetimeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
break;
}
case CalpontSystemCatalog::TIME:
{
intColVal = row.getIntField<8>(fetchColPos);
value = DataConvert::timeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
value = row.getStringField(fetchColPos);
unsigned i = strlen(value.c_str());
value = value.substr(0, i);
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
value = row.getVarBinaryStringField(fetchColPos);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{
datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString(true);
break;
}
}
/* fall through */
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::UBIGINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::UTINYINT:
{
{
intColVal = row.getIntField(fetchColPos);
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
{
intColVal = 0;
}
if (fetchColScales[fetchColPos] <= 0)
{
ostringstream os;
if (isUnsigned(fetchColTypes[fetchColPos]))
os << static_cast<uint64_t>(intColVal);
else
os << intColVal;
value = os.str();
}
else
{
datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString();
}
}
break;
}
// In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(fetchColPos);
if (dl == std::numeric_limits<float>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(7) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(fetchColPos);
if (dl == std::numeric_limits<double>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(16) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
long double dll = row.getLongDoubleField(fetchColPos);
if (dll == std::numeric_limits<long double>::infinity())
continue;
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(19) << dll;
value = os.str();
break;
}
default: // treat as int64
{
ostringstream os;
intColVal = row.getUintField<8>(fetchColPos);
os << intColVal;
value = os.str();
break;
}
}
uint32_t funcScale = columnsUpdated[j]->get_funcScale();
if (funcScale != 0)
{
string::size_type pos = value.find_first_of("."); // decimal point
if (pos >= value.length())
value.insert(value.length(), ".");
// padding 0 if needed
pos = value.find_first_of(".");
uint32_t digitsAfterPoint = value.length() - pos - 1;
if (digitsAfterPoint < funcScale)
{
for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
value += "0";
}
}
// check data length
// trim the string if needed
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
if (!pushWarn)
pushWarn = true;
if (!pushWarning)
pushWarning = true;
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
colTupleList.push_back(colTuple);
}
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType, false);
fetchColPos++;
}
else // constant
{
if (columnsUpdated[j]->get_isnull())
{
if ((colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (colType.defaultValue.length() > 0)
{
value = colType.defaultValue;
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType,
false); // Constant only need to tokenize once.
}
else
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
}
}
else
{
value = columnsUpdated[j]->get_Data();
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType,
false); // Constant only need to tokenize once.
}
for (unsigned row = 0; row < rowsThisRowgroup; row++)
colTupleList.push_back(colTuple);
}
}
else // Non dictionary column
{
colStruct.colWidth = colType.colWidth;
if (columnsUpdated[j]->get_isFromCol())
{
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
if (row.isNullValue(fetchColPos))
{
isNull = true;
value = "";
}
else
{
isNull = false;
switch (fetchColTypes[fetchColPos])
{
case CalpontSystemCatalog::DATE:
{
intColVal = row.getUintField<4>(fetchColPos);
value = DataConvert::dateToString(intColVal);
break;
}
case CalpontSystemCatalog::DATETIME:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::datetimeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
break;
}
case CalpontSystemCatalog::TIME:
{
intColVal = row.getIntField<8>(fetchColPos);
value = DataConvert::timeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
value = row.getStringField(fetchColPos);
unsigned i = strlen(value.c_str());
value = value.substr(0, i);
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
value = row.getVarBinaryStringField(fetchColPos);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{
datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString(true);
break;
}
}
/* fall through */
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::UBIGINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::UTINYINT:
{
{
intColVal = row.getIntField(fetchColPos);
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
{
intColVal = 0;
}
if (fetchColScales[fetchColPos] <= 0)
{
ostringstream os;
if (isUnsigned(fetchColTypes[fetchColPos]))
os << static_cast<uint64_t>(intColVal);
else
os << intColVal;
value = os.str();
}
else
{
datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString();
}
}
break;
}
// In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(fetchColPos);
if (dl == std::numeric_limits<float>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(7) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(fetchColPos);
if (dl == std::numeric_limits<double>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(16) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
long double dll = row.getLongDoubleField(fetchColPos);
if (dll == std::numeric_limits<long double>::infinity())
continue;
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(19) << dll;
value = os.str();
break;
}
default: // treat as int64
{
ostringstream os;
intColVal = row.getUintField<8>(fetchColPos);
os << intColVal;
value = os.str();
break;
}
}
}
uint32_t funcScale = columnsUpdated[j]->get_funcScale();
if (funcScale != 0)
{
string::size_type pos = value.find_first_of("."); // decimal point
if (pos >= value.length())
value.insert(value.length(), ".");
// padding 0 if needed
pos = value.find_first_of(".");
uint32_t digitsAfterPoint = value.length() - pos - 1;
if (digitsAfterPoint < funcScale)
{
for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
value += "0";
}
}
// Check NOT NULL constraint and default value
if ((isNull) && (colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if ((isNull) && (colType.defaultValue.length() > 0))
{
isNull = false;
bool oneWarn = false;
try
{
datavalue =
colType.convertColumnData(colType.defaultValue, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + colType.defaultValue + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else
{
try
{
datavalue = colType.convertColumnData(value, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + value + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
return rc;
}
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
}
fetchColPos++;
}
else // constant column
{
if (columnsUpdated[j]->get_isnull())
{
isNull = true;
}
else
{
isNull = false;
}
string inData(columnsUpdated[j]->get_Data());
if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (inData == "0000-00-00")) ||
((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) &&
(inData == "0000-00-00 00:00:00")) ||
((colType.colDataType == execplan::CalpontSystemCatalog::TIMESTAMP) &&
(inData == "0000-00-00 00:00:00")))
{
isNull = true;
}
uint64_t nextVal = 0;
if (colType.autoincrement)
{
try
{
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
if (colType.autoincrement && (isNull || (inData.compare("0") == 0)))
{
// reserve nextVal
try
{
bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
isNull = false;
bool oneWarn = false;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
{
ostringstream oss;
oss << nextVal++;
inData = oss.str();
try
{
datavalue = colType.convertColumnData(inData, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + inData + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
}
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else if (isNull && (colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (isNull && (colType.defaultValue.length() > 0))
{
isNull = false;
bool oneWarn = false;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
{
try
{
datavalue =
colType.convertColumnData(colType.defaultValue, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + colType.defaultValue + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
}
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else
{
try
{
datavalue = colType.convertColumnData(inData, pushWarn, timeZone, isNull, false, true);
}
catch (exception& ex)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
cout << ex.what() << endl;
Message::Args args;
args.add(string("'") + inData + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
return rc;
}
colTuple.data = datavalue;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
colTupleList.push_back(colTuple);
}
}
}
if (nameNeeded)
{
colNames.push_back(tableColName.column);
}
colStructList.push_back(colStruct);
colValueList.push_back(colTupleList);
cscColTypeList.push_back(colType);
} // end of bulding values and column structure.
// timer.stop("fetch values");
if (rowIDLists.size() > 0)
{
error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists,
tableRO.objnum);
}
if (error != NO_ERROR)
{
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
}
if (pushWarning)
{
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
}
return rc;
}
uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId)
{
uint8_t rc = 0;
uint32_t txnId;
vector<LBID_t> lbidList;
bs >> txnId;
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
try
{
auto mapIter = m_txnLBIDMap.find(txnId);
if (mapIter != m_txnLBIDMap.end())
{
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
lbidList = spTxnLBIDRec->m_LBIDs;
}
}
catch (...)
{
}
bs.restart();
try
{
serializeInlineVector(bs, lbidList);
}
catch (exception& ex)
{
// Append to errmsg in case we already have an error
if (err.length() > 0)
err += "; ";
err += ex.what();
rc = 1;
return rc;
}
return rc;
}
uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
uint32_t flushCode, txnId, tableOid;
int error;
bs >> flushCode;
bs >> txnId;
bs >> tableOid;
std::map<uint32_t, uint32_t> oids;
CalpontSystemCatalog::TableName aTableName;
CalpontSystemCatalog::RIDList ridList;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(txnId);
// execplan::CalpontSystemCatalog::ColType colType;
CalpontSystemCatalog::DictOIDList dictOids;
if (tableOid >= 3000)
{
try
{
aTableName = systemCatalogPtr->tableName(tableOid);
}
catch (...)
{
err = std::string("Systemcatalog error for tableoid ") + std::to_string(tableOid);
rc = 1;
return rc;
}
dictOids = systemCatalogPtr->dictOIDs(aTableName);
for (unsigned i = 0; i < dictOids.size(); i++)
{
oids[dictOids[i].dictOID] = dictOids[i].dictOID;
}
// if (dictOids.size() > 0)
// colType = systemCatalogPtr->colTypeDct(dictOids[0].dictOID);
}
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
vector<LBID_t> lbidList;
if (idbdatafile::IDBPolicy::useHdfs())
{
// XXX THIS IS WRONG!!!
// save the extent info to mark them invalid, after flush, the meta file will be gone.
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
try
{
auto mapIter = m_txnLBIDMap.find(txnId);
if (mapIter != m_txnLBIDMap.end())
{
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
lbidList = spTxnLBIDRec->m_LBIDs;
}
}
catch (...)
{
}
}
error = fWEWrapper.flushDataFiles(flushCode, txnId, oids);
// No need to close files, flushDataFile will close them.
// if (((colType.compressionType > 0 ) && (dictOids.size() > 0)) || (idbdatafile::IDBPolicy::useHdfs()))
// fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
if (error != NO_ERROR)
{
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
}
// erase rowgroup from the rowGroup map
if (rowGroups[txnId])
{
delete rowGroups[txnId];
rowGroups[txnId] = 0;
}
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
std::vector<BRM::FileInfo> files;
std::vector<BRM::OID_t> oidsToFlush;
BRM::FileInfo aFile;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
oidsToFlush.push_back(aFile.oid);
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
// cout <<"Added to files oid:dbroot:part:seg:compType = " <<
// aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
//<<":"<<aFile.compType <<endl;
aIt++;
}
it++;
}
if (idbdatafile::IDBPolicy::useHdfs())
{
rc = fWEWrapper.confirmTransaction(txnId);
//@Bug 5700. Purge FD cache after file swap
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
cacheutils::flushOIDsFromCache(oidsToFlush);
fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
}
// cout << "Purged files.size:moduleId = " << files.size() << ":"<<Config::getLocalModuleID() << endl;
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::dropPrimProcFdCache();
TableMetaData::removeTableMetaData(tableOid);
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(txnId);
CalpontSystemCatalog::removeCalpontSystemCatalog(txnId | 0x80000000);
return rc;
}
uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
uint8_t rc = 0;
uint32_t tmp32, sessionID;
TxnID txnId;
bs >> PMId;
bs >> sessionID;
bs >> tmp32;
txnId = tmp32;
string schema, tableName;
bs >> schema;
bs >> tableName;
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnId);
if (!rowGroups[txnId]) // meta data
{
rowGroups[txnId] = new rowgroup::RowGroup();
rowGroups[txnId]->deserialize(bs);
// If hdfs, call chunkmanager to set up
rc = fWEWrapper.startTransaction(txnId);
if (rc != NO_ERROR)
{
WErrorCodes ec;
err = ec.errorString(rc);
}
return rc;
}
rowgroup::RGData rgData;
rgData.deserialize(bs);
rowGroups[txnId]->setData(&rgData);
// rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
// get row ids
rowgroup::Row row;
rowGroups[txnId]->initRow(&row);
WriteEngine::RIDList rowIDList;
CalpontSystemCatalog::RID rid;
uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
uint16_t dbRoot, segment, blockNum;
uint32_t partition;
uint8_t extentNum;
CalpontSystemCatalog::TableName aTableName;
aTableName.schema = schema;
aTableName.table = tableName;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::OID tableAUXColOid;
CalpontSystemCatalog::RIDList tableRidList;
try
{
roPair = systemCatalogPtr->tableRID(aTableName);
tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName);
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
// querystats
uint64_t relativeRID = 0;
boost::scoped_array<int> preBlkNums(new int[row.getColumnCount()]);
boost::scoped_array<uint32_t> colWidth(new uint32_t[row.getColumnCount()]);
// initialize
for (uint32_t j = 0; j < row.getColumnCount(); j++)
{
preBlkNums[j] = -1;
colWidth[j] = row.getColumnWidth(j);
execplan::CalpontSystemCatalog::ColDataType colDataType = row.getColType(j);
if (colWidth[j] >= 8 && !(colDataType == execplan::CalpontSystemCatalog::DECIMAL ||
colDataType == execplan::CalpontSystemCatalog::UDECIMAL))
{
colWidth[j] = 8;
}
else if (colWidth[j] >= datatypes::MAXDECIMALWIDTH)
{
colWidth[j] = datatypes::MAXDECIMALWIDTH;
}
}
// Get the file information from rowgroup
dbRoot = rowGroups[txnId]->getDBRoot();
rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
WriteEngine::ColStructList colStructList;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::ColStruct colStruct;
colStruct.fColPartition = partition;
colStruct.fColSegment = segment;
colStruct.fColDbRoot = dbRoot;
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
rid = row.getRid();
relativeRID = rid - rowGroups[txnId]->getBaseRid();
rid = relativeRID;
convertToRelativeRid(rid, extentNum, blockNum);
rowIDList.push_back(rid);
// populate stats.blocksChanged
for (uint32_t j = 0; j < row.getColumnCount(); j++)
{
if ((int)(relativeRID / (BYTE_PER_BLOCK / colWidth[j])) > preBlkNums[j])
{
blocksChanged++;
preBlkNums[j] = relativeRID / (BYTE_PER_BLOCK / colWidth[j]);
}
}
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
bool hasAUXCol = tableAUXColOid > 3000;
try
{
for (unsigned i = 0; i < tableRidList.size(); i++)
{
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(tableRidList[i].objnum);
colStruct.dataOid = tableRidList[i].objnum;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
if (colType.colWidth > 8 && !(colType.colDataType == CalpontSystemCatalog::DECIMAL ||
colType.colDataType == CalpontSystemCatalog::UDECIMAL)) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
colStructList.push_back(colStruct);
cscColTypeList.push_back(colType);
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
if (hasAUXCol)
{
CalpontSystemCatalog::ColType colType;
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
colType.colWidth = execplan::AUX_COL_WIDTH;
colType.colDataType = execplan::AUX_COL_DATATYPE;
colStruct.dataOid = tableAUXColOid;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
colStruct.colWidth = colType.colWidth;
colStruct.colDataType = colType.colDataType;
colStructList.push_back(colStruct);
cscColTypeList.push_back(colType);
}
std::vector<ColStructList> colExtentsStruct;
std::vector<CSCTypesList> colExtentsColType;
std::vector<void*> colOldValueList;
std::vector<RIDList> ridLists;
colExtentsStruct.push_back(colStructList);
colExtentsColType.push_back(cscColTypeList);
ridLists.push_back(rowIDList);
int error = 0;
error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists,
roPair.objnum, hasAUXCol);
if (error != NO_ERROR)
{
rc = error;
// cout << "WE Error code " << error << endl;
WErrorCodes ec;
err = ec.errorString(error);
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
}
return rc;
}
uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
uint32_t tableOID;
try
{
bs >> tableOID;
// std::cout << ": tableOID-" << tableOID << std::endl;
BulkRollbackMgr::deleteMetaFile(tableOID);
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
}
return rc;
}
//------------------------------------------------------------------------------
// Process bulk rollback command
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
err.clear();
try
{
uint32_t tableOID;
uint64_t tableLockID;
std::string tableName;
std::string appName;
// May want to eventually comment out this logging to stdout,
// but it shouldn't hurt to keep in here.
std::cout << "processBulkRollback";
bs >> tableLockID;
// std::cout << ": tableLock-"<< tableLockID;
bs >> tableOID;
// std::cout << "; tableOID-" << tableOID;
bs >> tableName;
if (tableName.length() == 0)
{
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(0);
CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOID);
tableName = aTableName.toString();
}
std::cout << "; table-" << tableName;
bs >> appName;
std::cout << "; app-" << appName << std::endl;
int we_rc = fWEWrapper.bulkRollback(tableOID, tableLockID, tableName, appName,
false, // no extra debug logging to the console
err);
if (we_rc != NO_ERROR)
rc = 2;
}
catch (exception& ex)
{
std::cout << "processBulkRollback: exception-" << ex.what() << std::endl;
err = ex.what();
rc = 1;
}
return rc;
}
//------------------------------------------------------------------------------
// Process bulk rollback cleanup command (deletes bulk rollback meta data files)
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(messageqcpp::ByteStream& bs, std::string& err)
{
uint8_t rc = 0;
err.clear();
try
{
uint32_t tableOID;
// May want to eventually comment out this logging to stdout,
// but it shouldn't hurt to keep in here.
std::cout << "processBulkRollbackCleanup";
bs >> tableOID;
std::cout << ": tableOID-" << tableOID << std::endl;
BulkRollbackMgr::deleteMetaFile(tableOID);
}
catch (exception& ex)
{
std::cout << "processBulkRollbackCleanup: exception-" << ex.what() << std::endl;
err = ex.what();
rc = 1;
}
return rc;
}
uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
uint32_t columnOid, sessionID;
uint64_t nextVal;
int rc = 0;
bs >> columnOid;
bs >> nextVal;
bs >> sessionID;
uint16_t dbRoot;
std::map<uint32_t, uint32_t> oids;
// std::vector<BRM::OID_t> oidsToFlush;
oids[columnOid] = columnOid;
// oidsToFlush.push_back(columnOid);
BRM::OID_t oid = 1021;
fDbrm.getSysCatDBRoot(oid, dbRoot);
fWEWrapper.setTransId(sessionID);
fWEWrapper.setBulkFlag(false);
fWEWrapper.startTransaction(sessionID);
// cout << "updateSyscolumnNextval startTransaction id " << sessionID << endl;
rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);
if (rc != 0)
{
err = "Error in WE::updateNextValue";
rc = 1;
}
if (idbdatafile::IDBPolicy::useHdfs())
{
cout << "updateSyscolumnNextval flushDataFiles " << endl;
int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids);
if ((rc == 0) && (rc1 == 0))
{
cout << "updateSyscolumnNextval confirmTransaction rc =0 " << endl;
rc1 = fWEWrapper.confirmTransaction(sessionID);
cout << "updateSyscolumnNextval confirmTransaction return code is " << rc1 << endl;
if (rc1 == NO_ERROR)
rc1 = fWEWrapper.endTransaction(sessionID, true);
else
fWEWrapper.endTransaction(sessionID, false);
}
else
{
cout << "updateSyscolumnNextval endTransaction with error " << endl;
fWEWrapper.endTransaction(sessionID, false);
}
if (rc == NO_ERROR)
rc = rc1;
}
// if (idbdatafile::IDBPolicy::useHdfs())
// cacheutils::flushOIDsFromCache(oidsToFlush);
return rc;
}
uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err)
{
int rc = 0;
uint32_t tableOid;
bs >> tableOid;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
ColExtsInfo::iterator aIt;
std::vector<BRM::FileInfo> files;
BRM::FileInfo aFile;
while (it != colsExtsInfoMap.end())
{
aIt = (it->second).begin();
aFile.oid = it->first;
while (aIt != (it->second).end())
{
aFile.partitionNum = aIt->partNum;
aFile.dbRoot = aIt->dbRoot;
aFile.segmentNum = aIt->segNum;
aFile.compType = aIt->compType;
files.push_back(aFile);
// cout <<"Added to files oid:dbroot:part:seg:compType = " <<
// aFile.oid<<":"<<aFile.dbRoot<<":"<<aFile.partitionNum<<":"<<aFile.segmentNum
//<<":"<<aFile.compType <<endl;
aIt++;
}
it++;
}
if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
TableMetaData::removeTableMetaData(tableOid);
return rc;
}
uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId)
{
uint8_t rc = 0;
// cout << " In processFixRows" << endl;
uint32_t tmp32;
uint64_t sessionID;
uint16_t dbRoot, segment;
uint32_t partition;
string schema, tableName;
TxnID txnId;
uint8_t tmp8;
bool firstBatch = false;
WriteEngine::RIDList rowIDList;
bs >> PMId;
bs >> tmp8;
firstBatch = (tmp8 != 0);
bs >> sessionID;
bs >> tmp32;
txnId = tmp32;
bs >> schema;
bs >> tableName;
bs >> dbRoot;
bs >> partition;
bs >> segment;
deserializeInlineVector(bs, rowIDList);
// Need to identify whether this is the first batch to start transaction.
if (firstBatch)
{
rc = fWEWrapper.startTransaction(txnId);
if (rc != NO_ERROR)
{
WErrorCodes ec;
err = ec.errorString(rc);
return rc;
}
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnId);
fWEWrapper.setFixFlag(true);
}
CalpontSystemCatalog::TableName aTableName;
aTableName.schema = schema;
aTableName.table = tableName;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::RIDList tableRidList;
try
{
roPair = systemCatalogPtr->tableRID(aTableName);
tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
WriteEngine::ColStructList colStructList;
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStructList dctnryStructList;
/* colStruct.fColPartition = partition;
colStruct.fColSegment = segment;
colStruct.fColDbRoot = dbRoot; */
colStruct.fColPartition = 0;
colStruct.fColSegment = 0;
colStruct.fColDbRoot = 3;
// should we always scan dictionary store files?
try
{
for (unsigned i = 0; i < tableRidList.size(); i++)
{
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(tableRidList[i].objnum);
colStruct.dataOid = tableRidList[i].objnum;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
WriteEngine::DctnryStruct dctnryStruct;
dctnryStruct.fColDbRoot = colStruct.fColDbRoot;
dctnryStruct.fColPartition = colStruct.fColPartition;
dctnryStruct.fColSegment = colStruct.fColSegment;
dctnryStruct.fCompressionType = colStruct.fCompressionType;
dctnryStruct.dctnryOid = 0;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
if (colType.colWidth > 8) // token
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
dctnryStruct.dctnryOid = colType.ddn.dictOID;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
colStructList.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
}
}
catch (exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
int error = 0;
try
{
error = fWEWrapper.deleteBadRows(txnId, colStructList, rowIDList, dctnryStructList);
if (error != NO_ERROR)
{
rc = error;
// cout << "WE Error code " << error << endl;
WErrorCodes ec;
err = ec.errorString(error);
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
}
// cout << "WES return rc " << (int)rc << " with msg " << err << endl;
return rc;
}
uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err)
{
int rc = 0;
ByteStream::byte tmp8;
bool success;
uint32_t txnid;
bs >> txnid;
bs >> tmp8;
success = (tmp8 != 0);
rc = fWEWrapper.endTransaction(txnid, success);
if (rc != NO_ERROR)
{
WErrorCodes ec;
err = ec.errorString(rc);
}
return rc;
}
//------------------------------------------------------------------------------
// Validates the correctness of the current HWMs for this table.
// The HWMs for all the 1 byte columns should be identical. Same goes
// for all the 2 byte columns, etc. The 2 byte column HWMs should be
// "roughly" (but not necessarily exactly) twice that of a 1 byte column.
// Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
// ridList - columns oids to be used to get column width on to use with validation.
// segFileInfo - Vector of File objects carrying current DBRoot, partition,
// HWM, etc to be validated for the columns belonging to jobTable.
// stage - Current stage we are validating. "Starting" or "Ending".
//------------------------------------------------------------------------------
int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList,
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,
const std::vector<DBRootExtentInfo>& segFileInfo, const char* stage,
bool hasAuxCol)
{
int rc = NO_ERROR;
if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0))
return rc;
// Used to track first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns in table
int byte1First = -1;
int byte2First = -1;
int byte4First = -1;
int byte8First = -1;
int byte16First = -1;
// Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
// 4-byte, 8-byte, and 16-byte columns as well.
CalpontSystemCatalog::ColType colType;
Convertor convertor;
uint32_t colWidth;
for (unsigned k = 0; k < segFileInfo.size(); k++)
{
unsigned k1 = 0;
// Find out column width
if (hasAuxCol && (k == segFileInfo.size() - 1))
{
colWidth = execplan::AUX_COL_WIDTH;
}
else
{
colType = systemCatalogPtr->colType(ridList[k].objnum);
colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
}
// Find the first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns.
// Use those as our reference HWM for the respective column widths.
switch (colWidth)
{
case 1:
{
if (byte1First == -1)
byte1First = k;
k1 = byte1First;
break;
}
case 2:
{
if (byte2First == -1)
byte2First = k;
k1 = byte2First;
break;
}
case 4:
{
if (byte4First == -1)
byte4First = k;
k1 = byte4First;
break;
}
case 16:
{
if (byte16First == -1)
byte16First = k;
k1 = byte16First;
break;
}
case 8:
default:
{
if (byte8First == -1)
byte8First = k;
k1 = byte8First;
break;
}
} // end of switch based on column width (1,2,4,8 or 16)
// std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
// "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
// " <to> col-" << k <<
// "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
// Validate that the HWM for this column (k) matches that of the
// corresponding reference column with the same width.
if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
(segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
(segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
(segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
{
uint32_t colWidth2;
if (hasAuxCol && (k1 == segFileInfo.size() - 1))
{
colWidth2 = execplan::AUX_COL_WIDTH;
}
else
{
CalpontSystemCatalog::ColType colType2;
colType2 = systemCatalogPtr->colType(ridList[k1].objnum);
colWidth2 = colType2.colWidth;
}
ostringstream oss;
oss << stage
<< " HWMs do not match for"
" OID1-"
<< ridList[k1].objnum << "; DBRoot-" << segFileInfo[k1].fDbRoot << "; partition-"
<< segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment << "; hwm-"
<< segFileInfo[k1].fLocalHwm << "; width-" << colWidth2 << ':' << std::endl
<< " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
<< segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
<< segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
return ERR_BRM_HWMS_NOT_EQUAL;
}
// HWM DBRoot, partition, and segment number should match for all
// columns; so compare DBRoot, part#, and seg# with first column.
if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
(segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
(segFileInfo[0].fSegment != segFileInfo[k].fSegment))
{
CalpontSystemCatalog::ColType colType2;
colType2 = systemCatalogPtr->colType(ridList[0].objnum);
ostringstream oss;
oss << stage
<< " HWM DBRoot,Part#, or Seg# do not match for"
" OID1-"
<< ridList[0].objnum << "; DBRoot-" << segFileInfo[0].fDbRoot << "; partition-"
<< segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment << "; hwm-"
<< segFileInfo[0].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl
<< " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
<< segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
<< segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
return ERR_BRM_HWMS_NOT_EQUAL;
}
} // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
// Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
// Without knowing the exact row count, we can't extrapolate the exact HWM
// for the wider column, but we can narrow it down to an expected range.
unsigned refCol = 0;
unsigned colIdx = 0;
// Validate/compare HWMs given a 1-byte column as a starting point
if (byte1First >= 0)
{
refCol = byte1First;
if (byte2First >= 0)
{
HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2;
HWM hwmHi = hwmLo + 1;
if ((segFileInfo[byte2First].fLocalHwm < hwmLo) || (segFileInfo[byte2First].fLocalHwm > hwmHi))
{
colIdx = byte2First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
if (byte4First >= 0)
{
HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4;
HWM hwmHi = hwmLo + 3;
if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi))
{
colIdx = byte4First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
if (byte8First >= 0)
{
HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8;
HWM hwmHi = hwmLo + 7;
if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
{
colIdx = byte8First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
}
// Validate/compare HWMs given a 2-byte column as a starting point
if (byte2First >= 0)
{
refCol = byte2First;
if (byte4First >= 0)
{
HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2;
HWM hwmHi = hwmLo + 1;
if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi))
{
colIdx = byte4First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
if (byte8First >= 0)
{
HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4;
HWM hwmHi = hwmLo + 3;
if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
{
colIdx = byte8First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
}
// Validate/compare HWMs given a 4-byte column as a starting point
if (byte4First >= 0)
{
refCol = byte4First;
if (byte8First >= 0)
{
HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2;
HWM hwmHi = hwmLo + 1;
if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi))
{
colIdx = byte8First;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
goto errorCheck;
}
}
}
// To avoid repeating this message 6 times in the preceding source code, we
// use the "dreaded" goto to branch to this single place for error handling.
errorCheck:
if (rc != NO_ERROR)
{
uint32_t colWidth1, colWidth2;
if (hasAuxCol && (refCol == ridList.size() - 1))
{
colWidth1 = execplan::AUX_COL_WIDTH;
}
else
{
CalpontSystemCatalog::ColType colType1 = systemCatalogPtr->colType(ridList[refCol].objnum);
colWidth1 = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth);
}
if (hasAuxCol && (colIdx == ridList.size() - 1))
{
colWidth2 = execplan::AUX_COL_WIDTH;
}
else
{
CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum);
colWidth2 = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth);
}
ostringstream oss;
oss << stage
<< " HWMs are not in sync for"
" OID1-"
<< ridList[refCol].objnum << "; DBRoot-" << segFileInfo[refCol].fDbRoot << "; partition-"
<< segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment << "; hwm-"
<< segFileInfo[refCol].fLocalHwm << "; width-" << colWidth1 << ':' << std::endl
<< " and OID2-" << ridList[colIdx].objnum << "; DBRoot-" << segFileInfo[colIdx].fDbRoot
<< "; partition-" << segFileInfo[colIdx].fPartition << "; segment-" << segFileInfo[colIdx].fSegment
<< "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colWidth2;
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
}
return rc;
}
} // namespace WriteEngine