1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Serguey Zefirov 53b9a2a0f9 MCOL-4580 extent elimination for dictionary-based text/varchar types
The idea is relatively simple - encode prefixes of collated strings as
integers and use them to compute extents' ranges. Then we can eliminate
extents with strings.

The actual patch does have all the code there but misses one important
step: we do not keep collation index, we keep charset index. Because of
this, some of the tests in the bugfix suite fail and thus main
functionality is turned off.

The reason for putting this patch into a PR at all is that it contains
changes that made CHAR/VARCHAR columns unsigned. This change is needed in
vectorization work.
2022-03-02 23:53:39 +03:00

4797 lines
142 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $
#include <unistd.h>
#include "bytestream.h"
using namespace messageqcpp;
#include "we_messages.h"
#include "we_message_handlers.h"
#include "../../dbcon/dmlpackage/dmlpkg.h"
#include "we_dmlcommandproc.h"
using namespace dmlpackage;
#include "dmlpackageprocessor.h"
using namespace dmlpackageprocessor;
#include "dataconvert.h"
using namespace dataconvert;
#include "calpontsystemcatalog.h"
#include "sessionmanager.h"
using namespace execplan;
#include "messagelog.h"
#include "stopwatch.h"
#include "idberrorinfo.h"
#include "errorids.h"
using namespace logging;
#include "brmtypes.h"
using namespace BRM;
#include "we_tablemetadata.h"
#include "we_dbrootextenttracker.h"
#include "we_bulkrollbackmgr.h"
#include "we_define.h"
#include "we_confirmhdfsdbfile.h"
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "checks.h"
#include "columnwidth.h"
using namespace std;
namespace WriteEngine
{
// StopWatch timer;
// Default constructor: starts in the "first batch on this PM" state and
// loads the extent layout settings from the Columnstore configuration,
// keeping the built-in defaults when an entry is absent.
WE_DMLCommandProc::WE_DMLCommandProc()
{
  fIsFirstBatchPm = true;
  extentRows = 0x800000;
  // MCOL-4685 removed the option to set more than 2 extents per file
  // (ExtentsPerSegmentFile), so the built-in default is always used.
  extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE;

  config::Config* const cfg = config::Config::makeConfig();

  const string filesPerPartitionText = cfg->getConfig("ExtentMap", "FilesPerColumnPartition");
  if (filesPerPartitionText.empty())
    filesPerColumnPartition = 8;
  else
    filesPerColumnPartition = cfg->uFromText(filesPerPartitionText);

  const string dbRootCountText = cfg->getConfig("SystemConfig", "DBRootCount");
  if (dbRootCountText.empty())
    dbrootCnt = 1;
  else
    dbrootCnt = cfg->uFromText(dbRootCountText);
}
// Copy constructor: copies the batch state and the extent layout settings
// from rhs, but gives the copy its own fresh rollback meta-data writer.
// Previously only fIsFirstBatchPm was copied, which left the configuration
// members (set by the default constructor) uninitialized in the copy.
WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs)
{
  fIsFirstBatchPm = rhs.fIsFirstBatchPm;
  filesPerColumnPartition = rhs.filesPerColumnPartition;
  extentsPerSegmentFile = rhs.extentsPerSegmentFile;
  dbrootCnt = rhs.dbrootCnt;
  extentRows = rhs.extentRows;
  fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
}
// Destructor: drops the DBRootExtentTracker handles accumulated by the
// insert paths before the members themselves are destroyed.
WE_DMLCommandProc::~WE_DMLCommandProc()
{
  dbRootExtTrackerVec.erase(dbRootExtTrackerVec.begin(), dbRootExtTrackerVec.end());
}
/**
 * Insert the rows carried by an INSERT ... VALUES package.
 *
 * The request holds, in order: the transaction id, the target DBRoot, a
 * package-type byte (read and ignored here) and a serialized
 * InsertDMLPackage. Every column value is validated (NOT NULL constraint,
 * auto-increment, width/range) and converted, then the rows are written
 * through WriteEngineWrapper::insertColumnRec_Single(). On HDFS the
 * bulk-rollback meta data is saved before the write, and the file changes
 * are confirmed (or rolled back) afterwards.
 *
 * @param bs  incoming request; consumed by this call.
 * @param err cleared on entry; receives the error text, or the
 *            data-truncation warning text when only a warning occurred.
 * @return NO_ERROR (0); IDBRANGE_WARNING when values were truncated or
 *         clipped; DEAD_LOCK_ERROR, VB_OVERFLOW_ERROR or INSERT_ERROR when
 *         the write fails; 1 for any other failure.
 */
uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t rc = 0;
  err.clear();
  InsertDMLPackage insertPkg;
  ByteStream::quadbyte tmp32;
  bs >> tmp32;
  BRM::TxnID txnid;
  txnid.valid = true;
  txnid.id = tmp32;
  bs >> tmp32;
  uint32_t dbroot = tmp32;
  // The package-type byte is not used here but must be consumed before the package.
  messageqcpp::ByteStream::byte packageType;
  bs >> packageType;
  insertPkg.read(bs);
  uint32_t sessionId = insertPkg.get_SessionID();
  DMLTable* tablePtr = insertPkg.get_Table();
  RowList rows = tablePtr->get_RowList();
  WriteEngine::ColStructList colStructs;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::DctnryStructList dctnryStructList;
  WriteEngine::ColValueList colValuesList;
  WriteEngine::DictStrList dicStringList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  tableName.table = tableColName.table = tablePtr->get_TableName();
  tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair tableRoPair;
  std::vector<string> colNames;  // columns whose values produced a truncation/range warning
  bool isWarningSet = false;

  try
  {
    tableRoPair = systemCatalogPtr->tableRID(tableName);

    if (rows.size())
    {
      // Pass 1: build the column and dictionary descriptors from the first row.
      Row* firstRow = rows.at(0);
      ColumnList columns = firstRow->get_ColumnList();
      unsigned int numcols = firstRow->get_NumberOfColumns();
      cscColTypeList.reserve(numcols);
      // WIP
      // We presume that DictCols number is low
      colStructs.reserve(numcols);

      for (ColumnList::const_iterator column_iterator = columns.begin(); column_iterator != columns.end();
           ++column_iterator)
      {
        DMLColumn* columnPtr = *column_iterator;
        tableColName.column = columnPtr->get_Name();
        // TODO MCOL-641 replace with getColRidsOidsTypes()
        CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
        CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);

        WriteEngine::ColStruct colStruct;
        colStruct.fColDbRoot = dbroot;
        colStruct.dataOid = roPair.objnum;
        colStruct.fCompressionType = colType.compressionType;
        colStruct.colDataType = colType.colDataType;

        if (isDictCol(colType))
        {
          // Dictionary columns store a token in the column file.
          // WIP Hardcoded value
          colStruct.colWidth = 8;
          colStruct.tokenFlag = true;
        }
        else
        {
          colStruct.colWidth = colType.colWidth;
          colStruct.tokenFlag = false;
        }

        WriteEngine::DctnryStruct dctnryStruct;
        dctnryStruct.fColDbRoot = dbroot;
        dctnryStruct.fCharsetNumber = colType.charsetNumber;
        // Only token columns have a dictionary store; others get OID 0.
        dctnryStruct.dctnryOid = colStruct.tokenFlag ? colType.ddn.dictOID : 0;
        dctnryStruct.columnOid = colStruct.dataOid;
        dctnryStruct.fCompressionType = colType.compressionType;
        dctnryStruct.colWidth = colType.colWidth;

        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
        cscColTypeList.push_back(colType);
      }

      // Pass 2: validate and convert every value, column by column.
      std::string tmpStr("");
      for (unsigned int i = 0; i < numcols; i++)
      {
        WriteEngine::ColTupleList colTuples;
        RowList::const_iterator row_iterator = rows.begin();
        while (row_iterator != rows.end())
        {
          Row* curRow = *row_iterator;
          const DMLColumn* columnPtr = curRow->get_ColumnAt(i);
          tableColName.column = columnPtr->get_Name();
          // TODO MCOL-641 remove these calls
          CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
          CalpontSystemCatalog::ColType colType;
          colType = systemCatalogPtr->colType(oid);
          boost::any datavalue;
          bool isNULL = false;
          bool pushWarning = false;
          const std::vector<std::string>& origVals = columnPtr->get_DataVector();
          WriteEngine::dictStr dicStrings;

          if (isDictCol(colType))
          {
            for (uint32_t v = 0; v < origVals.size(); v++)
            {
              tmpStr = origVals[v];
              // An explicit NULL and an empty string are both treated as NULL.
              isNULL = columnPtr->get_isnull() || tmpStr.empty();

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.empty())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL)
                {
                  tmpStr = colType.defaultValue;
                }
              }

              if (tmpStr.length() > (unsigned int)colType.colWidth)
              {
                // NOTE(review): truncation is byte-based and may split a multibyte character.
                tmpStr = tmpStr.substr(0, colType.colWidth);
                if (!pushWarning)
                {
                  pushWarning = true;
                  isWarningSet = true;
                  // Same rule as the non-dictionary path below.
                  rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
                  colNames.push_back(tableColName.column);
                }
              }

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;
              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              dicStrings.push_back(tmpStr);
            }
            colValuesList.push_back(colTuples);
            //@Bug 2515. Only pass string values to write engine
            dicStringList.push_back(dicStrings);
          }
          else
          {
            std::string indata;
            for (uint32_t v = 0; v < origVals.size(); v++)
            {
              indata = origVals[v];
              isNULL = columnPtr->get_isnull() || indata.empty();

              // check if autoincrement column and value is 0 or null
              uint64_t nextVal = 1;
              if (colType.autoincrement)
              {
                try
                {
                  // WIP What if we combine this and previous loop and fail
                  // after get nextAIValue ?
                  nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
                  fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
                }
                catch (std::exception& ex)
                {
                  err = ex.what();
                  rc = 1;
                  return rc;
                }
              }

              if (colType.autoincrement && (isNULL || indata == "0"))
              {
                try
                {
                  bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);
                  if (!reserved)
                  {
                    err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
                    rc = 1;
                    return rc;
                  }
                }
                catch (std::exception& ex)
                {
                  err = ex.what();
                  rc = 1;
                  return rc;
                }
                ostringstream oss;
                oss << nextVal;
                indata = oss.str();
                isNULL = false;
              }

              if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
              {
                if (isNULL && colType.defaultValue.empty())  // error out
                {
                  Message::Args args;
                  args.add(tableColName.column);
                  err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                  rc = 1;
                  return rc;
                }
                else if (isNULL)
                {
                  indata = colType.defaultValue;
                  isNULL = false;
                }
              }

              try
              {
                datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), isNULL,
                                                      false, false);
              }
              catch (std::exception&)
              {
                rc = 1;
                Message::Args args;
                args.add(string("'") + indata + string("'"));
                err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
              }

              //@Bug 1806
              if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
              {
                return rc;
              }

              if (pushWarning)
              {
                isWarningSet = true;
                rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
                colNames.push_back(tableColName.column);
              }

              WriteEngine::ColTuple colTuple;
              colTuple.data = datavalue;
              colTuples.push_back(colTuple);
              //@Bug 2515. Only pass string values to write engine
              // NOTE(review): this pushes whatever tmpStr last held (an earlier
              // dictionary value or ""); presumably ignored for non-token columns.
              dicStrings.push_back(tmpStr);
            }
            colValuesList.push_back(colTuples);
            dicStringList.push_back(dicStrings);
          }
          ++row_iterator;
        }
      }
    }
  }
  catch (std::exception& ex)
  {
    rc = 1;
    err = ex.what();
    return rc;
  }

  // call the write engine to write the rows
  int error = NO_ERROR;
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  fWEWrapper.setTransId(txnid.id);
  // For hdfs use only
  uint32_t tblOid = tableRoPair.objnum;

  // WIP are we saving HDFS?
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    std::vector<Column> columns;
    DctnryStructList dctnryList;
    CalpontSystemCatalog::ColType colType;
    std::vector<DBRootExtentInfo> colDBRootExtentInfo;
    Convertor convertor;
    dbRootExtTrackerVec.clear();
    fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL));
    CalpontSystemCatalog::RIDList ridList;
    try
    {
      ridList = systemCatalogPtr->columnRIDs(tableName, true);
      std::vector<OID> dctnryStoreOids(ridList.size());
      std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
      bool bFirstExtentOnThisPM = false;

      // First gather HWM BRM information for all columns. Separate return
      // codes are used so a failure is reported instead of silently
      // overwriting rc (which may already hold IDBRANGE_WARNING).
      std::vector<int> colWidths;
      for (unsigned i = 0; i < ridList.size(); i++)
      {
        int hwmRc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
        if (hwmRc != NO_ERROR)
        {
          WErrorCodes ec;
          err = ec.errorString(hwmRc);
          rc = 1;
          return rc;
        }
        CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
        colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
      }

      for (unsigned i = 0; i < ridList.size(); i++)
      {
        // Find DBRoot/segment file where we want to start adding rows
        colType = systemCatalogPtr->colType(ridList[i].objnum);
        boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
            new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
        dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
        DBRootExtentInfo dbRootExtent;
        std::string trkErrMsg;
        bool bEmptyPM;

        if (i == 0)
        {
          int trkRc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
                                                               trkErrMsg);
          if (trkRc != NO_ERROR)
          {
            // Do not write into an extent the tracker could not select.
            if (trkErrMsg.empty())
            {
              WErrorCodes ec;
              err = ec.errorString(trkRc);
            }
            else
            {
              err = trkErrMsg;
            }
            rc = 1;
            return rc;
          }
        }
        else
        {
          pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
        }
        colDBRootExtentInfo.push_back(dbRootExtent);

        Column aColumn;
        aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
        aColumn.colDataType = colType.colDataType;
        aColumn.compressionType = colType.compressionType;
        aColumn.dataFile.oid = ridList[i].objnum;
        aColumn.dataFile.fPartition = dbRootExtent.fPartition;
        aColumn.dataFile.fSegment = dbRootExtent.fSegment;
        aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
        aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
        columns.push_back(aColumn);

        if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
        {
          DctnryStruct aDctnry;
          aDctnry.dctnryOid = colType.ddn.dictOID;
          aDctnry.fColPartition = dbRootExtent.fPartition;
          aDctnry.fColSegment = dbRootExtent.fSegment;
          aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
          dctnryList.push_back(aDctnry);
        }

        dctnryStoreOids[i] = (colType.ddn.dictOID > 0) ? colType.ddn.dictOID : 0;
      }

      fRBMetaWriter->init(tblOid, tableName.table);
      fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);

      for (unsigned i = 0; i < dctnryList.size(); i++)  // back up chunks for compressed dictionary
      {
        // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
        fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
                                            dctnryList[i].fColPartition, dctnryList[i].fColSegment);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
  }

  // colValuesList is empty when the package carried no rows; guard before indexing.
  if (colValuesList.size() > 0 && colValuesList[0].size() > 0)
  {
    if (NO_ERROR !=
        (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList,
                                                   dctnryStructList, dicStringList, tableRoPair.objnum)))
    {
      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
    }
  }

  // Collect every column and dictionary OID touched, for chunk flushing and cache invalidation.
  std::map<uint32_t, uint32_t> oids;
  std::vector<BRM::OID_t> oidsToFlush;
  for (unsigned i = 0; i < colStructs.size(); i++)
  {
    oids[colStructs[i].dataOid] = colStructs[i].dataOid;
    oidsToFlush.push_back(colStructs[i].dataOid);
  }
  for (unsigned i = 0; i < dctnryStructList.size(); i++)
  {
    oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid;
    oidsToFlush.push_back(dctnryStructList[i].dctnryOid);
  }

  fWEWrapper.setTransId(txnid.id);
  vector<LBID_t> lbidList;
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // XXX THIS IS WRONG
    // save the extent info to mark them invalid, after flush, the meta file will be gone.
    std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap = fWEWrapper.getTxnMap();
    try
    {
      auto mapIter = m_txnLBIDMap.find(txnid.id);
      if (mapIter != m_txnLBIDMap.end())
      {
        SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
        lbidList = spTxnLBIDRec->m_LBIDs;
      }
    }
    catch (...)
    {
      // Best effort: without the LBID list, nothing is invalidated below.
    }
  }

  // flush files
  // @bug5333, up to here, rc may have an error code already, don't overwrite it.
  int flushChunksRc = fWEWrapper.flushChunks(0, oids);  // why not pass rc to flushChunks?
  if (flushChunksRc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream ossErr;
    ossErr << "Error flushing chunks for table " << tableName << "; " << ec.errorString(flushChunksRc);
    // Append to errmsg in case we already have an error
    if (err.length() > 0)
      err += "; ";
    err += ossErr.str();
    if (error == NO_ERROR)
      error = flushChunksRc;
    if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
      rc = 1;  // return hardcoded 1 as the above
  }

  // Confirm HDFS DB file changes "only" if no error up to this point
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (error == NO_ERROR)
    {
      std::string eMsg;
      ConfirmHdfsDbFile confirmHdfs;
      error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
      if (error != NO_ERROR)
      {
        ostringstream ossErr;
        ossErr << "Error confirming changes to table " << tableName << "; " << eMsg;
        err = ossErr.str();
        rc = 1;
      }
      else  // Perform extra cleanup that is necessary for HDFS
      {
        std::string endMsg;
        ConfirmHdfsDbFile endHdfs;
        int confirmRc2 = endHdfs.endDbFileListFromMetaFile(tblOid, true, endMsg);
        if (confirmRc2 != NO_ERROR)
        {
          // Might want to log this error, but don't think we need
          // to report as fatal, since all changes were confirmed.
        }

        // flush PrimProc FD cache
        TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid);
        ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap();
        std::vector<BRM::FileInfo> files;
        BRM::FileInfo aFile;
        for (ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); it != colsExtsInfoMap.end(); ++it)
        {
          aFile.oid = it->first;
          for (ColExtsInfo::iterator aIt = (it->second).begin(); aIt != (it->second).end(); ++aIt)
          {
            aFile.partitionNum = aIt->partNum;
            aFile.dbRoot = aIt->dbRoot;
            aFile.segmentNum = aIt->segNum;
            aFile.compType = aIt->compType;
            files.push_back(aFile);
          }
        }
        if (files.size() > 0)
          cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
        cacheutils::flushOIDsFromCache(oidsToFlush);
        fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);

        try
        {
          BulkRollbackMgr::deleteMetaFile(tblOid);
        }
        catch (std::exception& ex)
        {
          err = ex.what();
          rc = 1;
        }
      }
    }  // (error == NO_ERROR) through call to flushChunks()

    if (error != NO_ERROR)  // rollback
    {
      string applName("SingleInsert");
      fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
      BulkRollbackMgr::deleteMetaFile(tblOid);
    }
  }  // extra hdfs steps

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  TableMetaData::removeTableMetaData(tblOid);

  if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet)
  {
    if (rc == NO_ERROR)
      rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;

    // Report the truncation only when nothing worse happened; otherwise the
    // real error text already in err would be replaced by the warning.
    if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) && !colNames.empty())
    {
      Message::Args args;
      string cols = "'" + colNames[0] + "'";
      for (unsigned i = 1; i < colNames.size(); i++)
        cols += ", '" + colNames[i] + "'";
      args.add(cols);
      err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
    }

    // Strict mode enabled, so rollback on warning
    if (insertPkg.get_isWarnToError())
    {
      string applName("SingleInsert");
      fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
      BulkRollbackMgr::deleteMetaFile(tblOid);
    }
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return rc;
}
/**
 * Commit the versioned blocks of a transaction.
 *
 * @param bs  request holding the transaction id (consumed).
 * @param err receives a formatted message when the commit fails.
 * @return 0 on success, otherwise a non-zero code derived from the
 *         WriteEngine error returned by commit().
 */
uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err)
{
  uint32_t tmp32;
  bs >> tmp32;
  const int txnID = tmp32;

  int rc = fWEWrapper.commit(txnID);
  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    // Same layout as rollbackVersion(); previously the id ran straight into the error text.
    oss << "WE: Error committing transaction " << txnID << "; " << ec.errorString(rc) << endl;
    err = oss.str();
  }

  // Only the low byte reaches the caller. WriteEngine error codes may be
  // larger than 255, so never let a failure truncate to 0 ("success").
  if (rc != 0 && static_cast<uint8_t>(rc) == 0)
    rc = 1;
  return rc;
}
/**
 * Roll back the blocks written by a transaction of a session.
 *
 * @param bs  request holding the session id followed by the transaction id (consumed).
 * @param err receives the exception text or the WriteEngine error text on failure.
 * @return 0 on success, otherwise a non-zero failure code.
 */
uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err)
{
  int rc = 0;
  uint32_t sessionID;
  uint32_t tmp32;
  bs >> sessionID;
  bs >> tmp32;
  const int txnID = tmp32;

  try
  {
    rc = fWEWrapper.rollbackBlocks(txnID, sessionID);
    if (rc != 0)
    {
      // Previously a non-exception failure left err unset.
      WErrorCodes ec;
      err = ec.errorString(rc);
    }
  }
  catch (std::exception& ex)
  {
    rc = 1;
    err = ex.what();
  }

  // Only the low byte reaches the caller. WriteEngine error codes may be
  // larger than 255, so never let a failure truncate to 0 ("success").
  if (rc != 0 && static_cast<uint8_t>(rc) == 0)
    rc = 1;
  return rc;
}
/**
 * Roll back the versioned blocks of a transaction of a session.
 *
 * @param bs  request holding the session id followed by the transaction id (consumed).
 * @param err receives a formatted message when the rollback fails.
 * @return 0 on success, otherwise a non-zero code derived from the
 *         WriteEngine error returned by rollbackVersion().
 */
uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
{
  uint32_t sessionID;
  uint32_t tmp32;
  bs >> sessionID;
  bs >> tmp32;
  const int txnID = tmp32;

  int rc = fWEWrapper.rollbackVersion(txnID, sessionID);
  if (rc != 0)
  {
    WErrorCodes ec;
    ostringstream oss;
    oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; "
        << ec.errorString(rc) << endl;
    err = oss.str();
  }

  // Only the low byte reaches the caller. WriteEngine error codes may be
  // larger than 255, so never let a failure truncate to 0 ("success").
  if (rc != 0 && static_cast<uint8_t>(rc) == 0)
    rc = 1;
  return rc;
}
uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId)
{
int rc = 0;
InsertDMLPackage insertPkg;
ByteStream::quadbyte tmp32;
bs >> tmp32;
bs >> PMId;
insertPkg.read(bs);
uint32_t sessionId = insertPkg.get_SessionID();
DMLTable* tablePtr = insertPkg.get_Table();
bool isAutocommitOn = insertPkg.get_isAutocommitOn();
if (idbdatafile::IDBPolicy::useHdfs())
isAutocommitOn = true;
BRM::TxnID txnid;
txnid.id = tmp32;
txnid.valid = true;
RowList rows = tablePtr->get_RowList();
bool isInsertSelect = insertPkg.get_isInsertSelect();
WriteEngine::ColStructList colStructs;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryValueList dctnryValueList;
WriteEngine::ColValueList colValuesList;
WriteEngine::DictStrList dicStringList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
tableName.table = tableColName.table = tablePtr->get_TableName();
tableName.schema = tableColName.schema = tablePtr->get_SchemaName();
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
systemCatalogPtr->identity(CalpontSystemCatalog::EC);
CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::RIDList ridList;
CalpontSystemCatalog::DictOIDList dictOids;
try
{
ridList = systemCatalogPtr->columnRIDs(tableName, true);
roPair = systemCatalogPtr->tableRID(tableName);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
std::vector<OID> dctnryStoreOids(ridList.size());
std::vector<Column> columns;
DctnryStructList dctnryList;
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
uint32_t tblOid = roPair.objnum;
CalpontSystemCatalog::ColType colType;
std::vector<DBRootExtentInfo> colDBRootExtentInfo;
bool bFirstExtentOnThisPM = false;
Convertor convertor;
if (fIsFirstBatchPm)
{
dbRootExtTrackerVec.clear();
if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn)))
fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
fWEWrapper.setIsInsert(true);
fWEWrapper.setBulkFlag(true);
fWEWrapper.setTransId(txnid.id);
try
{
// First gather HWM BRM information for all columns
std::vector<int> colWidths;
for (unsigned i = 0; i < ridList.size(); i++)
{
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
// need handle error
CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
}
for (unsigned i = 0; i < ridList.size(); i++)
{
// Find DBRoot/segment file where we want to start adding rows
colType = systemCatalogPtr->colType(ridList[i].objnum);
boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
DBRootExtentInfo dbRootExtent;
std::string trkErrMsg;
bool bEmptyPM;
if (i == 0)
{
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
trkErrMsg);
}
else
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
colDBRootExtentInfo.push_back(dbRootExtent);
Column aColumn;
aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
aColumn.colDataType = colType.colDataType;
aColumn.compressionType = colType.compressionType;
aColumn.dataFile.oid = ridList[i].objnum;
aColumn.dataFile.fPartition = dbRootExtent.fPartition;
aColumn.dataFile.fSegment = dbRootExtent.fSegment;
aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
columns.push_back(aColumn);
if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
{
DctnryStruct aDctnry;
aDctnry.dctnryOid = colType.ddn.dictOID;
aDctnry.fColPartition = dbRootExtent.fPartition;
aDctnry.fColSegment = dbRootExtent.fSegment;
aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
dctnryList.push_back(aDctnry);
}
if (colType.ddn.dictOID > 0)
{
dctnryStoreOids[i] = colType.ddn.dictOID;
}
else
{
dctnryStoreOids[i] = 0;
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
//@Bug 5996 validate hwm before starts
rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
if (rc != 0)
{
WErrorCodes ec;
err = ec.errorString(rc);
err += " Check err.log for detailed information.";
fIsFirstBatchPm = false;
rc = 1;
return rc;
}
}
std::vector<BRM::LBIDRange> rangeList;
// use of MetaFile for bulk rollback support
if (fIsFirstBatchPm && isAutocommitOn)
{
// save meta data, version last block for each dbroot at the start of batch insert
try
{
fRBMetaWriter->init(tblOid, tableName.table);
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
if (!bFirstExtentOnThisPM)
{
for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary
{
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
dctnryList[i].fColPartition, dctnryList[i].fColSegment);
}
}
}
catch (WeException& ex) // catch exception to close file, then rethrow
{
rc = 1;
err = ex.what();
}
// Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
// need to be re-visited
if (rc != 0)
return rc;
}
std::vector<string> colNames;
bool isWarningSet = false;
if (rows.size())
{
Row* rowPtr = rows.at(0);
ColumnList columns = rowPtr->get_ColumnList();
ColumnList::const_iterator column_iterator = columns.begin();
try
{
while (column_iterator != columns.end())
{
DMLColumn* columnPtr = *column_iterator;
tableColName.column = columnPtr->get_Name();
CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName);
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
WriteEngine::ColStruct colStruct;
WriteEngine::DctnryStruct dctnryStruct;
colStruct.dataOid = roPair.objnum;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
// Token
if (isDictCol(colType))
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
}
else
{
colStruct.colWidth = colType.colWidth;
}
colStruct.colDataType = colType.colDataType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
if (colStruct.tokenFlag)
{
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
else
{
dctnryStruct.dctnryOid = 0;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.colWidth = colType.colWidth;
}
colStructs.push_back(colStruct);
dctnryStructList.push_back(dctnryStruct);
cscColTypeList.push_back(colType);
++column_iterator;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
unsigned int numcols = rowPtr->get_NumberOfColumns();
std::string tmpStr("");
try
{
for (unsigned int i = 0; i < numcols; i++)
{
WriteEngine::ColTupleList colTuples;
WriteEngine::DctColTupleList dctColTuples;
RowList::const_iterator row_iterator = rows.begin();
bool pushWarning = false;
while (row_iterator != rows.end())
{
Row* rowPtr = *row_iterator;
const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i);
tableColName.column = columnPtr->get_Name();
CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName);
CalpontSystemCatalog::ColType colType;
colType = systemCatalogPtr->colType(oid);
boost::any datavalue;
bool isNULL = false;
std::vector<std::string> origVals;
origVals = columnPtr->get_DataVector();
WriteEngine::dictStr dicStrings;
// token
if (isDictCol(colType))
{
for (uint32_t i = 0; i < origVals.size(); i++)
{
tmpStr = origVals[i];
if (tmpStr.length() == 0)
isNULL = true;
else
isNULL = false;
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
tmpStr = colType.defaultValue;
}
}
if (tmpStr.length() > (unsigned int)colType.colWidth)
{
tmpStr = tmpStr.substr(0, colType.colWidth);
if (!pushWarning)
pushWarning = true;
}
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
//@Bug 2515. Only pass string values to write engine
dicStringList.push_back(dicStrings);
}
else
{
string x;
std::string indata;
// scan once to check how many autoincrement value needed
uint32_t nextValNeeded = 0;
uint64_t nextVal = 1;
if (colType.autoincrement)
{
try
{
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
for (uint32_t i = 0; i < origVals.size(); i++)
{
indata = origVals[i];
if (indata.length() == 0)
isNULL = true;
else
isNULL = false;
if (isNULL || (indata.compare("0") == 0))
nextValNeeded++;
}
}
if (nextValNeeded > 0) // reserve next value
{
try
{
bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
for (uint32_t i = 0; i < origVals.size(); i++)
{
indata = origVals[i];
if (indata.length() == 0)
isNULL = true;
else
isNULL = false;
// check if autoincrement column and value is 0 or null
if (colType.autoincrement && (isNULL || (indata.compare("0") == 0)))
{
ostringstream oss;
oss << nextVal++;
indata = oss.str();
isNULL = false;
}
if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
if (isNULL && colType.defaultValue.empty()) // error out
{
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
rc = 1;
return rc;
}
else if (isNULL && !(colType.defaultValue.empty()))
{
indata = colType.defaultValue;
isNULL = false;
}
}
try
{
datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), isNULL,
false, false);
}
catch (exception&)
{
rc = 1;
Message::Args args;
args.add(string("'") + indata + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
//@Bug 1806
if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
{
return rc;
}
if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
WriteEngine::ColTuple colTuple;
colTuple.data = datavalue;
colTuples.push_back(colTuple);
//@Bug 2515. Only pass string values to write engine
dicStrings.push_back(tmpStr);
}
colValuesList.push_back(colTuples);
dicStringList.push_back(dicStrings);
}
++row_iterator;
}
if (pushWarning)
{
colNames.push_back(tableColName.column);
isWarningSet = true;
}
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
// call the write engine to write the rows
int error = NO_ERROR;
if (colValuesList.size() > 0)
{
if (colValuesList[0].size() > 0)
{
/* Begin-Disable use of MetaFile for bulk rollback support;
Use alternate call below that passes 0 ptr for RBMetaWriter
if (NO_ERROR !=
(error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList,
dctnryStructList, dicStringList, dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM,
isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm))) End-Disable use of MetaFile for bulk rollback
support
*/
if (NO_ERROR != (error = fWEWrapper.insertColumnRecs(
txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList,
dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect,
isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
{
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
else
{
rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
WErrorCodes ec;
err = ec.errorString(error);
}
}
}
}
if (fIsFirstBatchPm && isAutocommitOn)
{
// fWEWrapper.writeVBEnd(txnid.id, rangeList);
fIsFirstBatchPm = false;
}
else if (fIsFirstBatchPm)
{
fIsFirstBatchPm = false;
}
if (isWarningSet && (rc == NO_ERROR))
{
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
// Strict mode enabled, so rollback on warning
if (insertPkg.get_isWarnToError())
{
string applName("BatchInsert");
fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err);
BulkRollbackMgr::deleteMetaFile(tblOid);
}
}
// MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
return rc;
}
// Process one batch of a binary-encoded batch insert.
//
// Reads from bs, in order: transaction id, PM id (returned through PMId),
// session id, autocommit flag, insert-select flag, table and schema names,
// the column count, each column's OID and name, the number of values per
// column, and finally the values column by column. Every value is preceded
// by a NULL-flag byte; dictionary columns carry a string, all other columns
// carry a fixed-width binary value whose size depends on the column type.
//
// On the first batch for this PM (fIsFirstBatchPm) it also selects the
// starting DBRoot/segment files, validates the column HWMs and, with
// autocommit on, saves bulk-rollback metadata.
//
// @return 0 on success; IDBRANGE_WARNING if values were truncated or
//         clamped (err then names the affected columns); otherwise an error
//         code (1, DEAD_LOCK_ERROR, VB_OVERFLOW_ERROR or INSERT_ERROR) with
//         err describing the failure.
uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err,
                                                    ByteStream::quadbyte& PMId)
{
  int rc = 0;
  ByteStream::quadbyte tmp32;
  ByteStream::byte tmp8;
  bs >> tmp32;  // transaction id
  bs >> PMId;
  uint32_t sessionId;
  bs >> sessionId;
  bool isAutocommitOn;
  bs >> tmp8;
  isAutocommitOn = tmp8;
  // On HDFS, batch inserts are always handled as autocommit.
  if (idbdatafile::IDBPolicy::useHdfs())
    isAutocommitOn = true;
  BRM::TxnID txnid;
  txnid.id = tmp32;
  txnid.valid = true;
  bool isInsertSelect;
  bs >> tmp8;
  // For insert select, skip the hwm block and start inserting from the next block
  // to avoid self insert issue.
  // For batch insert: if not first batch, use the saved last rid to start adding rows.
  isInsertSelect = tmp8;
  WriteEngine::ColStructList colStructs;
  WriteEngine::DctnryStructList dctnryStructList;
  std::vector<uint64_t> colValuesList;
  WriteEngine::DictStrList dicStringList;
  CalpontSystemCatalog::TableName tableName;
  CalpontSystemCatalog::TableColName tableColName;
  bs >> tableColName.table;
  bs >> tableColName.schema;
  tableName.table = tableColName.table;
  tableName.schema = tableColName.schema;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::RIDList ridList;
  try
  {
    ridList = systemCatalogPtr->columnRIDs(tableName, true);
    roPair = systemCatalogPtr->tableRID(tableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  std::vector<OID> dctnryStoreOids(ridList.size());
  std::vector<Column> columns;
  DctnryStructList dctnryList;
  std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(ridList.size());
  uint32_t tblOid = roPair.objnum;
  CalpontSystemCatalog::ColType colType;
  std::vector<DBRootExtentInfo> colDBRootExtentInfo;
  bool bFirstExtentOnThisPM = false;
  Convertor convertor;
  if (fIsFirstBatchPm)
  {
    dbRootExtTrackerVec.clear();
    // With autocommit a fresh rollback meta writer is created for every
    // first batch; otherwise one is created only if none exists yet.
    if (isAutocommitOn || fRBMetaWriter.get() == NULL)
      fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL));
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    fWEWrapper.setTransId(txnid.id);
    try
    {
      // First gather HWM BRM information for all columns
      std::vector<int> colWidths;
      for (unsigned i = 0; i < ridList.size(); i++)
      {
        // NOTE(review): this return code is not checked (rc is overwritten
        // below); error handling here is still TODO.
        rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]);
        CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum);
        colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth));
      }
      for (unsigned i = 0; i < ridList.size(); i++)
      {
        // Find DBRoot/segment file where we want to start adding rows
        colType = systemCatalogPtr->colType(ridList[i].objnum);
        boost::shared_ptr<DBRootExtentTracker> pDBRootExtentTracker(
            new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0));
        dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
        DBRootExtentInfo dbRootExtent;
        std::string trkErrMsg;
        bool bEmptyPM;
        if (i == 0)
        {
          // The first column selects the segment file ...
          rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM,
                                                        trkErrMsg);
        }
        else
        {
          // ... and every other column is assigned from the first column's tracker.
          pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
        }
        colDBRootExtentInfo.push_back(dbRootExtent);
        Column aColumn;
        aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);
        aColumn.colDataType = colType.colDataType;
        aColumn.compressionType = colType.compressionType;
        aColumn.dataFile.oid = ridList[i].objnum;
        aColumn.dataFile.fPartition = dbRootExtent.fPartition;
        aColumn.dataFile.fSegment = dbRootExtent.fSegment;
        aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot;
        aColumn.dataFile.hwm = dbRootExtent.fLocalHwm;
        columns.push_back(aColumn);
        // Compressed dictionaries need their HWM chunk backed up (see below).
        if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0))
        {
          DctnryStruct aDctnry;
          aDctnry.dctnryOid = colType.ddn.dictOID;
          aDctnry.fColPartition = dbRootExtent.fPartition;
          aDctnry.fColSegment = dbRootExtent.fSegment;
          aDctnry.fColDbRoot = dbRootExtent.fDbRoot;
          dctnryList.push_back(aDctnry);
        }
        dctnryStoreOids[i] = (colType.ddn.dictOID > 0) ? colType.ddn.dictOID : 0;
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
    //@Bug 5996 validate hwm before starts
    rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting");
    if (rc != 0)
    {
      WErrorCodes ec;
      err = ec.errorString(rc);
      err += " Check err.log for detailed information.";
      fIsFirstBatchPm = false;
      rc = 1;
      return rc;
    }
  }
  // use of MetaFile for bulk rollback support
  if (fIsFirstBatchPm && isAutocommitOn)
  {
    // save meta data, version last block for each dbroot at the start of batch insert
    try
    {
      fRBMetaWriter->init(tblOid, tableName.table);
      fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
      if (!bFirstExtentOnThisPM)
      {
        for (unsigned i = 0; i < dctnryList.size(); i++)  // back up chunks for compressed dictionary
        {
          // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
          fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot,
                                              dctnryList[i].fColPartition, dctnryList[i].fColSegment);
        }
      }
    }
    catch (WeException& ex)  // catch exception to close file, then rethrow
    {
      rc = 1;
      err = ex.what();
    }
    // Do versioning. Currently, we only version columns, not strings. If there is a design change, this will
    // need to be re-visited
    if (rc != 0)
      return rc;
  }
  // Names of all columns in this request, in request order.
  std::vector<string> colNames;
  // Names of the columns whose values were truncated or clamped.
  std::vector<string> truncatedColNames;
  bool isWarningSet = false;
  uint32_t columnCount;
  bs >> columnCount;
  if (columnCount)
  {
    try
    {
      for (uint32_t current_column = 0; current_column < columnCount; current_column++)
      {
        uint32_t colOid32;
        std::string colName;
        bs >> colOid32;
        bs >> colName;
        colNames.push_back(colName);
        CalpontSystemCatalog::OID oid = colOid32;
        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);
        WriteEngine::ColStruct colStruct;
        WriteEngine::DctnryStruct dctnryStruct;
        colStruct.dataOid = oid;
        colStruct.tokenFlag = false;
        colStruct.fCompressionType = colType.compressionType;
        // Dictionary columns store an 8-byte token in the column file.
        if (isDictCol(colType))
        {
          colStruct.colWidth = 8;
          colStruct.tokenFlag = true;
        }
        else
        {
          colStruct.colWidth = colType.colWidth;
        }
        colStruct.colDataType = colType.colDataType;
        dctnryStruct.fCharsetNumber = colType.charsetNumber;
        dctnryStruct.dctnryOid = colStruct.tokenFlag ? colType.ddn.dictOID : 0;
        dctnryStruct.columnOid = colStruct.dataOid;
        dctnryStruct.fCompressionType = colType.compressionType;
        dctnryStruct.colWidth = colType.colWidth;
        colStructs.push_back(colStruct);
        dctnryStructList.push_back(dctnryStruct);
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
    std::string tmpStr("");
    uint32_t valuesPerColumn;
    bs >> valuesPerColumn;
    colValuesList.reserve(columnCount * valuesPerColumn);
    try
    {
      for (uint32_t j = 0; j < columnCount; j++)
      {
        // Tracked per column so only the columns that actually lost data
        // are reported in the warning message.
        bool pushWarning = false;
        tableColName.column = colNames[j];
        CalpontSystemCatalog::OID oid = colStructs[j].dataOid;
        CalpontSystemCatalog::ColType colType;
        colType = systemCatalogPtr->colType(oid);
        bool isNULL = false;
        WriteEngine::dictStr dicStrings;
        // token
        if (isDictCol(colType))
        {
          for (uint32_t i = 0; i < valuesPerColumn; i++)
          {
            bs >> tmp8;
            isNULL = tmp8;
            bs >> tmpStr;
            if (tmpStr.length() == 0)
              isNULL = true;
            if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
            {
              if (isNULL && colType.defaultValue.empty())  // error out
              {
                Message::Args args;
                args.add(tableColName.column);
                err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                rc = 1;
                return rc;
              }
              else if (isNULL && !(colType.defaultValue.empty()))
              {
                tmpStr = colType.defaultValue;
              }
            }
            if (tmpStr.length() > (unsigned int)colType.colWidth)
            {
              tmpStr = tmpStr.substr(0, colType.colWidth);
              pushWarning = true;
            }
            // Placeholder; the token is produced by the write engine from dicStrings.
            colValuesList.push_back(0);
            //@Bug 2515. Only pass string values to write engine
            dicStrings.push_back(tmpStr);
          }
          //@Bug 2515. Only pass string values to write engine
          dicStringList.push_back(dicStrings);
        }
        else
        {
          uint64_t nextVal = 1;
          if (colType.autoincrement)
          {
            try
            {
              nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
              fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
            }
            catch (std::exception& ex)
            {
              err = ex.what();
              rc = 1;
              return rc;
            }
          }
          for (uint32_t i = 0; i < valuesPerColumn; i++)
          {
            bs >> tmp8;
            isNULL = tmp8;
            uint8_t val8;
            uint16_t val16;
            uint32_t val32;
            uint64_t val64;
            // Zero-initialized so unsupported types/widths and short string
            // copies never leave indeterminate bytes in the stored value.
            uint64_t colValue = 0;
            float valF;
            double valD;
            std::string valStr;
            bool valZero = false;  // Needed for autoinc check
            switch (colType.colDataType)
            {
              case execplan::CalpontSystemCatalog::TINYINT:
              case execplan::CalpontSystemCatalog::UTINYINT:
                bs >> val8;
                if (val8 == 0)
                  valZero = true;
                colValue = val8;
                break;
              case execplan::CalpontSystemCatalog::SMALLINT:
              case execplan::CalpontSystemCatalog::USMALLINT:
                bs >> val16;
                if (val16 == 0)
                  valZero = true;
                colValue = val16;
                break;
              case execplan::CalpontSystemCatalog::DATE:
              case execplan::CalpontSystemCatalog::MEDINT:
              case execplan::CalpontSystemCatalog::INT:
              case execplan::CalpontSystemCatalog::UMEDINT:
              case execplan::CalpontSystemCatalog::UINT:
                bs >> val32;
                if (val32 == 0)
                  valZero = true;
                colValue = val32;
                break;
              case execplan::CalpontSystemCatalog::BIGINT:
              case execplan::CalpontSystemCatalog::DATETIME:
              case execplan::CalpontSystemCatalog::TIME:
              case execplan::CalpontSystemCatalog::TIMESTAMP:
              case execplan::CalpontSystemCatalog::UBIGINT:
                bs >> val64;
                if (val64 == 0)
                  valZero = true;
                colValue = val64;
                break;
              case execplan::CalpontSystemCatalog::DECIMAL:
                switch (colType.colWidth)
                {
                  case 1:
                  {
                    bs >> val8;
                    colValue = val8;
                    break;
                  }
                  case 2:
                  {
                    bs >> val16;
                    colValue = val16;
                    break;
                  }
                  case 4:
                  {
                    bs >> val32;
                    colValue = val32;
                    break;
                  }
                  default:
                  {
                    bs >> val64;
                    colValue = val64;
                    break;
                  }
                }
                break;
              case execplan::CalpontSystemCatalog::UDECIMAL:
                // UDECIMAL numbers may not be negative
                if (colType.colWidth == 1)
                {
                  bs >> val8;
                  // FIXME: IDK what would it mean if valN are unsigned
                  if (utils::is_negative(val8) && val8 != joblist::TINYINTEMPTYROW &&
                      val8 != joblist::TINYINTNULL)
                  {
                    val8 = 0;
                    pushWarning = true;
                  }
                  colValue = val8;
                }
                else if (colType.colWidth == 2)
                {
                  bs >> val16;
                  if (utils::is_negative(val16) && val16 != joblist::SMALLINTEMPTYROW &&
                      val16 != joblist::SMALLINTNULL)
                  {
                    val16 = 0;
                    pushWarning = true;
                  }
                  colValue = val16;
                }
                else if (colType.colWidth == 4)
                {
                  bs >> val32;
                  if (utils::is_negative(val32) && val32 != joblist::INTEMPTYROW && val32 != joblist::INTNULL)
                  {
                    val32 = 0;
                    pushWarning = true;
                  }
                  colValue = val32;
                }
                else if (colType.colWidth == 8)
                {
                  bs >> val64;
                  if (utils::is_negative(val64) && val64 != joblist::BIGINTEMPTYROW &&
                      val64 != joblist::BIGINTNULL)
                  {
                    val64 = 0;
                    pushWarning = true;
                  }
                  colValue = val64;
                }
                break;
              case execplan::CalpontSystemCatalog::DOUBLE:
                bs >> val64;
                colValue = val64;
                break;
              case execplan::CalpontSystemCatalog::UDOUBLE:
                bs >> val64;
                memcpy(&valD, &val64, 8);
                if (valD < 0.0 && valD != static_cast<double>(joblist::DOUBLEEMPTYROW) &&
                    valD != static_cast<double>(joblist::DOUBLENULL))
                {
                  valD = 0.0;
                  // Write the clamped value back so 0.0 (not the negative
                  // input) is what gets stored.
                  memcpy(&val64, &valD, 8);
                  pushWarning = true;
                }
                colValue = val64;
                break;
              case execplan::CalpontSystemCatalog::FLOAT:
                bs >> val32;
                colValue = val32;
                break;
              case execplan::CalpontSystemCatalog::UFLOAT:
                bs >> val32;
                memcpy(&valF, &val32, 4);
                if (valF < 0.0 && valF != static_cast<float>(joblist::FLOATEMPTYROW) &&
                    valF != static_cast<float>(joblist::FLOATNULL))
                {
                  valF = 0.0;
                  // Write the clamped value back so 0.0 is what gets stored.
                  memcpy(&val32, &valF, 4);
                  pushWarning = true;
                }
                colValue = val32;
                break;
              case execplan::CalpontSystemCatalog::CHAR:
              case execplan::CalpontSystemCatalog::VARCHAR:
              case execplan::CalpontSystemCatalog::TEXT:
              case execplan::CalpontSystemCatalog::BLOB:
                bs >> valStr;
                if (valStr.length() > (unsigned int)colType.colWidth)
                {
                  valStr = valStr.substr(0, colType.colWidth);
                  pushWarning = true;
                }
                else
                {
                  if ((unsigned int)colType.colWidth > valStr.length())
                  {
                    // Pad null character to the string
                    valStr.resize(colType.colWidth, 0);
                  }
                }
                // colValue holds only 8 bytes; never copy more than that.
                memcpy(&colValue, valStr.c_str(),
                       valStr.length() < sizeof(colValue) ? valStr.length() : sizeof(colValue));
                break;
              default:
                rc = 1;
                err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT);
                break;
            }
            // check if autoincrement column and value is 0 or null
            if (colType.autoincrement && (isNULL || valZero))
            {
              isNULL = false;
              try
              {
                // Reserve exactly one value for this row. Requesting a running
                // total here would consume 1, 2, 3, ... values per row and
                // leave gaps in the sequence.
                bool reserved = fDbrm.getAIRange(oid, 1, &nextVal);
                if (!reserved)
                {
                  err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
                  rc = 1;
                  return rc;
                }
              }
              catch (std::exception& ex)
              {
                err = ex.what();
                rc = 1;
                return rc;
              }
              colValue = nextVal;
            }
            if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)
            {
              if (isNULL && colType.defaultValue.empty())  // error out
              {
                Message::Args args;
                args.add(tableColName.column);
                err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
                rc = 1;
                return rc;
              }
              else if (isNULL && !(colType.defaultValue.empty()))
              {
                // NOTE(review): the raw bytes of the default-value string are
                // stored whatever the column type. Copy at most
                // sizeof(colValue) bytes into a zeroed value (matching the
                // zero padding used for CHAR data above) so a long default
                // cannot overrun colValue.
                colValue = 0;
                memcpy(&colValue, colType.defaultValue.c_str(),
                       colType.defaultValue.length() < sizeof(colValue) ? colType.defaultValue.length()
                                                                        : sizeof(colValue));
                isNULL = false;
              }
            }
            //@Bug 1806
            if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
            {
              return rc;
            }
            if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
              rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
            colValuesList.push_back(colValue);
            //@Bug 2515. Only pass string values to write engine
            dicStrings.push_back(valStr);
          }
          dicStringList.push_back(dicStrings);
        }
        if (pushWarning)
        {
          truncatedColNames.push_back(tableColName.column);
          isWarningSet = true;
        }
      }
    }
    catch (std::exception& ex)
    {
      err = ex.what();
      rc = 1;
      return rc;
    }
  }
  // call the write engine to write the rows
  int error = NO_ERROR;
  if (colValuesList.size() > 0)
  {
    if (NO_ERROR !=
        (error = fWEWrapper.insertColumnRecsBinary(
             txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0,
             bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm)))
    {
      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR;
        WErrorCodes ec;
        err = ec.errorString(error);
      }
    }
  }
  // The first batch on this PM has now been processed.
  fIsFirstBatchPm = false;
  // rc is already IDBRANGE_WARNING when a non-dictionary value was clamped or
  // truncated; dictionary truncation only sets isWarningSet. In both cases
  // the message naming the affected columns must be produced.
  if (isWarningSet &&
      (rc == NO_ERROR || rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING))
  {
    rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
    Message::Args args;
    string cols = "'" + truncatedColNames[0] + "'";
    for (unsigned i = 1; i < truncatedColNames.size(); i++)
    {
      cols = cols + ", " + "'" + truncatedColNames[i] + "'";
    }
    args.add(cols);
    err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
  }
  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return rc;
}
// Finish an autocommit-on batch insert for one table.
//
// Reads the transaction id, table OID and session id from bs. It then
// rewrites bs to hold the serialized BulkSetHWMArg list: one entry per
// non-dictionary segment file recorded in the table's TableMetaData.
// Afterwards it:
//   - resets the write-engine wrapper's insert/bulk flags and sets fIsFirstBatchPm;
//   - purges PrimProc's file-descriptor cache (HDFS only);
//   - drops the table's TableMetaData;
//   - flushes from PrimProc any dictionary blocks recorded for this transaction;
//   - releases the session's catalog entries.
//
// @return 0 on success, 1 if a system-catalog lookup fails (err is set).
uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
  uint32_t rawTxnId;
  uint32_t tableOid;
  uint32_t sessionId;
  bs >> rawTxnId;
  bs >> tableOid;
  bs >> sessionId;
  const int txnID = rawTxnId;

  BRM::DBRM dbrm;  // NOTE(review): constructed but not otherwise used here.
  std::vector<BRM::BulkSetHWMArg> setHWMArgs;
  std::vector<BRM::FileInfo> files;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap colsExtsInfoMap = tableMetaData->getColsExtsInfoMap();

  BulkSetHWMArg hwmArg;
  BRM::FileInfo fileInfo;
  for (const auto& colEntry : colsExtsInfoMap)
  {
    hwmArg.oid = colEntry.first;
    fileInfo.oid = colEntry.first;
    for (const auto& extInfo : colEntry.second)
    {
      hwmArg.partNum = extInfo.partNum;
      hwmArg.segNum = extInfo.segNum;
      hwmArg.hwm = extInfo.hwm;
      // Only column (non-dictionary) files get an HWM entry.
      if (!extInfo.isDict)
        setHWMArgs.push_back(hwmArg);

      fileInfo.partitionNum = extInfo.partNum;
      fileInfo.dbRoot = extInfo.dbRoot;
      fileInfo.segmentNum = extInfo.segNum;
      fileInfo.compType = extInfo.compType;
      // Every touched file, dictionary or not, is a candidate for the FD-cache purge.
      files.push_back(fileInfo);
    }
  }

  // Reply payload: the HWMs the caller should set.
  bs.restart();
  serializeInlineVector(bs, setHWMArgs);

  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);

  std::map<uint32_t, uint32_t> oids;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOid);

  // NOTE(review): the OID map built below is never consulted afterwards; its
  // only visible effect is that a failing catalog lookup aborts with rc 1.
  CalpontSystemCatalog::RIDList ridList;
  try
  {
    ridList = systemCatalogPtr->columnRIDs(aTableName, true);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }
  for (const auto& rid : ridList)
    oids[rid.objnum] = rid.objnum;

  CalpontSystemCatalog::DictOIDList dictOids;
  try
  {
    dictOids = systemCatalogPtr->dictOIDs(aTableName);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    return 1;
  }
  for (const auto& dictOid : dictOids)
    oids[dictOid.dictOID] = dictOid.dictOID;

  fWEWrapper.setTransId(txnID);
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fIsFirstBatchPm = true;

  if (idbdatafile::IDBPolicy::useHdfs() && !files.empty())
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

  TableMetaData::removeTableMetaData(tableOid);

  // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary
  // blocks touched
  auto dictIt = fWEWrapper.getDictMap().find(txnID);
  if (dictIt != fWEWrapper.getDictMap().end())
  {
    std::vector<BRM::LBID_t> dictFlushBlks;
    cerr << "API Flushing blocks: ";
    for (const auto& lbid : dictIt->second)
    {
      cerr << lbid << ", ";
      dictFlushBlks.push_back(lbid);
    }
    cerr << endl;
    cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
    fWEWrapper.getDictMap().erase(txnID);
  }

  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000);
  return 0;
}
// Final HWM step of a batch insert for one table.
//
// Reads from bs: transaction id, autocommit flag, table OID and a trailing
// flag byte. It walks the table's TableMetaData and collects:
//   - every segment file touched (for cache purging);
//   - the column OIDs to flush;
//   - for each non-dictionary column, the last extent written.
// It then validates those HWMs against the catalog (the "Ending" check).
//
// Autocommit on: flushes chunks (if the wrapper is in insert mode), removes
// the TableMetaData only when the trailing flag is non-zero, and returns.
// HWMs are set later with the version commit.
//
// Autocommit off: removes the TableMetaData, flushes chunks, and replaces
// the contents of bs with the serialized BulkSetHWMArg list for all
// non-dictionary segment files.
//
// @return 0 on success, 1 on failure (err describes it).
uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t tmp8, rc = 0;
  err.clear();
  // set hwm for autocommit off
  uint32_t tmp32, tableOid;
  int txnID;
  bool isAutoCommitOn;
  bs >> tmp32;
  txnID = tmp32;
  bs >> tmp8;
  isAutoCommitOn = (tmp8 != 0);
  bs >> tmp32;
  tableOid = tmp32;
  // Trailing flag: in the autocommit path a non-zero value requests removal
  // of the table's TableMetaData (tested below).
  bs >> tmp8;
  std::vector<BRM::FileInfo> files;
  std::vector<BRM::OID_t> oidsToFlush;
  BRM::FileInfo aFile;
  TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap();
  ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin();
  ColExtsInfo::iterator aIt;
  CalpontSystemCatalog::RIDList ridList;
  std::vector<DBRootExtentInfo> colDBRootExtentInfo;
  DBRootExtentInfo aExtentInfo;
  while (it != colsExtsInfoMap.end())
  {
    aIt = (it->second).begin();
    aFile.oid = it->first;
    oidsToFlush.push_back(aFile.oid);
    aExtentInfo.fPartition = 0;
    aExtentInfo.fDbRoot = 0;
    aExtentInfo.fSegment = 0;
    aExtentInfo.fLocalHwm = 0;
    bool isDict = false;
    while (aIt != (it->second).end())
    {
      aFile.partitionNum = aIt->partNum;
      aFile.dbRoot = aIt->dbRoot;
      aFile.segmentNum = aIt->segNum;
      aFile.compType = aIt->compType;
      files.push_back(aFile);
      if (!aIt->isDict)
      {
        // Keep the last extent written: highest partition, then highest
        // segment, then highest HWM within the same segment file.
        if ((aIt->partNum > aExtentInfo.fPartition) ||
            ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum > aExtentInfo.fSegment)) ||
            ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum == aExtentInfo.fSegment) &&
             (aIt->hwm > aExtentInfo.fLocalHwm)))
        {
          aExtentInfo.fPartition = aIt->partNum;
          aExtentInfo.fDbRoot = aIt->dbRoot;
          aExtentInfo.fSegment = aIt->segNum;
          aExtentInfo.fLocalHwm = aIt->hwm;
        }
      }
      else
      {
        isDict = true;
      }
      aIt++;
    }
    if (!isDict)
      colDBRootExtentInfo.push_back(aExtentInfo);
    it++;
  }
  //@Bug 5996. Validate hwm before set them
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(0);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  try
  {
    // NOTE(review): ridList comes from the catalog while colDBRootExtentInfo
    // follows colsExtsInfoMap iteration order; validateColumnHWMs is assumed
    // to cope with that pairing — TODO confirm.
    CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
    ridList = systemCatalogPtr->columnRIDs(tableName);
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    TableMetaData::removeTableMetaData(tableOid);
    fIsFirstBatchPm = true;
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    return rc;
  }
  // Keep the full int result: storing it in the 8-bit rc would truncate
  // write-engine error codes (a multiple of 256 would even look like success).
  int hwmRc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending");
  if (hwmRc != 0)
  {
    WErrorCodes ec;
    err = ec.errorString(hwmRc);
    err += " Check err.log for detailed information.";
    TableMetaData::removeTableMetaData(tableOid);
    fIsFirstBatchPm = true;
    fWEWrapper.setIsInsert(true);
    fWEWrapper.setBulkFlag(true);
    rc = 1;
    return rc;
  }
  try
  {
    if (isAutoCommitOn)
    {
      bs.restart();
      if (fWEWrapper.getIsInsert())
      {
        // @bug5333, up to here, rc == 0, but flushchunk may fail.
        rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
      }
      if (tmp8 != 0)
        TableMetaData::removeTableMetaData(tableOid);
      return rc;  // will set hwm with version commit.
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }
  // Handle case where isAutoCommitOn is false
  BRM::DBRM dbrm;
  std::vector<BRM::BulkSetHWMArg> setHWMArgs;
  it = colsExtsInfoMap.begin();
  BulkSetHWMArg aArg;
  while (it != colsExtsInfoMap.end())
  {
    aIt = (it->second).begin();
    aArg.oid = it->first;
    while (aIt != (it->second).end())
    {
      aArg.partNum = aIt->partNum;
      aArg.segNum = aIt->segNum;
      aArg.hwm = aIt->hwm;
      //@Bug 6029 dictionary store files already set hwm.
      if (!aIt->isDict)
        setHWMArgs.push_back(aArg);
      aIt++;
    }
    it++;
  }
  TableMetaData::removeTableMetaData(tableOid);
  fIsFirstBatchPm = true;
  fWEWrapper.setIsInsert(true);
  fWEWrapper.setBulkFlag(true);
  rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err);
  bs.restart();
  try
  {
    serializeInlineVector(bs, setHWMArgs);
  }
  catch (exception& ex)
  {
    // Append to errmsg in case we already have an error
    if (err.length() > 0)
      err += "; ";
    err += ex.what();
    rc = 1;
    return rc;
  }
  return rc;
}
//------------------------------------------------------------------------------
// Flush chunks for the specified table and transaction.
// Also confirms changes to DB files (for hdfs).
// files vector represents list of files to be purged from PrimProc cache.
// oidsToFlush represents list of oids to be flushed from PrimProc cache.
// Afterwards, the following attributes are reset as follows:
// fWEWrapper.setIsInsert(false);
// fWEWrapper.setBulkFlag(false);
// fIsFirstBatchPm = true;
// returns 0 for success; returns 1 if error occurs
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int txnID,
                                                            const std::vector<BRM::FileInfo>& files,
                                                            const std::vector<BRM::OID_t>& oidsToFlush,
                                                            std::string& err)
{
  uint8_t rc = 0;
  std::map<uint32_t, uint32_t> oids;
  CalpontSystemCatalog::TableName aTableName;
  CalpontSystemCatalog::RIDList ridList;
  CalpontSystemCatalog::DictOIDList dictOids;
  try
  {
    // NOTE(review): the catalog is keyed by txnID here, whereas the other
    // batch-insert handlers key it by session id — confirm this is intended.
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
        CalpontSystemCatalog::makeCalpontSystemCatalog(txnID);
    aTableName = systemCatalogPtr->tableName(tblOid);
    ridList = systemCatalogPtr->columnRIDs(aTableName, true);
    dictOids = systemCatalogPtr->dictOIDs(aTableName);
  }
  catch (exception& ex)
  {
    std::ostringstream ossErr;
    ossErr << "System Catalog error for table OID " << tblOid;
    // Include tbl name in msg unless exception occurred before we got it
    if (aTableName.table.length() > 0)
      ossErr << '(' << aTableName << ')';
    ossErr << "; " << ex.what();
    err = ossErr.str();
    rc = 1;
    return rc;
  }
  // Column and dictionary OIDs whose chunks are flushed.
  for (unsigned i = 0; i < ridList.size(); i++)
  {
    oids[ridList[i].objnum] = ridList[i].objnum;
  }
  for (unsigned i = 0; i < dictOids.size(); i++)
  {
    oids[dictOids[i].dictOID] = dictOids[i].dictOID;
  }
  fWEWrapper.setTransId(txnID);
  // @bug5333, up to here, rc == 0, but flushchunk may fail.
  // Keep the full int result: assigning it to the 8-bit rc would truncate the
  // write-engine error code (and a multiple of 256 would look like success).
  const int flushRc = fWEWrapper.flushChunks(0, oids);
  if (flushRc == NO_ERROR)
  {
    // Confirm changes to db files "only" if no error up to this point
    if (idbdatafile::IDBPolicy::useHdfs())
    {
      std::string eMsg;
      ConfirmHdfsDbFile confirmHdfs;
      int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg);
      if (confirmDbRc == NO_ERROR)
      {
        int endDbRc = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg);
        if (endDbRc != NO_ERROR)
        {
          // Might want to log this error, but don't think we
          // need to report as fatal, as all changes were confirmed.
        }
        // PrimProc cache purge/flush happens only on this HDFS path.
        if (files.size() > 0)
          cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
        cacheutils::flushOIDsFromCache(oidsToFlush);
      }
      else
      {
        ostringstream ossErr;
        ossErr << "Error confirming changes to table " << aTableName << "; " << eMsg;
        err = ossErr.str();
        rc = 1;  // reset to 1
      }
    }
  }
  else  // flushChunks error
  {
    WErrorCodes ec;
    std::ostringstream ossErr;
    ossErr << "Error flushing chunks for table " << aTableName << "; " << ec.errorString(flushRc);
    err = ossErr.str();
    rc = 1;
  }
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fIsFirstBatchPm = true;
  return rc;
}
// Commit step for batch inserts with autocommit off.
// Intended to commit all versioned blocks, set HWMs and update casual
// partition info; as written it is a no-op that ignores its arguments and
// always reports success (0).
uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
  (void)bs;
  (void)err;
  const uint8_t success = 0;
  return success;
}
// Roll back an autocommit-on batch insert for one table.
//
// Reads the session id, lock id and table OID from bs. It then runs the
// write-engine bulk rollback for that table, resets fIsFirstBatchPm and
// drops the table's TableMetaData.
//
// @return 0 on success; non-zero on failure (err describes the error). If
//         the table OID cannot be resolved, returns 1 without rolling back.
uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t rc = 0;
  uint32_t tmp32, tableOid, sessionID;
  uint64_t lockID;
  bs >> sessionID;
  bs >> lockID;
  bs >> tmp32;
  tableOid = tmp32;
  // Bulkrollback
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::TableName aTableName;
  try
  {
    aTableName = systemCatalogPtr->tableName(tableOid);
  }
  catch (...)
  {
    err = std::string("No such table for oid ") + std::to_string(tableOid);
    rc = 1;
    return rc;
  }
  string table = aTableName.schema + "." + aTableName.table;
  string applName("BatchInsert");
  // bulkRollback reports a full int error code, but this function returns
  // only 8 bits: make sure a non-zero code can never wrap around to 0.
  const int rollbackRc = fWEWrapper.bulkRollback(tableOid, lockID, table, applName, false, err);
  rc = static_cast<uint8_t>(rollbackRc);
  if (rollbackRc != NO_ERROR && rc == 0)
    rc = 1;
  fIsFirstBatchPm = true;
  TableMetaData::removeTableMetaData(tableOid);
  return rc;
}
// Rollback step for batch inserts with autocommit off.
// Intended to roll back all versioned blocks; as written it is a no-op that
// ignores its arguments and always reports success (0).
uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err)
{
  (void)bs;
  (void)err;
  return static_cast<uint8_t>(0);
}
uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, std::string& err,
ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
uint8_t rc = 0;
uint32_t tmp32, sessionID;
TxnID txnId;
bs >> PMId;
bs >> tmp32;
txnId = tmp32;
fWEWrapper.setIsInsert(false);
fWEWrapper.setBulkFlag(false);
fWEWrapper.setTransId(txnId);
if (!rowGroups[txnId]) // meta data
{
rowGroups[txnId] = new rowgroup::RowGroup();
rowGroups[txnId]->deserialize(bs);
uint8_t pkgType;
bs >> pkgType;
cpackages[txnId].read(bs);
rc = fWEWrapper.startTransaction(txnId);
if (rc != NO_ERROR)
{
WErrorCodes ec;
err = ec.errorString(rc);
}
return rc;
}
bool pushWarning = false;
rowgroup::RGData rgData;
rgData.deserialize(bs);
rowGroups[txnId]->setData(&rgData);
// rowGroups[txnId]->setData(const_cast<uint8_t*>(bs.buf()));
// get rows and values
rowgroup::Row row;
rowGroups[txnId]->initRow(&row);
string value("");
uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount();
uint32_t columnsSelected = rowGroups[txnId]->getColumnCount();
std::vector<execplan::CalpontSystemCatalog::ColDataType> fetchColTypes = rowGroups[txnId]->getColTypes();
std::vector<uint32_t> fetchColScales = rowGroups[txnId]->getScale();
std::vector<uint32_t> fetchColColwidths;
for (uint32_t i = 0; i < columnsSelected; i++)
{
fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i));
}
WriteEngine::ColTupleList aColList;
WriteEngine::ColTuple colTuple;
WriteEngine::ColStructList colStructList;
WriteEngine::ColStruct colStruct;
WriteEngine::ColValueList colValueList;
WriteEngine::RIDList rowIDLists;
WriteEngine::CSCTypesList cscColTypeList;
WriteEngine::DctnryStructList dctnryStructList;
WriteEngine::DctnryStruct dctnryStruct;
WriteEngine::DctnryValueList dctnryValueList;
CalpontSystemCatalog::TableName tableName;
CalpontSystemCatalog::TableColName tableColName;
DMLTable* tablePtr = cpackages[txnId].get_Table();
RowList rows = tablePtr->get_RowList();
dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList();
tableColName.table = tableName.table = tablePtr->get_TableName();
tableColName.schema = tableName.schema = tablePtr->get_SchemaName();
tableColName.column = columnsUpdated[0]->get_Name();
sessionID = cpackages[txnId].get_SessionID();
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::OID oid = 0;
CalpontSystemCatalog::ROPair tableRO;
long timeZone = cpackages[txnId].get_TimeZone();
try
{
tableRO = systemCatalogPtr->tableRID(tableName);
oid = systemCatalogPtr->lookupOID(tableColName);
}
catch (std::exception& ex)
{
rc = 1;
ostringstream oss;
oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema
<< "." + tableColName.table << "." << tableColName.column;
err = oss.str();
}
catch (...)
{
rc = 1;
ostringstream oss;
oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table
<< "." << tableColName.column;
err = oss.str();
}
if (rc != 0)
return rc;
rowGroups[txnId]->getRow(0, &row);
CalpontSystemCatalog::RID rid = row.getRid();
uint16_t dbRoot, segment, blockNum;
uint32_t partition;
uint8_t extentNum;
// Get the file information from rowgroup
dbRoot = rowGroups[txnId]->getDBRoot();
rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum);
colStruct.fColPartition = partition;
colStruct.fColSegment = segment;
colStruct.fColDbRoot = dbRoot;
dctnryStruct.fColPartition = partition;
dctnryStruct.fColSegment = segment;
dctnryStruct.fColDbRoot = dbRoot;
TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum);
// Build to be updated column structure and values
int error = 0;
unsigned fetchColPos = 0;
bool ridsFetched = false;
bool isNull = false;
boost::any datavalue;
int64_t intColVal = 0;
// timer.start("fetch values");
std::vector<string> colNames;
// for query stats
boost::scoped_array<CalpontSystemCatalog::ColType> colTypes(
new CalpontSystemCatalog::ColType[columnsUpdated.size()]);
boost::scoped_array<int> preBlkNums(new int[columnsUpdated.size()]);
boost::scoped_array<OID> oids(new OID[columnsUpdated.size()]);
BRMWrapper::setUseVb(true);
for (unsigned int j = 0; j < columnsUpdated.size(); j++)
{
// timer.start("lookupsyscat");
tableColName.column = columnsUpdated[j]->get_Name();
try
{
oids[j] = systemCatalogPtr->lookupOID(tableColName);
colTypes[j] = systemCatalogPtr->colType(oids[j]);
}
catch (std::exception& ex)
{
rc = 1;
ostringstream oss;
oss << "colType got exception " << ex.what() << " with column oid " << oid;
err = oss.str();
}
catch (...)
{
rc = 1;
ostringstream oss;
oss << "colType got unknown exception with column oid " << oid;
err = oss.str();
}
if (rc != 0)
return rc;
preBlkNums[j] = -1;
}
for (unsigned int j = 0; j < columnsUpdated.size(); j++)
{
WriteEngine::ColTupleList colTupleList;
CalpontSystemCatalog::ColType colType = colTypes[j];
oid = oids[j];
colStruct.dataOid = oid;
colStruct.colDataType = colType.colDataType;
colStruct.tokenFlag = false;
colStruct.fCompressionType = colType.compressionType;
tableColName.column = columnsUpdated[j]->get_Name();
if (!ridsFetched)
{
// querystats
uint64_t relativeRID = 0;
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
rid = row.getRid();
relativeRID = rid - rowGroups[txnId]->getBaseRid();
rid = relativeRID;
convertToRelativeRid(rid, extentNum, blockNum);
rowIDLists.push_back(rid);
uint32_t colWidth = colTypes[j].colWidth;
if (colWidth > 8 && !(colTypes[j].colDataType == CalpontSystemCatalog::DECIMAL ||
colTypes[j].colDataType == CalpontSystemCatalog::UDECIMAL))
{
colWidth = 8;
}
else if (colWidth >= datatypes::MAXDECIMALWIDTH)
{
colWidth = datatypes::MAXDECIMALWIDTH;
}
int rrid = (int)relativeRID / (BYTE_PER_BLOCK / colWidth);
// populate stats.blocksChanged
if (rrid > preBlkNums[j])
{
preBlkNums[j] = rrid;
blocksChanged++;
}
}
ridsFetched = true;
}
bool pushWarn = false;
bool nameNeeded = false;
if (isDictCol(colType))
{
colStruct.colWidth = 8;
colStruct.tokenFlag = true;
dctnryStruct.dctnryOid = colType.ddn.dictOID;
dctnryStruct.columnOid = colStruct.dataOid;
dctnryStruct.fCompressionType = colType.compressionType;
dctnryStruct.fCharsetNumber = colType.charsetNumber;
dctnryStruct.colWidth = colType.colWidth;
if (NO_ERROR != (error = fWEWrapper.openDctnry(txnId, dctnryStruct, false))) // @bug 5572 HDFS tmp file
{
WErrorCodes ec;
err = ec.errorString(error);
rc = error;
return rc;
}
ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) &&
(it->segNum == dctnryStruct.fColSegment))
break;
it++;
}
if (it == aColExtsInfo.end()) // add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot = dctnryStruct.fColDbRoot;
aExt.partNum = dctnryStruct.fColPartition;
aExt.segNum = dctnryStruct.fColSegment;
aExt.compType = dctnryStruct.fCompressionType;
aExt.isDict = true;
aColExtsInfo.push_back(aExt);
}
aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo);
if (columnsUpdated[j]->get_isFromCol())
{
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
if (row.isNullValue(fetchColPos))
{
if ((colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (colType.defaultValue.length() > 0)
{
value = colType.defaultValue;
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
return false;
}
colTuple.data = dctTuple.token;
colTupleList.push_back(colTuple);
}
else
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
colTupleList.push_back(colTuple);
}
continue;
}
switch (fetchColTypes[fetchColPos])
{
case CalpontSystemCatalog::DATE:
{
intColVal = row.getUintField<4>(fetchColPos);
value = DataConvert::dateToString(intColVal);
break;
}
case CalpontSystemCatalog::DATETIME:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::datetimeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
break;
}
case CalpontSystemCatalog::TIME:
{
intColVal = row.getIntField<8>(fetchColPos);
value = DataConvert::timeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
value = row.getStringField(fetchColPos);
unsigned i = strlen(value.c_str());
value = value.substr(0, i);
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
value = row.getVarBinaryStringField(fetchColPos);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{
datatypes::Decimal dec(0, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos],
row.getBinaryField<int128_t>(fetchColPos));
value = dec.toString(true);
break;
}
}
/* fall through */
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::UBIGINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::UTINYINT:
{
{
intColVal = row.getIntField(fetchColPos);
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
{
intColVal = 0;
}
if (fetchColScales[fetchColPos] <= 0)
{
ostringstream os;
if (isUnsigned(fetchColTypes[fetchColPos]))
os << static_cast<uint64_t>(intColVal);
else
os << intColVal;
value = os.str();
}
else
{
datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString();
}
}
break;
}
// In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(fetchColPos);
if (dl == std::numeric_limits<float>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(7) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(fetchColPos);
if (dl == std::numeric_limits<double>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(16) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
long double dll = row.getLongDoubleField(fetchColPos);
if (dll == std::numeric_limits<long double>::infinity())
continue;
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(19) << dll;
value = os.str();
break;
}
default: // treat as int64
{
ostringstream os;
intColVal = row.getUintField<8>(fetchColPos);
os << intColVal;
value = os.str();
break;
}
}
uint32_t funcScale = columnsUpdated[j]->get_funcScale();
if (funcScale != 0)
{
string::size_type pos = value.find_first_of("."); // decimal point
if (pos >= value.length())
value.insert(value.length(), ".");
// padding 0 if needed
pos = value.find_first_of(".");
uint32_t digitsAfterPoint = value.length() - pos - 1;
if (digitsAfterPoint < funcScale)
{
for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
value += "0";
}
}
// check data length
// trim the string if needed
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
if (!pushWarn)
pushWarn = true;
if (!pushWarning)
pushWarning = true;
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
colTupleList.push_back(colTuple);
}
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType, false);
fetchColPos++;
}
else // constant
{
if (columnsUpdated[j]->get_isnull())
{
if ((colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (colType.defaultValue.length() > 0)
{
value = colType.defaultValue;
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType,
false); // Constant only need to tokenize once.
}
else
{
WriteEngine::Token nullToken;
colTuple.data = nullToken;
}
}
else
{
value = columnsUpdated[j]->get_Data();
if (value.length() > (unsigned int)colType.colWidth)
{
value = value.substr(0, colType.colWidth);
pushWarn = true;
if (!pushWarning)
{
pushWarning = true;
}
if (pushWarn)
nameNeeded = true;
}
WriteEngine::DctnryTuple dctTuple;
dctTuple.sigValue = (unsigned char*)value.c_str();
dctTuple.sigSize = value.length();
dctTuple.isNull = false;
error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType);
if (error != NO_ERROR)
{
fWEWrapper.closeDctnry(txnId, colType.compressionType);
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
return rc;
}
colTuple.data = dctTuple.token;
if (colType.compressionType == 0)
fWEWrapper.closeDctnry(txnId, colType.compressionType, true);
else
fWEWrapper.closeDctnry(txnId, colType.compressionType,
false); // Constant only need to tokenize once.
}
for (unsigned row = 0; row < rowsThisRowgroup; row++)
colTupleList.push_back(colTuple);
}
}
else // Non dictionary column
{
colStruct.colWidth = colType.colWidth;
if (columnsUpdated[j]->get_isFromCol())
{
for (unsigned i = 0; i < rowsThisRowgroup; i++)
{
rowGroups[txnId]->getRow(i, &row);
if (row.isNullValue(fetchColPos))
{
isNull = true;
value = "";
}
else
{
isNull = false;
switch (fetchColTypes[fetchColPos])
{
case CalpontSystemCatalog::DATE:
{
intColVal = row.getUintField<4>(fetchColPos);
value = DataConvert::dateToString(intColVal);
break;
}
case CalpontSystemCatalog::DATETIME:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::datetimeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::TIMESTAMP:
{
intColVal = row.getUintField<8>(fetchColPos);
value = DataConvert::timestampToString(intColVal, timeZone, colType.precision);
break;
}
case CalpontSystemCatalog::TIME:
{
intColVal = row.getIntField<8>(fetchColPos);
value = DataConvert::timeToString(intColVal, colType.precision);
break;
}
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::VARCHAR:
{
value = row.getStringField(fetchColPos);
unsigned i = strlen(value.c_str());
value = value.substr(0, i);
break;
}
case CalpontSystemCatalog::VARBINARY:
case CalpontSystemCatalog::BLOB:
case CalpontSystemCatalog::TEXT:
{
value = row.getVarBinaryStringField(fetchColPos);
break;
}
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
{
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{
datatypes::Decimal dec(0, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos],
row.getBinaryField<int128_t>(fetchColPos));
value = dec.toString(true);
break;
}
}
/* fall through */
case CalpontSystemCatalog::BIGINT:
case CalpontSystemCatalog::UBIGINT:
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::MEDINT:
case CalpontSystemCatalog::UMEDINT:
case CalpontSystemCatalog::SMALLINT:
case CalpontSystemCatalog::USMALLINT:
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::UTINYINT:
{
{
intColVal = row.getIntField(fetchColPos);
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0)
{
intColVal = 0;
}
if (fetchColScales[fetchColPos] <= 0)
{
ostringstream os;
if (isUnsigned(fetchColTypes[fetchColPos]))
os << static_cast<uint64_t>(intColVal);
else
os << intColVal;
value = os.str();
}
else
{
datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString();
}
}
break;
}
// In this case, we're trying to load a double output column with float data. This is the
// case when you do sum(floatcol), e.g.
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
float dl = row.getFloatField(fetchColPos);
if (dl == std::numeric_limits<float>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(7) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
{
double dl = row.getDoubleField(fetchColPos);
if (dl == std::numeric_limits<double>::infinity())
continue;
if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0)
{
dl = 0.0;
}
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(16) << dl;
value = os.str();
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
long double dll = row.getLongDoubleField(fetchColPos);
if (dll == std::numeric_limits<long double>::infinity())
continue;
ostringstream os;
//@Bug 3350 fix the precision.
os << setprecision(19) << dll;
value = os.str();
break;
}
default: // treat as int64
{
ostringstream os;
intColVal = row.getUintField<8>(fetchColPos);
os << intColVal;
value = os.str();
break;
}
}
}
uint32_t funcScale = columnsUpdated[j]->get_funcScale();
if (funcScale != 0)
{
string::size_type pos = value.find_first_of("."); // decimal point
if (pos >= value.length())
value.insert(value.length(), ".");
// padding 0 if needed
pos = value.find_first_of(".");
uint32_t digitsAfterPoint = value.length() - pos - 1;
if (digitsAfterPoint < funcScale)
{
for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++)
value += "0";
}
}
// Check NOT NULL constraint and default value
if ((isNull) && (colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if ((isNull) && (colType.defaultValue.length() > 0))
{
isNull = false;
bool oneWarn = false;
try
{
datavalue =
colType.convertColumnData(colType.defaultValue, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + colType.defaultValue + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else
{
try
{
datavalue = colType.convertColumnData(value, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + value + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
return rc;
}
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
}
fetchColPos++;
}
else // constant column
{
if (columnsUpdated[j]->get_isnull())
{
isNull = true;
}
else
{
isNull = false;
}
string inData(columnsUpdated[j]->get_Data());
if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (inData == "0000-00-00")) ||
((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) &&
(inData == "0000-00-00 00:00:00")) ||
((colType.colDataType == execplan::CalpontSystemCatalog::TIMESTAMP) &&
(inData == "0000-00-00 00:00:00")))
{
isNull = true;
}
uint64_t nextVal = 0;
if (colType.autoincrement)
{
try
{
nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType);
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
}
if (colType.autoincrement && (isNull || (inData.compare("0") == 0)))
{
// reserve nextVal
try
{
bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal);
if (!reserved)
{
err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT);
rc = 1;
return rc;
}
}
catch (std::exception& ex)
{
err = ex.what();
rc = 1;
return rc;
}
isNull = false;
bool oneWarn = false;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
{
ostringstream oss;
oss << nextVal++;
inData = oss.str();
try
{
datavalue = colType.convertColumnData(inData, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + inData + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
}
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else if (isNull && (colType.defaultValue.length() <= 0) &&
(colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT))
{
rc = 1;
Message::Args args;
args.add(tableColName.column);
err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args);
return rc;
}
else if (isNull && (colType.defaultValue.length() > 0))
{
isNull = false;
bool oneWarn = false;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
{
try
{
datavalue =
colType.convertColumnData(colType.defaultValue, pushWarn, timeZone, isNull, false, false);
}
catch (exception&)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
Message::Args args;
args.add(string("'") + colType.defaultValue + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
}
if ((pushWarn) && (!oneWarn))
oneWarn = true;
colTuple.data = datavalue;
colTupleList.push_back(colTuple);
}
if (oneWarn)
pushWarn = true;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
}
else
{
try
{
datavalue = colType.convertColumnData(inData, pushWarn, timeZone, isNull, false, true);
}
catch (exception& ex)
{
//@Bug 2624. Error out on conversion failure
rc = 1;
cout << ex.what() << endl;
Message::Args args;
args.add(string("'") + inData + string("'"));
err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args);
return rc;
}
colTuple.data = datavalue;
if (!pushWarning)
{
pushWarning = pushWarn;
}
if (pushWarn)
nameNeeded = true;
for (unsigned row = 0; row < rowsThisRowgroup; row++)
colTupleList.push_back(colTuple);
}
}
}
if (nameNeeded)
{
colNames.push_back(tableColName.column);
}
colStructList.push_back(colStruct);
colValueList.push_back(colTupleList);
cscColTypeList.push_back(colType);
} // end of bulding values and column structure.
// timer.stop("fetch values");
if (rowIDLists.size() > 0)
{
error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists,
tableRO.objnum);
}
if (error != NO_ERROR)
{
rc = error;
WErrorCodes ec;
err = ec.errorString(error);
if (error == ERR_BRM_DEAD_LOCK)
{
rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
}
else if (error == ERR_BRM_VB_OVERFLOW)
{
rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
}
}
if (pushWarning)
{
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
Message::Args args;
string cols = "'" + colNames[0] + "'";
for (unsigned i = 1; i < colNames.size(); i++)
{
cols = cols + ", " + "'" + colNames[i] + "'";
}
args.add(cols);
err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args);
}
return rc;
}
// Replies with the LBIDs written so far by a transaction.
//
// Reads the transaction id from `bs` and looks it up in the write-engine wrapper's transaction
// LBID map. It then restarts `bs` and serializes the found LBID list into it; an unknown
// transaction yields an empty list.
// @return 0 on success; 1 if serialization throws, with the exception text appended to `err`.
uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err,
                                           ByteStream::quadbyte& PMId)
{
  uint8_t rc = 0;
  uint32_t txnId;
  vector<LBID_t> lbidList;
  bs >> txnId;

  // Bind by reference: copying the whole transaction map to look up one entry is wasteful.
  const auto& txnLBIDMap = fWEWrapper.getTxnMap();

  try
  {
    auto mapIter = txnLBIDMap.find(txnId);

    if (mapIter != txnLBIDMap.end())
      lbidList = mapIter->second->m_LBIDs;
  }
  catch (...)
  {
    // Best effort: on failure an empty list is sent back.
  }

  bs.restart();

  try
  {
    serializeInlineVector(bs, lbidList);
  }
  catch (exception& ex)
  {
    // Append to errmsg in case we already have an error
    if (err.length() > 0)
      err += "; ";

    err += ex.what();
    rc = 1;
    return rc;
  }

  return rc;
}
// Flushes the data files written by a transaction and releases this processor's
// per-transaction state.
//
// Message layout read from `bs`: flush code, transaction id, table OID (all uint32).
// For table OIDs >= 3000, the table's dictionary OIDs are collected from the system catalog
// and handed to flushDataFiles. Then:
//  - the cached rowgroup for the transaction is dropped;
//  - with HDFS, the transaction is confirmed, PrimProc FD/OID caches are purged for every file
//    recorded in the table's metadata, and the transaction's uncommitted extent LBIDs are
//    invalidated;
//  - finally the table metadata and the transaction's catalog instances are removed.
// @return 0 on success, 1 on a catalog failure, or the first write-engine error (narrowed to
//         uint8_t) with `err` set.
uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err)
{
  uint8_t rc = 0;
  uint32_t flushCode, txnId, tableOid;
  bs >> flushCode;
  bs >> txnId;
  bs >> tableOid;

  // Dictionary OIDs of the table (key == value), handed to flushDataFiles.
  std::map<uint32_t, uint32_t> oids;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(txnId);

  // NOTE(review): OIDs below 3000 are presumably reserved for system tables, which are not
  // looked up in the catalog here — confirm.
  if (tableOid >= 3000)
  {
    try
    {
      CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOid);
      // dictOIDs() can throw as well. Keep it inside the try so a catalog failure is reported
      // through `err` instead of escaping the message handler.
      CalpontSystemCatalog::DictOIDList dictOids = systemCatalogPtr->dictOIDs(aTableName);

      for (const auto& dictOid : dictOids)
        oids[dictOid.dictOID] = dictOid.dictOID;
    }
    catch (...)
    {
      err = std::string("Systemcatalog error for tableoid ") + std::to_string(tableOid);
      rc = 1;
      return rc;
    }
  }

  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  vector<LBID_t> lbidList;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // XXX THIS IS WRONG!!!
    // save the extent info to mark them invalid, after flush, the meta file will be gone.
    const auto& txnLBIDMap = fWEWrapper.getTxnMap();

    try
    {
      auto mapIter = txnLBIDMap.find(txnId);

      if (mapIter != txnLBIDMap.end())
        lbidList = mapIter->second->m_LBIDs;
    }
    catch (...)
    {
      // Best effort: lbidList stays empty.
    }
  }

  // No need to close files, flushDataFile will close them.
  int error = fWEWrapper.flushDataFiles(flushCode, txnId, oids);

  if (error != NO_ERROR)
  {
    rc = error;
    WErrorCodes ec;
    err = ec.errorString(error);
  }

  // Drop the cached rowgroup of this transaction; erase the entry rather than leave a null one.
  auto rgIt = rowGroups.find(txnId);

  if (rgIt != rowGroups.end())
  {
    delete rgIt->second;
    rowGroups.erase(rgIt);
  }

  // Every file (column or dictionary segment) recorded for the table during this transaction.
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  ColsExtsInfoMap colsExtsInfoMap = tableMetaData->getColsExtsInfoMap();
  std::vector<BRM::FileInfo> files;
  std::vector<BRM::OID_t> oidsToFlush;
  BRM::FileInfo aFile;

  for (const auto& colExts : colsExtsInfoMap)
  {
    aFile.oid = colExts.first;
    oidsToFlush.push_back(aFile.oid);

    for (const auto& ext : colExts.second)
    {
      aFile.partitionNum = ext.partNum;
      aFile.dbRoot = ext.dbRoot;
      aFile.segmentNum = ext.segNum;
      aFile.compType = ext.compType;
      files.push_back(aFile);
    }
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    // Keep an earlier flush error: confirmTransaction() used to overwrite rc unconditionally.
    int confirmRc = fWEWrapper.confirmTransaction(txnId);

    if (error == NO_ERROR && confirmRc != NO_ERROR)
    {
      rc = confirmRc;
      WErrorCodes ec;
      err = ec.errorString(confirmRc);
    }

    //@Bug 5700. Purge FD cache after file swap
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
    cacheutils::flushOIDsFromCache(oidsToFlush);
    fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList);
  }

  TableMetaData::removeTableMetaData(tableOid);
  // MCOL-1495 Remove fCatalogMap entries CS won't use anymore.
  CalpontSystemCatalog::removeCalpontSystemCatalog(txnId);
  CalpontSystemCatalog::removeCalpontSystemCatalog(txnId | 0x80000000);
  return rc;
}
// Deletes the rows of one fetched rowgroup on this PM.
//
// Message layout read from `bs`: PM id, session id, transaction id, schema, table name, then
// either metadata or RGData.
//  - First message of a transaction: carries the RowGroup layout. It is cached in rowGroups and
//    the write-engine transaction is started.
//  - Later messages: carry RGData. The rows' RIDs, made relative to the rowgroup's extent, are
//    deleted from every column of schema.tableName via deleteRow.
// @param blocksChanged query stats: incremented once per distinct (column, block) pair touched.
// @return 0 on success, 1 on a catalog failure, or an error code (narrowed to uint8_t) with
//         `err` set.
uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::string& err,
                                         ByteStream::quadbyte& PMId, uint64_t& blocksChanged)
{
  uint8_t rc = 0;
  uint32_t tmp32, sessionID;
  TxnID txnId;
  bs >> PMId;
  bs >> sessionID;
  bs >> tmp32;
  txnId = tmp32;
  string schema, tableName;
  bs >> schema;
  bs >> tableName;
  fWEWrapper.setIsInsert(false);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.setTransId(txnId);

  if (!rowGroups[txnId])  // meta data
  {
    rowGroups[txnId] = new rowgroup::RowGroup();
    rowGroups[txnId]->deserialize(bs);
    // If hdfs, call chunkmanager to set up
    // Look the message up with the full write-engine code; rc is only 8 bits wide.
    int startRc = fWEWrapper.startTransaction(txnId);
    rc = startRc;

    if (startRc != NO_ERROR)
    {
      WErrorCodes ec;
      err = ec.errorString(startRc);
    }

    return rc;
  }

  rowgroup::RowGroup* rg = rowGroups[txnId];
  rowgroup::RGData rgData;
  rgData.deserialize(bs);
  rg->setData(&rgData);

  // get row ids
  rowgroup::Row row;
  rg->initRow(&row);
  WriteEngine::RIDList rowIDList;
  uint32_t rowsThisRowgroup = rg->getRowCount();
  uint16_t dbRoot, segment, blockNum;
  uint32_t partition;
  uint8_t extentNum;
  CalpontSystemCatalog::TableName aTableName;
  aTableName.schema = schema;
  aTableName.table = tableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::RIDList tableRidList;

  try
  {
    roPair = systemCatalogPtr->tableRID(aTableName);
    tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  // Query stats: per fetched column, the width used to map a RID to a block and the last block
  // counted. Non-decimal columns wider than 8 bytes are stored as 8-byte tokens; decimals are
  // capped at MAXDECIMALWIDTH.
  const uint32_t colCount = row.getColumnCount();
  std::vector<int> preBlkNums(colCount, -1);
  std::vector<uint32_t> colWidth(colCount);

  for (uint32_t j = 0; j < colCount; j++)
  {
    colWidth[j] = row.getColumnWidth(j);
    execplan::CalpontSystemCatalog::ColDataType colDataType = row.getColType(j);

    if (colWidth[j] >= 8 && !(colDataType == execplan::CalpontSystemCatalog::DECIMAL ||
                              colDataType == execplan::CalpontSystemCatalog::UDECIMAL))
    {
      colWidth[j] = 8;
    }
    else if (colWidth[j] >= datatypes::MAXDECIMALWIDTH)
    {
      colWidth[j] = datatypes::MAXDECIMALWIDTH;
    }
  }

  // Get the file information from rowgroup
  dbRoot = rg->getDBRoot();
  rg->getLocation(&partition, &segment, &extentNum, &blockNum);
  WriteEngine::ColStructList colStructList;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColStruct colStruct;
  colStruct.fColPartition = partition;
  colStruct.fColSegment = segment;
  colStruct.fColDbRoot = dbRoot;

  for (unsigned i = 0; i < rowsThisRowgroup; i++)
  {
    rg->getRow(i, &row);
    CalpontSystemCatalog::RID rid = row.getRid();
    uint64_t relativeRID = rid - rg->getBaseRid();
    rid = relativeRID;
    convertToRelativeRid(rid, extentNum, blockNum);
    rowIDList.push_back(rid);

    // populate stats.blocksChanged
    for (uint32_t j = 0; j < colCount; j++)
    {
      int blockNo = (int)(relativeRID / (BYTE_PER_BLOCK / colWidth[j]));

      if (blockNo > preBlkNums[j])
      {
        blocksChanged++;
        preBlkNums[j] = blockNo;
      }
    }
  }

  try
  {
    for (unsigned i = 0; i < tableRidList.size(); i++)
    {
      CalpontSystemCatalog::ColType colType;
      colType = systemCatalogPtr->colType(tableRidList[i].objnum);
      colStruct.dataOid = tableRidList[i].objnum;
      colStruct.tokenFlag = false;
      colStruct.fCompressionType = colType.compressionType;

      if (colType.colWidth > 8 && !(colType.colDataType == CalpontSystemCatalog::DECIMAL ||
                                    colType.colDataType == CalpontSystemCatalog::UDECIMAL))  // token
      {
        colStruct.colWidth = 8;
        colStruct.tokenFlag = true;
      }
      else
      {
        colStruct.colWidth = colType.colWidth;
      }

      colStruct.colDataType = colType.colDataType;
      colStructList.push_back(colStruct);
      cscColTypeList.push_back(colType);
    }
  }
  catch (exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  std::vector<ColStructList> colExtentsStruct;
  std::vector<CSCTypesList> colExtentsColType;
  std::vector<void*> colOldValueList;
  std::vector<RIDList> ridLists;
  colExtentsStruct.push_back(colStructList);
  colExtentsColType.push_back(cscColTypeList);
  ridLists.push_back(rowIDList);
  int error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists,
                                   roPair.objnum);

  if (error != NO_ERROR)
  {
    rc = error;
    WErrorCodes ec;
    err = ec.errorString(error);

    if (error == ERR_BRM_DEAD_LOCK)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
    }
    else if (error == ERR_BRM_VB_OVERFLOW)
    {
      rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
      err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
    }
  }

  return rc;
}
//------------------------------------------------------------------------------
// Remove the bulk rollback meta data file(s) for one table.
// Message layout (bs): table OID (uint32_t), passed to
// BulkRollbackMgr::deleteMetaFile().
// Returns 0 on success; 1 if reading the message or deleting threw, in which
// case err holds the exception text (err is untouched on success).
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err)
{
  uint32_t tableOID;

  try
  {
    bs >> tableOID;
    BulkRollbackMgr::deleteMetaFile(tableOID);
  }
  catch (const std::exception& ex)
  {
    err = ex.what();
    return 1;
  }

  return 0;
}
//------------------------------------------------------------------------------
// Process bulk rollback command
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs, std::string& err)
{
  // Message layout (bs), in order: table lock ID (uint64_t), table OID
  // (uint32_t), table name (may be empty), application name.
  // Returns 0 on success, 2 if WriteEngineWrapper::bulkRollback() reports an
  // error (err is filled by that call), 1 if anything here threw.
  err.clear();

  try
  {
    // Progress is echoed to stdout on purpose; it is cheap and helps when
    // diagnosing rollbacks.
    std::cout << "processBulkRollback";

    uint64_t tableLockID;
    bs >> tableLockID;
    uint32_t tableOID;
    bs >> tableOID;
    std::string tableName;
    bs >> tableName;

    // No name supplied by the sender: resolve it from the OID via the catalog.
    if (tableName.empty())
    {
      boost::shared_ptr<CalpontSystemCatalog> catalog = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
      tableName = catalog->tableName(tableOID).toString();
    }

    std::cout << "; table-" << tableName;

    std::string appName;
    bs >> appName;
    std::cout << "; app-" << appName << std::endl;

    const int weRc = fWEWrapper.bulkRollback(tableOID, tableLockID, tableName, appName,
                                             false,  // no extra debug logging to the console
                                             err);
    return (weRc == NO_ERROR) ? 0 : 2;
  }
  catch (const std::exception& ex)
  {
    std::cout << "processBulkRollback: exception-" << ex.what() << std::endl;
    err = ex.what();
    return 1;
  }
}
//------------------------------------------------------------------------------
// Process bulk rollback cleanup command (deletes bulk rollback meta data files)
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(messageqcpp::ByteStream& bs, std::string& err)
{
  // Message layout (bs): table OID (uint32_t). The table's bulk rollback meta
  // data file(s) are removed through BulkRollbackMgr::deleteMetaFile().
  // Returns 0 on success, 1 (with err set) if anything threw.
  err.clear();

  try
  {
    // Logged to stdout deliberately; harmless and useful for diagnostics.
    std::cout << "processBulkRollbackCleanup";

    uint32_t tableOID;
    bs >> tableOID;
    std::cout << ": tableOID-" << tableOID << std::endl;

    BulkRollbackMgr::deleteMetaFile(tableOID);
  }
  catch (const std::exception& ex)
  {
    std::cout << "processBulkRollbackCleanup: exception-" << ex.what() << std::endl;
    err = ex.what();
    return 1;
  }

  return 0;
}
//------------------------------------------------------------------------------
// Store a new "next value" for a column through
// WriteEngineWrapper::updateNextValue().
// Message layout (bs), in order: column OID (uint32_t), next value (uint64_t),
// session ID (uint32_t). The session ID is also used as the transaction ID.
// On HDFS the data files are flushed here and the transaction is confirmed and
// ended (or ended unsuccessfully on error). On other storage the transaction
// started here is not ended by this function.
// Returns 0 on success; otherwise a non-zero status, with err describing the
// failure.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err)
{
  uint32_t columnOid, sessionID;
  uint64_t nextVal;
  int rc = 0;
  bs >> columnOid;
  bs >> nextVal;
  bs >> sessionID;

  // OIDs whose data files are flushed on HDFS.
  std::map<uint32_t, uint32_t> oids;
  oids[columnOid] = columnOid;

  // NOTE(review): 1021 is presumably the system-catalog (SYSCOLUMN) OID used to
  // locate the DBRoot that holds the catalog - confirm.
  uint16_t dbRoot;
  BRM::OID_t oid = 1021;
  fDbrm.getSysCatDBRoot(oid, dbRoot);

  fWEWrapper.setTransId(sessionID);
  fWEWrapper.setBulkFlag(false);
  fWEWrapper.startTransaction(sessionID);

  rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot);

  if (rc != 0)
  {
    err = "Error in WE::updateNextValue";
    rc = 1;
  }

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    cout << "updateSyscolumnNextval flushDataFiles " << endl;
    int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids);

    if ((rc == 0) && (rc1 == 0))
    {
      cout << "updateSyscolumnNextval confirmTransaction rc =0 " << endl;
      rc1 = fWEWrapper.confirmTransaction(sessionID);
      cout << "updateSyscolumnNextval confirmTransaction return code is " << rc1 << endl;

      if (rc1 == NO_ERROR)
        rc1 = fWEWrapper.endTransaction(sessionID, true);
      else
        fWEWrapper.endTransaction(sessionID, false);
    }
    else
    {
      cout << "updateSyscolumnNextval endTransaction with error " << endl;
      fWEWrapper.endTransaction(sessionID, false);
    }

    // Report a flush/commit failure only if nothing failed earlier. Unlike
    // before, also describe it in err so the caller does not get an error
    // status with an empty message.
    if ((rc == NO_ERROR) && (rc1 != NO_ERROR))
    {
      rc = rc1;
      WErrorCodes ec;
      err = ec.errorString(rc1);
    }
  }

  // The status is returned as a single byte but WriteEngine codes are ints;
  // never let a failure narrow to 0 and be read as success.
  if ((rc != NO_ERROR) && (static_cast<uint8_t>(rc) == 0))
    rc = 1;

  return rc;
}
//------------------------------------------------------------------------------
// For the table OID in the message (uint32_t), collect one BRM::FileInfo per
// extent entry recorded in the table's TableMetaData. On HDFS, if any files
// were collected, ask cacheutils::purgePrimProcFdCache() to purge them for this
// module. Then drop the table's TableMetaData.
// err is not used; always returns 0.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err)
{
  uint32_t tableOid;
  bs >> tableOid;

  TableMetaData* tableMeta = TableMetaData::makeTableMetaData(tableOid);
  // Iterate over a local copy of the column -> extents map.
  ColsExtsInfoMap extsByColumn = tableMeta->getColsExtsInfoMap();

  std::vector<BRM::FileInfo> files;
  BRM::FileInfo fileInfo;

  for (const auto& column : extsByColumn)
  {
    fileInfo.oid = column.first;

    for (const auto& ext : column.second)
    {
      fileInfo.partitionNum = ext.partNum;
      fileInfo.dbRoot = ext.dbRoot;
      fileInfo.segmentNum = ext.segNum;
      fileInfo.compType = ext.compType;
      files.push_back(fileInfo);
    }
  }

  if (idbdatafile::IDBPolicy::useHdfs() && !files.empty())
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

  TableMetaData::removeTableMetaData(tableOid);
  return 0;
}
//------------------------------------------------------------------------------
// Delete the rows listed in the message from every column of schema.table.
// Dictionary-token columns also get their dictionary OID. The work is done by
// WriteEngineWrapper::deleteBadRows().
// Message layout (bs), in order: PM id (quadbyte, returned through PMId),
// first-batch flag (byte), session ID (uint64_t), transaction ID (uint32_t),
// schema, table name, DBRoot (uint16_t), partition (uint32_t),
// segment (uint16_t), then the row-ID list.
// For the first batch a transaction is started and the wrapper is put into
// "fix" mode (non-insert, non-bulk).
// Returns 0 on success; 1 if a catalog lookup or deleteBadRows() threw;
// DEAD_LOCK_ERROR / VB_OVERFLOW_ERROR for those BRM failures; otherwise a
// non-zero byte derived from the WriteEngine error code. err holds the message.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs, std::string& err,
                                          ByteStream::quadbyte& PMId)
{
  // The status is a single byte but WriteEngine codes are ints; never let a
  // failure narrow to 0 (success).
  auto toStatus = [](int code) -> uint8_t
  {
    const uint8_t status = static_cast<uint8_t>(code);
    return (status != 0) ? status : 1;
  };

  uint8_t rc = 0;
  uint32_t tmp32;
  uint64_t sessionID;
  uint16_t dbRoot, segment;
  uint32_t partition;
  string schema, tableName;
  TxnID txnId;
  uint8_t tmp8;
  bool firstBatch = false;
  WriteEngine::RIDList rowIDList;

  bs >> PMId;
  bs >> tmp8;
  firstBatch = (tmp8 != 0);
  bs >> sessionID;
  bs >> tmp32;
  txnId = tmp32;
  bs >> schema;
  bs >> tableName;
  // NOTE(review): dbRoot/partition/segment must be consumed from the stream but
  // are not used below; the column location is hard-coded instead.
  bs >> dbRoot;
  bs >> partition;
  bs >> segment;
  deserializeInlineVector(bs, rowIDList);

  // Need to identify whether this is the first batch to start transaction.
  if (firstBatch)
  {
    const int startRc = fWEWrapper.startTransaction(txnId);

    if (startRc != NO_ERROR)
    {
      // Look the message up with the full code, not the narrowed byte.
      WErrorCodes ec;
      err = ec.errorString(startRc);
      rc = toStatus(startRc);
      return rc;
    }

    fWEWrapper.setIsInsert(false);
    fWEWrapper.setBulkFlag(false);
    fWEWrapper.setTransId(txnId);
    fWEWrapper.setFixFlag(true);
  }

  CalpontSystemCatalog::TableName aTableName;
  aTableName.schema = schema;
  aTableName.table = tableName;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  CalpontSystemCatalog::ROPair roPair;
  CalpontSystemCatalog::RIDList tableRidList;

  try
  {
    roPair = systemCatalogPtr->tableRID(aTableName);
    tableRidList = systemCatalogPtr->columnRIDs(aTableName, true);
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  WriteEngine::ColStructList colStructList;
  WriteEngine::ColStruct colStruct;
  WriteEngine::DctnryStructList dctnryStructList;
  // NOTE(review): the location from the message (partition, segment, dbRoot)
  // is deliberately not used here; DBRoot 3 / partition 0 / segment 0 is
  // hard-coded. This looks like a debugging leftover - confirm before relying
  // on this path for other locations.
  colStruct.fColPartition = 0;
  colStruct.fColSegment = 0;
  colStruct.fColDbRoot = 3;

  // should we always scan dictionary store files?
  try
  {
    for (unsigned i = 0; i < tableRidList.size(); i++)
    {
      CalpontSystemCatalog::ColType colType;
      colType = systemCatalogPtr->colType(tableRidList[i].objnum);
      colStruct.dataOid = tableRidList[i].objnum;
      colStruct.tokenFlag = false;
      colStruct.fCompressionType = colType.compressionType;
      WriteEngine::DctnryStruct dctnryStruct;
      dctnryStruct.fColDbRoot = colStruct.fColDbRoot;
      dctnryStruct.fColPartition = colStruct.fColPartition;
      dctnryStruct.fColSegment = colStruct.fColSegment;
      dctnryStruct.fCompressionType = colStruct.fCompressionType;
      dctnryStruct.dctnryOid = 0;
      dctnryStruct.fCharsetNumber = colType.charsetNumber;

      // Wide (> 8 byte) DECIMAL/UDECIMAL columns keep their full width in the
      // column file (as in the row-delete path). Any other column wider than
      // 8 bytes is an 8-byte dictionary token.
      const bool isDecimal = (colType.colDataType == CalpontSystemCatalog::DECIMAL ||
                              colType.colDataType == CalpontSystemCatalog::UDECIMAL);

      if (colType.colWidth > 8 && !isDecimal)  // token
      {
        colStruct.colWidth = 8;
        colStruct.tokenFlag = true;
        dctnryStruct.dctnryOid = colType.ddn.dictOID;
      }
      else
      {
        colStruct.colWidth = colType.colWidth;
      }

      colStruct.colDataType = colType.colDataType;
      colStructList.push_back(colStruct);
      dctnryStructList.push_back(dctnryStruct);
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
    return rc;
  }

  int error = 0;

  try
  {
    error = fWEWrapper.deleteBadRows(txnId, colStructList, rowIDList, dctnryStructList);

    if (error != NO_ERROR)
    {
      WErrorCodes ec;
      err = ec.errorString(error);

      if (error == ERR_BRM_DEAD_LOCK)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR;
      }
      else if (error == ERR_BRM_VB_OVERFLOW)
      {
        rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR;
        err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW);
      }
      else
      {
        rc = toStatus(error);
      }
    }
  }
  catch (std::exception& ex)
  {
    err = ex.what();
    rc = 1;
  }

  return rc;
}
//------------------------------------------------------------------------------
// End the WriteEngine transaction named in the message.
// Message layout (bs): transaction ID (uint32_t), then a byte flag. A non-zero
// flag is passed as `success = true` to WriteEngineWrapper::endTransaction().
// Returns 0 on success. On failure err gets the WriteEngine error text and a
// non-zero status is returned.
//------------------------------------------------------------------------------
uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err)
{
  uint32_t txnid;
  ByteStream::byte tmp8;
  bs >> txnid;
  bs >> tmp8;
  const bool success = (tmp8 != 0);

  const int rc = fWEWrapper.endTransaction(txnid, success);

  if (rc == NO_ERROR)
    return rc;

  WErrorCodes ec;
  err = ec.errorString(rc);

  // The status travels back as a single byte but WriteEngine error codes are
  // ints; never let a failure narrow to 0 and be read as success.
  const uint8_t status = static_cast<uint8_t>(rc);
  return (status != 0) ? status : 1;
}
//------------------------------------------------------------------------------
// Validates the correctness of the current HWMs for this table.
// The HWMs for all the 1 byte columns should be identical. Same goes
// for all the 2 byte columns, etc. The 2 byte column HWMs should be
// "roughly" (but not necessarily exactly) twice that of a 1 byte column.
// Same goes for the 4 byte column HWMs vs their 2 byte counterparts, and so on
// up to 16 byte (wide DECIMAL) columns.
// ridList - column oids used to look up each column's width for validation;
//           entry k must describe the same column as segFileInfo[k].
// systemCatalogPtr - catalog used to look up the column types.
// segFileInfo - Vector of File objects carrying current DBRoot, partition,
//           HWM, etc to be validated for the columns of the table.
// stage - Current stage we are validating. "Starting" or "Ending".
//           "Starting" is only checked for the first batch on this PM.
// Returns NO_ERROR, ERR_BRM_HWMS_NOT_EQUAL or ERR_BRM_HWMS_OUT_OF_SYNC; any
// mismatch is also logged through fLog.
//------------------------------------------------------------------------------
int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList,
                                          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr,
                                          const std::vector<DBRootExtentInfo>& segFileInfo, const char* stage)
{
  int rc = NO_ERROR;

  if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0))
    return rc;

  // Normalized column widths we group by, in increasing order. A column whose
  // normalized width is not listed is grouped with the 8-byte columns.
  // 16-byte columns must have their own group: their HWM is about twice that
  // of an 8-byte column, so comparing them for equality would always fail.
  const int WIDTH_COUNT = 5;
  const int SLOT_8_BYTE = 3;
  const uint32_t widths[WIDTH_COUNT] = {1, 2, 4, 8, 16};

  // firstOfWidth[i] is the index of the first column of width widths[i], or -1
  // if there is none. That column is the reference HWM for its width.
  int firstOfWidth[WIDTH_COUNT] = {-1, -1, -1, -1, -1};

  CalpontSystemCatalog::ColType colType;
  Convertor convertor;

  // Make sure the HWMs for all columns of the same width match, and that all
  // columns share one DBRoot/partition/segment.
  for (unsigned k = 0; k < segFileInfo.size(); k++)
  {
    // Find out column width
    colType = systemCatalogPtr->colType(ridList[k].objnum);
    colType.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth);

    int slot = SLOT_8_BYTE;

    for (int w = 0; w < WIDTH_COUNT; w++)
    {
      if (static_cast<uint32_t>(colType.colWidth) == widths[w])
      {
        slot = w;
        break;
      }
    }

    if (firstOfWidth[slot] == -1)
      firstOfWidth[slot] = k;

    const int k1 = firstOfWidth[slot];

    // Validate that the HWM for this column (k) matches that of the
    // corresponding reference column with the same width.
    if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
        (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
    {
      CalpontSystemCatalog::ColType colType2;
      colType2 = systemCatalogPtr->colType(ridList[k1].objnum);
      colType2.colWidth = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth);
      ostringstream oss;
      oss << stage
          << " HWMs do not match for"
             " OID1-"
          << ridList[k1].objnum << "; DBRoot-" << segFileInfo[k1].fDbRoot << "; partition-"
          << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment << "; hwm-"
          << segFileInfo[k1].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl
          << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
          << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
          << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
      fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }

    // HWM DBRoot, partition, and segment number should match for all
    // columns; so compare DBRoot, part#, and seg# with first column.
    if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[0].fSegment != segFileInfo[k].fSegment))
    {
      CalpontSystemCatalog::ColType colType2;
      colType2 = systemCatalogPtr->colType(ridList[0].objnum);
      colType2.colWidth = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth);
      ostringstream oss;
      oss << stage
          << " HWM DBRoot,Part#, or Seg# do not match for"
             " OID1-"
          << ridList[0].objnum << "; DBRoot-" << segFileInfo[0].fDbRoot << "; partition-"
          << segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment << "; hwm-"
          << segFileInfo[0].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl
          << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-"
          << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-"
          << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth;
      fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }
  }  // end of loop to compare all same-width HWMs

  // Validate/compare HWMs across widths. Without knowing the exact row count
  // we can't extrapolate the exact HWM for the wider column. A column N times
  // wider than a reference column must, however, have its HWM in
  // [refHwm * N, refHwm * N + N - 1]. Pairs are checked narrowest reference
  // first, stopping at the first violation.
  int refCol = 0;
  int colIdx = 0;

  for (int narrow = 0; (narrow < WIDTH_COUNT) && (rc == NO_ERROR); narrow++)
  {
    if (firstOfWidth[narrow] < 0)
      continue;

    for (int wide = narrow + 1; wide < WIDTH_COUNT; wide++)
    {
      if (firstOfWidth[wide] < 0)
        continue;

      const HWM ratio = static_cast<HWM>(widths[wide] / widths[narrow]);
      const HWM hwmLo = segFileInfo[firstOfWidth[narrow]].fLocalHwm * ratio;
      const HWM hwmHi = hwmLo + ratio - 1;
      const HWM hwm = segFileInfo[firstOfWidth[wide]].fLocalHwm;

      if ((hwm < hwmLo) || (hwm > hwmHi))
      {
        refCol = firstOfWidth[narrow];
        colIdx = firstOfWidth[wide];
        rc = ERR_BRM_HWMS_OUT_OF_SYNC;
        break;
      }
    }
  }

  if (rc != NO_ERROR)
  {
    CalpontSystemCatalog::ColType colType1, colType2;
    colType1 = systemCatalogPtr->colType(ridList[refCol].objnum);
    colType1.colWidth = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth);
    colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum);
    colType2.colWidth = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth);
    ostringstream oss;
    oss << stage
        << " HWMs are not in sync for"
           " OID1-"
        << ridList[refCol].objnum << "; DBRoot-" << segFileInfo[refCol].fDbRoot << "; partition-"
        << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment << "; hwm-"
        << segFileInfo[refCol].fLocalHwm << "; width-" << colType1.colWidth << ':' << std::endl
        << " and OID2-" << ridList[colIdx].objnum << "; DBRoot-" << segFileInfo[colIdx].fDbRoot
        << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-" << segFileInfo[colIdx].fSegment
        << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colType2.colWidth;
    fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR);
  }

  return rc;
}
} // namespace WriteEngine