/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016-2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: we_dmlcommandproc.cpp 3082 2011-09-26 22:00:38Z chao $ #include #include "bytestream.h" using namespace messageqcpp; #include "we_messages.h" #include "we_message_handlers.h" #include "../../dbcon/dmlpackage/dmlpkg.h" #include "we_dmlcommandproc.h" using namespace dmlpackage; #include "dmlpackageprocessor.h" using namespace dmlpackageprocessor; #include "dataconvert.h" using namespace dataconvert; #include "calpontsystemcatalog.h" #include "sessionmanager.h" using namespace execplan; #include "messagelog.h" #include "stopwatch.h" #include "idberrorinfo.h" #include "errorids.h" using namespace logging; #include "brmtypes.h" using namespace BRM; #include "we_tablemetadata.h" #include "we_dbrootextenttracker.h" #include "we_bulkrollbackmgr.h" #include "we_define.h" #include "we_confirmhdfsdbfile.h" #include "cacheutils.h" #include "IDBDataFile.h" #include "IDBPolicy.h" #include "checks.h" #include "columnwidth.h" using namespace std; using namespace utils; namespace WriteEngine { // StopWatch timer; WE_DMLCommandProc::WE_DMLCommandProc() { fIsFirstBatchPm = true; filesPerColumnPartition = 8; extentsPerSegmentFile = 1; dbrootCnt = 1; extentRows = 0x800000; config::Config* cf = config::Config::makeConfig(); string fpc = cf->getConfig("ExtentMap", "FilesPerColumnPartition"); if (fpc.length() != 0) filesPerColumnPartition = cf->uFromText(fpc); // MCOL-4685: remove the option to set more than 2 extents per file (ExtentsPreSegmentFile). extentsPerSegmentFile = DEFAULT_EXTENTS_PER_SEGMENT_FILE; string dbct = cf->getConfig("SystemConfig", "DBRootCount"); if (dbct.length() != 0) dbrootCnt = cf->uFromText(dbct); } WE_DMLCommandProc::WE_DMLCommandProc(const WE_DMLCommandProc& rhs) { fIsFirstBatchPm = rhs.fIsFirstBatchPm; fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL)); } WE_DMLCommandProc::~WE_DMLCommandProc() { dbRootExtTrackerVec.clear(); } void WE_DMLCommandProc::processAuxCol(const std::vector& origVals, WriteEngine::ColValueList& colValuesList, WriteEngine::DictStrList& dicStringList) { WriteEngine::ColTupleList auxColTuples; WriteEngine::dictStr auxDicStrings; for (uint32_t j = 0; j < origVals.size(); j++) { WriteEngine::ColTuple auxColTuple; auxColTuple.data = (uint8_t)1; auxColTuples.push_back(auxColTuple); //@Bug 2515. Only pass string values to write engine utils::NullString ns; auxDicStrings.push_back(ns); } colValuesList.push_back(auxColTuples); //@Bug 2515. Only pass string values to write engine dicStringList.push_back(auxDicStrings); } uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; err.clear(); InsertDMLPackage insertPkg; ByteStream::quadbyte tmp32; bs >> tmp32; BRM::TxnID txnid; txnid.valid = true; txnid.id = tmp32; bs >> tmp32; uint32_t dbroot = tmp32; // cout << "processSingleInsert received bytestream length " << bs.length() << endl; messageqcpp::ByteStream::byte packageType; bs >> packageType; insertPkg.read(bs); uint32_t sessionId = insertPkg.get_SessionID(); // cout << " processSingleInsert for session " << sessionId << endl; DMLTable* tablePtr = insertPkg.get_Table(); RowList rows = tablePtr->get_RowList(); WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::ColValueList colValuesList; WriteEngine::DictStrList dicStringList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::TableColName tableColName; tableName.table = tableColName.table = tablePtr->get_TableName(); tableName.schema = tableColName.schema = tablePtr->get_SchemaName(); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair tableRoPair; CalpontSystemCatalog::OID tableAUXColOid; std::vector colNames; bool isWarningSet = false; try { tableRoPair = systemCatalogPtr->tableRID(tableName); tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); if (rows.size()) { Row* rowPtr = rows.at(0); ColumnList columns = rowPtr->get_ColumnList(); unsigned int numcols = rowPtr->get_NumberOfColumns(); cscColTypeList.reserve(numcols + 1); // WIP // We presume that DictCols number is low colStructs.reserve(numcols + 1); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { DMLColumn* columnPtr = *column_iterator; tableColName.column = columnPtr->get_Name(); // TODO MCOL-641 replace with getColRidsOidsTypes() CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName); CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); WriteEngine::ColStruct colStruct; colStruct.fColDbRoot = dbroot; WriteEngine::DctnryStruct dctnryStruct; dctnryStruct.fColDbRoot = dbroot; colStruct.dataOid = roPair.objnum; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; // Token if (isDictCol(colType)) { // WIP Hardcoded value colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = colType.colWidth; } colStruct.colDataType = colType.colDataType; dctnryStruct.fCharsetNumber = colType.charsetNumber; if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = colType.ddn.dictOID; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(colType); ++column_iterator; } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 if (tableAUXColOid > 3000) { CalpontSystemCatalog::ColType colType; colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE; colType.colWidth = execplan::AUX_COL_WIDTH; colType.colDataType = execplan::AUX_COL_DATATYPE; WriteEngine::ColStruct colStruct; colStruct.fColDbRoot = dbroot; WriteEngine::DctnryStruct dctnryStruct; dctnryStruct.fColDbRoot = dbroot; colStruct.dataOid = tableAUXColOid; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; colStruct.colWidth = colType.colWidth; colStruct.colDataType = colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(colType); } NullString tmpStr; for (unsigned int i = 0; i < numcols; i++) { WriteEngine::ColTupleList colTuples; WriteEngine::DctColTupleList dctColTuples; RowList::const_iterator row_iterator = rows.begin(); while (row_iterator != rows.end()) { Row* rowPtr = *row_iterator; const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i); tableColName.column = columnPtr->get_Name(); // TODO MCOL-641 remove these calls CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); boost::any datavalue; bool isNULL = false; bool pushWarning = false; std::vector origVals; origVals = columnPtr->get_DataVector(); WriteEngine::dictStr dicStrings; // token if (isDictCol(colType)) { for (uint32_t i = 0; i < origVals.size(); i++) { tmpStr = origVals[i]; isNULL = columnPtr->get_isnull(); isNULL = isNULL ? true : tmpStr.isNull(); if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { tmpStr = colType.defaultValue; } } if (tmpStr.length() > (unsigned int)colType.colWidth) { tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth)); if (!pushWarning) { pushWarning = true; isWarningSet = true; if ((rc != NO_ERROR) && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; colNames.push_back(tableColName.column); } } WriteEngine::ColTuple colTuple; colTuple.data = datavalue; colTuples.push_back(colTuple); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(tmpStr); } colValuesList.push_back(colTuples); //@Bug 2515. Only pass string values to write engine dicStringList.push_back(dicStrings); } else { string x; NullString indata; for (uint32_t i = 0; i < origVals.size(); i++) { indata = origVals[i]; isNULL = columnPtr->get_isnull() ? true : indata.isNull(); // check if autoincrement column and value is 0 or null uint64_t nextVal = 1; if (colType.autoincrement) { try { // WIP What if we combine this and previous loop and fail // after get nextAIValue ? nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } if (colType.autoincrement && (isNULL || (indata.safeString("").compare("0") == 0))) { try { bool reserved = fDbrm.getAIRange(oid, 1, &nextVal); if (!reserved) { err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); rc = 1; return rc; } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } ostringstream oss; oss << nextVal; indata.assign(oss.str()); isNULL = false; } if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { indata = colType.defaultValue; isNULL = false; } } try { datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), false, false); } catch (exception&) { rc = 1; Message::Args args; args.add(string("'") + indata.safeString("<>") + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); } //@Bug 1806 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) { return rc; } if (pushWarning) { if (!isWarningSet) isWarningSet = true; if (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; colNames.push_back(tableColName.column); } WriteEngine::ColTuple colTuple; colTuple.data = datavalue; colTuples.push_back(colTuple); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(tmpStr); } colValuesList.push_back(colTuples); dicStringList.push_back(dicStrings); } // MCOL-5021 if ((i == numcols - 1) && (tableAUXColOid > 3000)) { processAuxCol(origVals, colValuesList, dicStringList); } ++row_iterator; } } } } catch (exception& ex) { rc = 1; err = ex.what(); return rc; } // call the write engine to write the rows int error = NO_ERROR; fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); fWEWrapper.setTransId(txnid.id); // For hdfs use only uint32_t tblOid = tableRoPair.objnum; // WIP are we saving HDFS? if (idbdatafile::IDBPolicy::useHdfs()) { std::vector columns; DctnryStructList dctnryList; CalpontSystemCatalog::ColType colType; std::vector colDBRootExtentInfo; Convertor convertor; dbRootExtTrackerVec.clear(); fRBMetaWriter.reset(new RBMetaWriter("SingleInsert", NULL)); CalpontSystemCatalog::RIDList ridList; try { ridList = systemCatalogPtr->columnRIDs(tableName, true); std::vector dctnryStoreOids(ridList.size()); std::vector dbRootHWMInfoColVec(ridList.size()); bool bFirstExtentOnThisPM = false; // First gather HWM BRM information for all columns std::vector colWidths; for (unsigned i = 0; i < ridList.size(); i++) { rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); // need handle error CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum); colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth)); } for (unsigned i = 0; i < ridList.size(); i++) { // Find DBRoot/segment file where we want to start adding rows colType = systemCatalogPtr->colType(ridList[i].objnum); boost::shared_ptr pDBRootExtentTracker( new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0)); dbRootExtTrackerVec.push_back(pDBRootExtentTracker); DBRootExtentInfo dbRootExtent; std::string trkErrMsg; bool bEmptyPM; if (i == 0) rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg); else pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent); colDBRootExtentInfo.push_back(dbRootExtent); Column aColumn; aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); aColumn.colDataType = colType.colDataType; aColumn.compressionType = colType.compressionType; aColumn.dataFile.oid = ridList[i].objnum; aColumn.dataFile.fPartition = dbRootExtent.fPartition; aColumn.dataFile.fSegment = dbRootExtent.fSegment; aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot; aColumn.dataFile.hwm = dbRootExtent.fLocalHwm; columns.push_back(aColumn); if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0)) { DctnryStruct aDctnry; aDctnry.dctnryOid = colType.ddn.dictOID; aDctnry.fColPartition = dbRootExtent.fPartition; aDctnry.fColSegment = dbRootExtent.fSegment; aDctnry.fColDbRoot = dbRootExtent.fDbRoot; dctnryList.push_back(aDctnry); } if (colType.ddn.dictOID > 0) { dctnryStoreOids[i] = colType.ddn.dictOID; } else { dctnryStoreOids[i] = 0; } } fRBMetaWriter->init(tblOid, tableName.table); fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec); // cout << "Backing up hwm chunks" << endl; for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary { // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment); } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } if (colValuesList[0].size() > 0) { if (NO_ERROR != (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum))) { if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; WErrorCodes ec; err = ec.errorString(error); } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } else { rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; WErrorCodes ec; err = ec.errorString(error); } } } std::map oids; std::vector oidsToFlush; for (unsigned i = 0; i < colStructs.size(); i++) { oids[colStructs[i].dataOid] = colStructs[i].dataOid; oidsToFlush.push_back(colStructs[i].dataOid); } for (unsigned i = 0; i < dctnryStructList.size(); i++) { oids[dctnryStructList[i].dctnryOid] = dctnryStructList[i].dctnryOid; oidsToFlush.push_back(dctnryStructList[i].dctnryOid); } fWEWrapper.setTransId(txnid.id); vector lbidList; if (idbdatafile::IDBPolicy::useHdfs()) { // XXX THIS IS WRONG // save the extent info to mark them invalid, after flush, the meta file will be gone. std::tr1::unordered_map m_txnLBIDMap = fWEWrapper.getTxnMap(); try { auto mapIter = m_txnLBIDMap.find(txnid.id); if (mapIter != m_txnLBIDMap.end()) { SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second; lbidList = spTxnLBIDRec->m_LBIDs; } } catch (...) { } } // flush files // @bug5333, up to here, rc may have an error code already, don't overwrite it. int flushChunksRc = fWEWrapper.flushChunks(0, oids); // why not pass rc to flushChunks? if (flushChunksRc != NO_ERROR) { WErrorCodes ec; std::ostringstream ossErr; ossErr << "Error flushing chunks for table " << tableName << "; " << ec.errorString(flushChunksRc); // Append to errmsg in case we already have an error if (err.length() > 0) err += "; "; err += ossErr.str(); if (error == NO_ERROR) error = flushChunksRc; if ((rc == NO_ERROR) || (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) rc = 1; // return hardcoded 1 as the above } // Confirm HDFS DB file changes "only" if no error up to this point if (idbdatafile::IDBPolicy::useHdfs()) { if (error == NO_ERROR) { std::string eMsg; ConfirmHdfsDbFile confirmHdfs; error = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg); if (error != NO_ERROR) { ostringstream ossErr; ossErr << "Error confirming changes to table " << tableName << "; " << eMsg; err = ossErr.str(); rc = 1; } else // Perform extra cleanup that is necessary for HDFS { std::string eMsg; ConfirmHdfsDbFile confirmHdfs; int confirmRc2 = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg); if (confirmRc2 != NO_ERROR) { // Might want to log this error, but don't think we need // to report as fatal, since all changes were confirmed. } // flush PrimProc FD cache TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tblOid); ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; std::vector files; BRM::FileInfo aFile; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aFile.oid = it->first; while (aIt != (it->second).end()) { aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; files.push_back(aFile); aIt++; } it++; } if (files.size() > 0) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); cacheutils::flushOIDsFromCache(oidsToFlush); fDbrm.invalidateUncommittedExtentLBIDs(0, false, &lbidList); try { BulkRollbackMgr::deleteMetaFile(tblOid); } catch (exception& ex) { err = ex.what(); rc = 1; } } } // (error == NO_ERROR) through call to flushChunks() if (error != NO_ERROR) // rollback { string applName("SingleInsert"); fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err); BulkRollbackMgr::deleteMetaFile(tblOid); } } // extra hdfs steps fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); TableMetaData::removeTableMetaData(tblOid); if ((rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) || isWarningSet) { if (rc == NO_ERROR) rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; Message::Args args; string cols = "'" + colNames[0] + "'"; for (unsigned i = 1; i < colNames.size(); i++) { cols = cols + ", " + "'" + colNames[i] + "'"; } args.add(cols); err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args); // Strict mode enabled, so rollback on warning if (insertPkg.get_isWarnToError()) { string applName("SingleInsert"); fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err); BulkRollbackMgr::deleteMetaFile(tblOid); } } // MCOL-1495 Remove fCatalogMap entries CS won't use anymore. CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } uint8_t WE_DMLCommandProc::commitVersion(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tmp32; int txnID; bs >> tmp32; txnID = tmp32; // cout << "processing commit txnid = " << txnID << endl; rc = fWEWrapper.commit(txnID); if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error commiting transaction; " << txnID << ec.errorString(rc) << endl; err = oss.str(); } return rc; } uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; int txnID; bs >> sessionID; bs >> tmp32; txnID = tmp32; // cout << "processing rollbackBlocks txnid = " << txnID << endl; try { rc = fWEWrapper.rollbackBlocks(txnID, sessionID); } catch (std::exception& ex) { rc = 1; err = ex.what(); } return rc; } uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; int txnID; bs >> sessionID; bs >> tmp32; txnID = tmp32; // cout << "processing rollbackVersion txnid = " << txnID << endl; rc = fWEWrapper.rollbackVersion(txnID, sessionID); if (rc != 0) { WErrorCodes ec; ostringstream oss; oss << "WE: Error rolling back transaction " << txnID << " for session " << sessionID << "; " << ec.errorString(rc) << endl; err = oss.str(); } return rc; } uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { int rc = 0; InsertDMLPackage insertPkg; ByteStream::quadbyte tmp32; bs >> tmp32; bs >> PMId; insertPkg.read(bs); uint32_t sessionId = insertPkg.get_SessionID(); DMLTable* tablePtr = insertPkg.get_Table(); bool isAutocommitOn = insertPkg.get_isAutocommitOn(); if (idbdatafile::IDBPolicy::useHdfs()) isAutocommitOn = true; BRM::TxnID txnid; txnid.id = tmp32; txnid.valid = true; RowList rows = tablePtr->get_RowList(); bool isInsertSelect = insertPkg.get_isInsertSelect(); WriteEngine::ColStructList colStructs; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::ColValueList colValuesList; WriteEngine::DictStrList dicStringList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::TableColName tableColName; tableName.table = tableColName.table = tablePtr->get_TableName(); tableName.schema = tableColName.schema = tablePtr->get_SchemaName(); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::OID tableAUXColOid; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::DictOIDList dictOids; try { ridList = systemCatalogPtr->columnRIDs(tableName, true); roPair = systemCatalogPtr->tableRID(tableName); tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } uint32_t sizeToAdd = (tableAUXColOid > 3000 ? 1 : 0); std::vector dctnryStoreOids(ridList.size() + sizeToAdd); std::vector columns; DctnryStructList dctnryList; std::vector dbRootHWMInfoColVec(ridList.size() + sizeToAdd); uint32_t tblOid = roPair.objnum; std::vector colDBRootExtentInfo; bool bFirstExtentOnThisPM = false; Convertor convertor; if (fIsFirstBatchPm) { dbRootExtTrackerVec.clear(); if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn))) fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL)); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); fWEWrapper.setTransId(txnid.id); try { std::vector colTypes; for (unsigned i = 0; i < ridList.size(); i++) { CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ridList[i].objnum); colTypes.push_back(colType); if (colType.ddn.dictOID > 0) { dctnryStoreOids[i] = colType.ddn.dictOID; } else { dctnryStoreOids[i] = 0; } } // First gather HWM BRM information for all columns std::vector colWidths; for (unsigned i = 0; i < ridList.size(); i++) { rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); // need handle error colWidths.push_back(convertor.getCorrectRowWidth(colTypes[i].colDataType, colTypes[i].colWidth)); } // MCOL-5021 if (tableAUXColOid > 3000) { rc = BRMWrapper::getInstance()->getDbRootHWMInfo(tableAUXColOid, dbRootHWMInfoColVec[ridList.size()]); colWidths.push_back(execplan::AUX_COL_WIDTH); dctnryStoreOids[ridList.size()] = 0; CalpontSystemCatalog::ROPair auxRoPair; auxRoPair.rid = ridList.back().rid + 1; auxRoPair.objnum = tableAUXColOid; ridList.push_back(auxRoPair); } for (unsigned i = 0; i < ridList.size(); i++) { uint32_t objNum = ridList[i].objnum; // Find DBRoot/segment file where we want to start adding rows boost::shared_ptr pDBRootExtentTracker( new DBRootExtentTracker(objNum, colWidths, dbRootHWMInfoColVec, i, 0)); dbRootExtTrackerVec.push_back(pDBRootExtentTracker); DBRootExtentInfo dbRootExtent; std::string trkErrMsg; bool bEmptyPM; if (i == 0) { rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg); } else pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent); colDBRootExtentInfo.push_back(dbRootExtent); Column aColumn; if ((i == ridList.size() - 1) && (tableAUXColOid > 3000)) // AUX column { aColumn.colWidth = execplan::AUX_COL_WIDTH; aColumn.colDataType = execplan::AUX_COL_DATATYPE; // TODO MCOL-5021 compressionType for the AUX column is hard-coded to 2 aColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE; } else { const CalpontSystemCatalog::ColType& colType = colTypes[i]; aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); aColumn.colDataType = colType.colDataType; aColumn.compressionType = colType.compressionType; if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0)) { DctnryStruct aDctnry; aDctnry.dctnryOid = colType.ddn.dictOID; aDctnry.fColPartition = dbRootExtent.fPartition; aDctnry.fColSegment = dbRootExtent.fSegment; aDctnry.fColDbRoot = dbRootExtent.fDbRoot; dctnryList.push_back(aDctnry); } } aColumn.dataFile.oid = objNum; aColumn.dataFile.fPartition = dbRootExtent.fPartition; aColumn.dataFile.fSegment = dbRootExtent.fSegment; aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot; aColumn.dataFile.hwm = dbRootExtent.fLocalHwm; columns.push_back(aColumn); } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } //@Bug 5996 validate hwm before starts rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting", tableAUXColOid > 3000); if (rc != 0) { WErrorCodes ec; err = ec.errorString(rc); err += " Check err.log for detailed information."; fIsFirstBatchPm = false; rc = 1; return rc; } } std::vector rangeList; // use of MetaFile for bulk rollback support if (fIsFirstBatchPm && isAutocommitOn) { // save meta data, version last block for each dbroot at the start of batch insert try { fRBMetaWriter->init(tblOid, tableName.table); fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec); if (!bFirstExtentOnThisPM) { for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary { // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment); } } } catch (WeException& ex) // catch exception to close file, then rethrow { rc = 1; err = ex.what(); } // Do versioning. Currently, we only version columns, not strings. If there is a design change, this will // need to be re-visited if (rc != 0) return rc; } std::vector colNames; bool isWarningSet = false; if (rows.size()) { Row* rowPtr = rows.at(0); ColumnList columns = rowPtr->get_ColumnList(); ColumnList::const_iterator column_iterator = columns.begin(); try { while (column_iterator != columns.end()) { DMLColumn* columnPtr = *column_iterator; tableColName.column = columnPtr->get_Name(); CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName); CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; colStruct.dataOid = roPair.objnum; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; // Token if (isDictCol(colType)) { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = colType.colWidth; } colStruct.colDataType = colType.colDataType; dctnryStruct.fCharsetNumber = colType.charsetNumber; if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = colType.ddn.dictOID; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(colType); ++column_iterator; } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 if (tableAUXColOid > 3000) { CalpontSystemCatalog::ColType colType; colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE; colType.colWidth = execplan::AUX_COL_WIDTH; colType.colDataType = execplan::AUX_COL_DATATYPE; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; colStruct.dataOid = tableAUXColOid; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; colStruct.colWidth = colType.colWidth; colStruct.colDataType = colType.colDataType; dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); cscColTypeList.push_back(colType); } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } unsigned int numcols = rowPtr->get_NumberOfColumns(); NullString tmpStr; try { for (unsigned int i = 0; i < numcols; i++) { WriteEngine::ColTupleList colTuples; WriteEngine::DctColTupleList dctColTuples; RowList::const_iterator row_iterator = rows.begin(); bool pushWarning = false; while (row_iterator != rows.end()) { Row* rowPtr = *row_iterator; const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i); tableColName.column = columnPtr->get_Name(); CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); boost::any datavalue; bool isNULL = false; std::vector origVals; origVals = columnPtr->get_DataVector(); WriteEngine::dictStr dicStrings; // token if (isDictCol(colType)) { for (uint32_t i = 0; i < origVals.size(); i++) { tmpStr = origVals[i]; isNULL = tmpStr.isNull(); if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { tmpStr = colType.defaultValue; } } if (tmpStr.length() > (unsigned int)colType.colWidth) { tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth)); if (!pushWarning) pushWarning = true; } WriteEngine::ColTuple colTuple; colTuple.data = datavalue; colTuples.push_back(colTuple); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(tmpStr); } colValuesList.push_back(colTuples); //@Bug 2515. Only pass string values to write engine dicStringList.push_back(dicStrings); } else { string x; NullString indata; // scan once to check how many autoincrement value needed uint32_t nextValNeeded = 0; uint64_t nextVal = 1; if (colType.autoincrement) { try { nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } for (uint32_t i = 0; i < origVals.size(); i++) { indata = origVals[i]; if (indata.isNull()) isNULL = true; else isNULL = false; if (isNULL || (indata.safeString("").compare("0") == 0)) nextValNeeded++; } } if (nextValNeeded > 0) // reserve next value { try { bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal); if (!reserved) { err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); rc = 1; return rc; } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } for (uint32_t i = 0; i < origVals.size(); i++) { indata = origVals[i]; if (indata.isNull()) isNULL = true; else isNULL = false; // check if autoincrement column and value is 0 or null if (colType.autoincrement && (isNULL || (indata.safeString("").compare("0") == 0))) { ostringstream oss; oss << nextVal++; indata.assign(oss.str()); isNULL = false; } if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { indata = colType.defaultValue; isNULL = false; } } try { datavalue = colType.convertColumnData(indata, pushWarning, insertPkg.get_TimeZone(), false, false); } catch (exception&) { rc = 1; Message::Args args; args.add(string("'") + indata.safeString("<>") + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); } //@Bug 1806 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) { return rc; } if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; WriteEngine::ColTuple colTuple; colTuple.data = datavalue; colTuples.push_back(colTuple); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(tmpStr); } colValuesList.push_back(colTuples); dicStringList.push_back(dicStrings); } // MCOL-5021 if ((i == numcols - 1) && (tableAUXColOid > 3000)) { processAuxCol(origVals, colValuesList, dicStringList); } ++row_iterator; } if (pushWarning) { colNames.push_back(tableColName.column); isWarningSet = true; } } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } // call the write engine to write the rows int error = NO_ERROR; if (colValuesList.size() > 0) { if (colValuesList[0].size() > 0) { /* Begin-Disable use of MetaFile for bulk rollback support; Use alternate call below that passes 0 ptr for RBMetaWriter if (NO_ERROR != (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, fRBMetaWriter.get(), bFirstExtentOnThisPM, isInsertSelect, 0, roPair.objnum, fIsFirstBatchPm))) End-Disable use of MetaFile for bulk rollback support */ if (NO_ERROR != (error = fWEWrapper.insertColumnRecs( txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm))) { if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; WErrorCodes ec; err = ec.errorString(error); } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } else { rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; WErrorCodes ec; err = ec.errorString(error); } } } } if (fIsFirstBatchPm && isAutocommitOn) { // fWEWrapper.writeVBEnd(txnid.id, rangeList); fIsFirstBatchPm = false; } else if (fIsFirstBatchPm) { fIsFirstBatchPm = false; } if (isWarningSet && (rc == NO_ERROR)) { rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; Message::Args args; string cols = "'" + colNames[0] + "'"; for (unsigned i = 1; i < colNames.size(); i++) { cols = cols + ", " + "'" + colNames[i] + "'"; } args.add(cols); err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args); // Strict mode enabled, so rollback on warning if (insertPkg.get_isWarnToError()) { string applName("BatchInsert"); fWEWrapper.bulkRollback(tblOid, txnid.id, tableName.toString(), applName, false, err); BulkRollbackMgr::deleteMetaFile(tblOid); } } // MCOL-1495 Remove fCatalogMap entries CS won't use anymore. CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { int rc = 0; // cout << "processBatchInsert received bytestream length " << bs.length() << endl; ByteStream::quadbyte tmp32; ByteStream::byte tmp8; bs >> tmp32; // cout << "processBatchInsert got transaction id " << tmp32 << endl; bs >> PMId; // cout << "processBatchInsert gor PMId " << PMId << endl; uint32_t sessionId; bs >> sessionId; // cout << " processBatchInsert for session " << sessionId << endl; bool isAutocommitOn; bs >> tmp8; isAutocommitOn = tmp8; if (idbdatafile::IDBPolicy::useHdfs()) isAutocommitOn = true; // cout << "This session isAutocommitOn is " << isAutocommitOn << endl; BRM::TxnID txnid; txnid.id = tmp32; txnid.valid = true; bool isInsertSelect; bs >> tmp8; // For insert select, skip the hwm block and start inserting from the next block // to avoid self insert issue. // For batch insert: if not first batch, use the saved last rid to start adding rows. isInsertSelect = tmp8; WriteEngine::ColStructList colStructs; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; std::vector colValuesList; WriteEngine::DictStrList dicStringList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::TableColName tableColName; bs >> tableColName.table; bs >> tableColName.schema; tableName.table = tableColName.table; tableName.schema = tableColName.schema; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::DictOIDList dictOids; try { ridList = systemCatalogPtr->columnRIDs(tableName, true); roPair = systemCatalogPtr->tableRID(tableName); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } std::vector dctnryStoreOids(ridList.size()); std::vector columns; DctnryStructList dctnryList; std::vector dbRootHWMInfoColVec(ridList.size()); uint32_t tblOid = roPair.objnum; CalpontSystemCatalog::ColType colType; std::vector colDBRootExtentInfo; bool bFirstExtentOnThisPM = false; Convertor convertor; if (fIsFirstBatchPm) { dbRootExtTrackerVec.clear(); if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn))) fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL)); fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); fWEWrapper.setTransId(txnid.id); try { // First gather HWM BRM information for all columns std::vector colWidths; for (unsigned i = 0; i < ridList.size(); i++) { rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); // need handle error CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum); colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth)); } for (unsigned i = 0; i < ridList.size(); i++) { // Find DBRoot/segment file where we want to start adding rows colType = systemCatalogPtr->colType(ridList[i].objnum); boost::shared_ptr pDBRootExtentTracker( new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0)); dbRootExtTrackerVec.push_back(pDBRootExtentTracker); DBRootExtentInfo dbRootExtent; std::string trkErrMsg; bool bEmptyPM; if (i == 0) { rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg); /* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM << " oid:dbroot:hwm = " << ridList[i].objnum << ":"< rangeList; // use of MetaFile for bulk rollback support if (fIsFirstBatchPm && isAutocommitOn) { // save meta data, version last block for each dbroot at the start of batch insert try { fRBMetaWriter->init(tblOid, tableName.table); fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec); // cout << "Saved meta files" << endl; if (!bFirstExtentOnThisPM) { // cout << "Backing up hwm chunks" << endl; for (unsigned i = 0; i < dctnryList.size(); i++) // back up chunks for compressed dictionary { // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context fRBMetaWriter->backupDctnryHWMChunk(dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment); } } } catch (WeException& ex) // catch exception to close file, then rethrow { rc = 1; err = ex.what(); } // Do versioning. Currently, we only version columns, not strings. If there is a design change, this will // need to be re-visited if (rc != 0) return rc; } std::vector colNames; bool isWarningSet = false; uint32_t columnCount; bs >> columnCount; if (columnCount) { try { for (uint32_t current_column = 0; current_column < columnCount; current_column++) { uint32_t tmp32; std::string colName; bs >> tmp32; bs >> colName; colNames.push_back(colName); CalpontSystemCatalog::OID oid = tmp32; CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); WriteEngine::ColStruct colStruct; WriteEngine::DctnryStruct dctnryStruct; colStruct.dataOid = oid; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; // Token if (isDictCol(colType)) { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = colType.colWidth; } colStruct.colDataType = colType.colDataType; dctnryStruct.fCharsetNumber = colType.charsetNumber; if (colStruct.tokenFlag) { dctnryStruct.dctnryOid = colType.ddn.dictOID; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } else { dctnryStruct.dctnryOid = 0; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.colWidth = colType.colWidth; } colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } NullString tmpStr; uint32_t valuesPerColumn; bs >> valuesPerColumn; colValuesList.reserve(columnCount * valuesPerColumn); try { bool pushWarning = false; for (uint32_t j = 0; j < columnCount; j++) { WriteEngine::DctColTupleList dctColTuples; tableColName.column = colNames[j]; CalpontSystemCatalog::OID oid = colStructs[j].dataOid; CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); bool isNULL = false; WriteEngine::dictStr dicStrings; // token if (isDictCol(colType)) { for (uint32_t i = 0; i < valuesPerColumn; i++) { bs >> tmpStr; isNULL = tmpStr.isNull(); if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { tmpStr = colType.defaultValue; } } if (tmpStr.length() > (unsigned int)colType.colWidth) { tmpStr.assign(tmpStr.unsafeStringRef().substr(0, colType.colWidth)); if (!pushWarning) pushWarning = true; } colValuesList.push_back(0); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(tmpStr); } //@Bug 2515. Only pass string values to write engine dicStringList.push_back(dicStrings); } else { string x; // scan once to check how many autoincrement value needed uint32_t nextValNeeded = 0; uint64_t nextVal = 1; if (colType.autoincrement) { try { nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } for (uint32_t i = 0; i < valuesPerColumn; i++) { bs >> tmp8; isNULL = tmp8; uint8_t val8; uint16_t val16; uint32_t val32; uint64_t val64; uint64_t colValue; float valF; double valD; NullString valStr; bool valZero = false; // Needed for autoinc check switch (colType.colDataType) { case execplan::CalpontSystemCatalog::TINYINT: case execplan::CalpontSystemCatalog::UTINYINT: bs >> val8; if (val8 == 0) valZero = true; colValue = val8; break; case execplan::CalpontSystemCatalog::SMALLINT: case execplan::CalpontSystemCatalog::USMALLINT: bs >> val16; if (val16 == 0) valZero = true; colValue = val16; break; case execplan::CalpontSystemCatalog::DATE: case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::UMEDINT: case execplan::CalpontSystemCatalog::UINT: bs >> val32; if (val32 == 0) valZero = true; colValue = val32; break; case execplan::CalpontSystemCatalog::BIGINT: case execplan::CalpontSystemCatalog::DATETIME: case execplan::CalpontSystemCatalog::TIME: case execplan::CalpontSystemCatalog::TIMESTAMP: case execplan::CalpontSystemCatalog::UBIGINT: bs >> val64; if (val64 == 0) valZero = true; colValue = val64; break; case execplan::CalpontSystemCatalog::DECIMAL: switch (colType.colWidth) { case 1: { bs >> val8; colValue = val8; break; } case 2: { bs >> val16; colValue = val16; break; } case 4: { bs >> val32; colValue = val32; break; } default: { bs >> val64; colValue = val64; break; } } break; case execplan::CalpontSystemCatalog::UDECIMAL: // UDECIMAL numbers may not be negative if (colType.colWidth == 1) { bs >> val8; // FIXME: IDK what would it mean if valN are unsigned if (utils::is_negative(val8) && val8 != joblist::TINYINTEMPTYROW && val8 != joblist::TINYINTNULL) { val8 = 0; pushWarning = true; } colValue = val8; } else if (colType.colWidth == 2) { bs >> val16; if (utils::is_negative(val16) && val16 != joblist::SMALLINTEMPTYROW && val16 != joblist::SMALLINTNULL) { val16 = 0; pushWarning = true; } colValue = val16; } else if (colType.colWidth == 4) { bs >> val32; if (utils::is_negative(val32) && val32 != joblist::INTEMPTYROW && val32 != joblist::INTNULL) { val32 = 0; pushWarning = true; } colValue = val32; } else if (colType.colWidth == 8) { bs >> val64; if (utils::is_negative(val64) && val64 != joblist::BIGINTEMPTYROW && val64 != joblist::BIGINTNULL) { val64 = 0; pushWarning = true; } colValue = val64; } break; case execplan::CalpontSystemCatalog::DOUBLE: bs >> val64; colValue = val64; break; case execplan::CalpontSystemCatalog::UDOUBLE: bs >> val64; memcpy(&valD, &val64, 8); if (valD < 0.0 && valD != static_cast(joblist::DOUBLEEMPTYROW) && valD != static_cast(joblist::DOUBLENULL)) { valD = 0.0; pushWarning = true; } colValue = val64; break; case execplan::CalpontSystemCatalog::FLOAT: bs >> val32; colValue = val32; break; case execplan::CalpontSystemCatalog::UFLOAT: bs >> val32; memcpy(&valF, &val32, 4); if (valF < 0.0 && valF != static_cast(joblist::FLOATEMPTYROW) && valF != static_cast(joblist::FLOATNULL)) { valF = 0.0; pushWarning = true; } colValue = val32; break; case execplan::CalpontSystemCatalog::CHAR: case execplan::CalpontSystemCatalog::VARCHAR: case execplan::CalpontSystemCatalog::TEXT: case execplan::CalpontSystemCatalog::BLOB: bs >> valStr; if (!valStr.isNull()) // null values do not require any padding or truncation. { if (valStr.length() > (unsigned int)colType.colWidth) { valStr = NullString(valStr.unsafeStringRef().substr(0, colType.colWidth)); pushWarning = true; } else { if ((unsigned int)colType.colWidth > valStr.length()) { // Pad null character to the string valStr.resize(colType.colWidth, 0); } } } // FIXME: colValue is uint64_t (8 bytes) memcpy(&colValue, valStr.str(), valStr.length()); break; default: rc = 1; err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT); break; } // check if autoincrement column and value is 0 or null if (colType.autoincrement && (isNULL || valZero)) { ostringstream oss; oss << nextVal++; isNULL = false; try { nextValNeeded++; bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal); if (!reserved) { err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); rc = 1; return rc; } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } colValue = nextVal; } if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { if (isNULL && colType.defaultValue.isNull()) // error out { Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); rc = 1; return rc; } else if (isNULL && !(colType.defaultValue.isNull())) { memcpy(&colValue, colType.defaultValue.str(), colType.defaultValue.length()); isNULL = false; } } //@Bug 1806 if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) { return rc; } if (pushWarning && (rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)) rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; colValuesList.push_back(colValue); //@Bug 2515. Only pass string values to write engine dicStrings.push_back(valStr); } dicStringList.push_back(dicStrings); } if (pushWarning) { colNames.push_back(tableColName.column); isWarningSet = true; } } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } // call the write engine to write the rows int error = NO_ERROR; // fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3); // cout << "Batch inserting a row with transaction id " << txnid.id << endl; if (colValuesList.size() > 0) { if (NO_ERROR != (error = fWEWrapper.insertColumnRecsBinary( txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm))) { if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; WErrorCodes ec; err = ec.errorString(error); } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } else { rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; WErrorCodes ec; err = ec.errorString(error); } } } if (fIsFirstBatchPm && isAutocommitOn) { // fWEWrapper.writeVBEnd(txnid.id, rangeList); fIsFirstBatchPm = false; } else if (fIsFirstBatchPm) { fIsFirstBatchPm = false; } if (isWarningSet && (rc == NO_ERROR)) { rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; // cout << "Got warning" << endl; Message::Args args; string cols = "'" + colNames[0] + "'"; for (unsigned i = 1; i < colNames.size(); i++) { cols = cols + ", " + "'" + colNames[i] + "'"; } args.add(cols); err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args); } // MCOL-1495 Remove fCatalogMap entries CS won't use anymore. CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; // need to commit the versioned blocks, set hwm, update casual partition, send back to DMLProc to set them // cout << " in commiting autocommit on batch insert " << endl; uint32_t tmp32, tableOid, sessionId; int txnID; bs >> tmp32; txnID = tmp32; bs >> tmp32; tableOid = tmp32; bs >> tmp32; sessionId = tmp32; BRM::DBRM dbrm; std::vector setHWMArgs; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; BulkSetHWMArg aArg; std::vector files; BRM::FileInfo aFile; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aArg.oid = it->first; aFile.oid = it->first; // cout << "OID:" << aArg.oid; while (aIt != (it->second).end()) { aArg.partNum = aIt->partNum; aArg.segNum = aIt->segNum; aArg.hwm = aIt->hwm; if (!aIt->isDict) setHWMArgs.push_back(aArg); aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; // cout <<"Added to files oid:dbroot:part:seg:compType = " << // aFile.oid<<":"< 0)) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); TableMetaData::removeTableMetaData(tableOid); // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary // blocks tounched std::tr1::unordered_map::iterator mapIter; mapIter = fWEWrapper.getDictMap().find(txnID); if (mapIter != fWEWrapper.getDictMap().end()) { std::set::iterator lbidIter; std::vector dictFlushBlks; cerr << "API Flushing blocks: "; for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++) { cerr << *lbidIter << ", "; dictFlushBlks.push_back((*lbidIter)); } cerr << endl; cacheutils::flushPrimProcAllverBlocks(dictFlushBlks); fWEWrapper.getDictMap().erase(txnID); } // MCOL-1495 Remove fCatalogMap entries CS won't use anymore. CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId); CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, std::string& err) { uint8_t tmp8, rc = 0; err.clear(); // set hwm for autocommit off uint32_t tmp32, tableOid; int txnID; bool isAutoCommitOn; bs >> tmp32; txnID = tmp32; bs >> tmp8; isAutoCommitOn = (tmp8 != 0); bs >> tmp32; tableOid = tmp32; bs >> tmp8; // cout << "processBatchInsertHwm: tranid:isAutoCommitOn = " < files; std::vector oidsToFlush; BRM::FileInfo aFile; // BRM::FileInfo curFile; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::ROPair roPair; //@Bug 5996. Validate hwm before set them boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(0); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::OID tableAUXColOid; try { CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid); ridList = systemCatalogPtr->columnRIDs(tableName); tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); } catch (exception& ex) { err = ex.what(); rc = 1; TableMetaData::removeTableMetaData(tableOid); fIsFirstBatchPm = true; // cout << "flush files when autocommit off" << endl; fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); return rc; } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 bool hasAUXCol = (tableAUXColOid > 3000); if (hasAUXCol) { CalpontSystemCatalog::ROPair auxRoPair; auxRoPair.rid = ridList.back().rid + 1; auxRoPair.objnum = tableAUXColOid; ridList.push_back(auxRoPair); } std::vector colDBRootExtentInfo; DBRootExtentInfo aExtentInfo; std::vector auxColDBRootExtentInfo; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aFile.oid = it->first; oidsToFlush.push_back(aFile.oid); roPair.objnum = aFile.oid; aExtentInfo.fPartition = 0; aExtentInfo.fDbRoot = 0; aExtentInfo.fSegment = 0; aExtentInfo.fLocalHwm = 0; bool isDict = false; while (aIt != (it->second).end()) { aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; files.push_back(aFile); if (!aIt->isDict) { if ((aIt->partNum > aExtentInfo.fPartition) || ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum > aExtentInfo.fSegment)) || ((aIt->partNum == aExtentInfo.fPartition) && (aIt->segNum == aExtentInfo.fSegment) && (aIt->segNum > aExtentInfo.fLocalHwm))) { aExtentInfo.fPartition = aIt->partNum; aExtentInfo.fDbRoot = aIt->dbRoot; aExtentInfo.fSegment = aIt->segNum; aExtentInfo.fLocalHwm = aIt->hwm; } } else { isDict = true; } aIt++; } if (!isDict) { if (hasAUXCol && (((uint32_t)tableAUXColOid) == it->first)) { auxColDBRootExtentInfo.push_back(aExtentInfo); } else { colDBRootExtentInfo.push_back(aExtentInfo); } } it++; } if (hasAUXCol) { colDBRootExtentInfo.insert(colDBRootExtentInfo.end(), auxColDBRootExtentInfo.begin(), auxColDBRootExtentInfo.end()); } rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending", hasAUXCol); if (rc != 0) { WErrorCodes ec; err = ec.errorString(rc); err += " Check err.log for detailed information."; TableMetaData::removeTableMetaData(tableOid); fIsFirstBatchPm = true; fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); rc = 1; return rc; } try { if (isAutoCommitOn) { bs.restart(); if (fWEWrapper.getIsInsert()) { // @bug5333, up to here, rc == 0, but flushchunk may fail. rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err); } if (tmp8 != 0) TableMetaData::removeTableMetaData(tableOid); return rc; // will set hwm with version commit. } } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } // Handle case where isAutoCommitOn is false BRM::DBRM dbrm; // cout << " In processBatchInsertHwm setting hwm" << endl; std::vector setHWMArgs; it = colsExtsInfoMap.begin(); BulkSetHWMArg aArg; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aArg.oid = it->first; // cout << "for oid " << aArg.oid << endl; while (aIt != (it->second).end()) { aArg.partNum = aIt->partNum; aArg.segNum = aIt->segNum; aArg.hwm = aIt->hwm; //@Bug 6029 dictionary store files already set hwm. if (!aIt->isDict) setHWMArgs.push_back(aArg); aIt++; } it++; } TableMetaData::removeTableMetaData(tableOid); fIsFirstBatchPm = true; // cout << "flush files when autocommit off" << endl; fWEWrapper.setIsInsert(true); fWEWrapper.setBulkFlag(true); rc = processBatchInsertHwmFlushChunks(tableOid, txnID, files, oidsToFlush, err); bs.restart(); try { serializeInlineVector(bs, setHWMArgs); } catch (exception& ex) { // Append to errmsg in case we already have an error if (err.length() > 0) err += "; "; err += ex.what(); rc = 1; return rc; } // cout << "flush is called for transaction " << txnID << endl; return rc; } //------------------------------------------------------------------------------ // Flush chunks for the specified table and transaction. // Also confirms changes to DB files (for hdfs). // files vector represents list of files to be purged from PrimProc cache. // oid2ToFlush represents list of oids to be flushed from PrimProc cache. // Afterwords, the following attributes are reset as follows: // fWEWrapper.setIsInsert(false); // fWEWrapper.setBulkFlag(false); // fIsFirstBatchPm = true; // returns 0 for success; returns 1 if error occurs //------------------------------------------------------------------------------ uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int txnID, const std::vector& files, const std::vector& oidsToFlush, std::string& err) { uint8_t rc = 0; std::map oids; CalpontSystemCatalog::TableName aTableName; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::DictOIDList dictOids; CalpontSystemCatalog::OID tableAUXColOid; try { boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(txnID); aTableName = systemCatalogPtr->tableName(tblOid); ridList = systemCatalogPtr->columnRIDs(aTableName, true); dictOids = systemCatalogPtr->dictOIDs(aTableName); tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName); } catch (exception& ex) { std::ostringstream ossErr; ossErr << "System Catalog error for table OID " << tblOid; // Include tbl name in msg unless exception occurred before we got it if (aTableName.table.length() > 0) ossErr << '(' << aTableName << ')'; ossErr << "; " << ex.what(); err = ossErr.str(); rc = 1; return rc; } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 if (tableAUXColOid > 3000) { CalpontSystemCatalog::ROPair auxRoPair; auxRoPair.rid = ridList.back().rid + 1; auxRoPair.objnum = tableAUXColOid; ridList.push_back(auxRoPair); } for (unsigned i = 0; i < ridList.size(); i++) { oids[ridList[i].objnum] = ridList[i].objnum; } for (unsigned i = 0; i < dictOids.size(); i++) { oids[dictOids[i].dictOID] = dictOids[i].dictOID; } fWEWrapper.setTransId(txnID); // @bug5333, up to here, rc == 0, but flushchunk may fail. rc = fWEWrapper.flushChunks(0, oids); if (rc == NO_ERROR) { // Confirm changes to db files "only" if no error up to this point if (idbdatafile::IDBPolicy::useHdfs()) { std::string eMsg; ConfirmHdfsDbFile confirmHdfs; int confirmDbRc = confirmHdfs.confirmDbFileListFromMetaFile(tblOid, eMsg); if (confirmDbRc == NO_ERROR) { int endDbRc = confirmHdfs.endDbFileListFromMetaFile(tblOid, true, eMsg); if (endDbRc != NO_ERROR) { // Might want to log this error, but don't think we // need to report as fatal, as all changes were confirmed. } if (files.size() > 0) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); cacheutils::flushOIDsFromCache(oidsToFlush); } else { ostringstream ossErr; ossErr << "Error confirming changes to table " << aTableName << "; " << eMsg; err = ossErr.str(); rc = 1; // reset to 1 } } } else // flushChunks error { WErrorCodes ec; std::ostringstream ossErr; ossErr << "Error flushing chunks for table " << aTableName << "; " << ec.errorString(rc); err = ossErr.str(); rc = 1; // reset to 1 } fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fIsFirstBatchPm = true; return rc; } uint8_t WE_DMLCommandProc::commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; // commit all versioned blocks, set hwm, update casual partition return rc; } uint8_t WE_DMLCommandProc::rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; uint32_t tmp32, tableOid, sessionID; uint64_t lockID; bs >> sessionID; bs >> lockID; bs >> tmp32; tableOid = tmp32; // Bulkrollback boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); CalpontSystemCatalog::TableName aTableName; try { aTableName = systemCatalogPtr->tableName(tableOid); } catch (...) { err = std::string("No such table for oid ") + std::to_string(tableOid); rc = 1; return rc; } string table = aTableName.schema + "." + aTableName.table; string applName("BatchInsert"); rc = fWEWrapper.bulkRollback(tableOid, lockID, table, applName, false, err); fIsFirstBatchPm = true; TableMetaData::removeTableMetaData(tableOid); return rc; } uint8_t WE_DMLCommandProc::rollbackBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; // Rollbacked all versioned blocks return rc; } uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId, uint64_t& blocksChanged) { uint8_t rc = 0; uint32_t tmp32, sessionID; TxnID txnId; bs >> PMId; bs >> tmp32; txnId = tmp32; fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.setTransId(txnId); if (!rowGroups[txnId]) // meta data { rowGroups[txnId] = new rowgroup::RowGroup(); rowGroups[txnId]->deserialize(bs); uint8_t pkgType; bs >> pkgType; cpackages[txnId].read(bs); rc = fWEWrapper.startTransaction(txnId); if (rc != NO_ERROR) { WErrorCodes ec; err = ec.errorString(rc); } return rc; } bool pushWarning = false; rowgroup::RGData rgData; rgData.deserialize(bs); rowGroups[txnId]->setData(&rgData); // rowGroups[txnId]->setData(const_cast(bs.buf())); // get rows and values rowgroup::Row row; rowGroups[txnId]->initRow(&row); utils::NullString value; uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount(); uint32_t columnsSelected = rowGroups[txnId]->getColumnCount(); std::vector fetchColTypes = rowGroups[txnId]->getColTypes(); std::vector fetchColScales = rowGroups[txnId]->getScale(); std::vector fetchColColwidths; for (uint32_t i = 0; i < columnsSelected; i++) { fetchColColwidths.push_back(rowGroups[txnId]->getColumnWidth(i)); } WriteEngine::ColTupleList aColList; WriteEngine::ColTuple colTuple; WriteEngine::ColStructList colStructList; WriteEngine::ColStruct colStruct; WriteEngine::ColValueList colValueList; WriteEngine::RIDList rowIDLists; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryStruct dctnryStruct; WriteEngine::DctnryValueList dctnryValueList; CalpontSystemCatalog::TableName tableName; CalpontSystemCatalog::TableColName tableColName; DMLTable* tablePtr = cpackages[txnId].get_Table(); RowList rows = tablePtr->get_RowList(); dmlpackage::ColumnList columnsUpdated = rows[0]->get_ColumnList(); tableColName.table = tableName.table = tablePtr->get_TableName(); tableColName.schema = tableName.schema = tablePtr->get_SchemaName(); tableColName.column = columnsUpdated[0]->get_Name(); sessionID = cpackages[txnId].get_SessionID(); boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); CalpontSystemCatalog::OID oid = 0; CalpontSystemCatalog::ROPair tableRO; long timeZone = cpackages[txnId].get_TimeZone(); try { tableRO = systemCatalogPtr->tableRID(tableName); oid = systemCatalogPtr->lookupOID(tableColName); } catch (std::exception& ex) { rc = 1; ostringstream oss; oss << "lookupOID got exception " << ex.what() << " with column " << tableColName.schema << "." + tableColName.table << "." << tableColName.column; err = oss.str(); } catch (...) { rc = 1; ostringstream oss; oss << "lookupOID got unknown exception with column " << tableColName.schema << "." << tableColName.table << "." << tableColName.column; err = oss.str(); } if (rc != 0) return rc; rowGroups[txnId]->getRow(0, &row); CalpontSystemCatalog::RID rid = row.getRid(); uint16_t dbRoot, segment, blockNum; uint32_t partition; uint8_t extentNum; // Get the file information from rowgroup dbRoot = rowGroups[txnId]->getDBRoot(); rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum); colStruct.fColPartition = partition; colStruct.fColSegment = segment; colStruct.fColDbRoot = dbRoot; dctnryStruct.fColPartition = partition; dctnryStruct.fColSegment = segment; dctnryStruct.fColDbRoot = dbRoot; TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableRO.objnum); // Build to be updated column structure and values int error = 0; unsigned fetchColPos = 0; bool ridsFetched = false; bool isNull = false; boost::any datavalue; int64_t intColVal = 0; // timer.start("fetch values"); std::vector colNames; // for query stats boost::scoped_array colTypes( new CalpontSystemCatalog::ColType[columnsUpdated.size()]); boost::scoped_array preBlkNums(new int[columnsUpdated.size()]); boost::scoped_array oids(new OID[columnsUpdated.size()]); BRMWrapper::setUseVb(true); for (unsigned int j = 0; j < columnsUpdated.size(); j++) { // timer.start("lookupsyscat"); tableColName.column = columnsUpdated[j]->get_Name(); try { oids[j] = systemCatalogPtr->lookupOID(tableColName); colTypes[j] = systemCatalogPtr->colType(oids[j]); } catch (std::exception& ex) { rc = 1; ostringstream oss; oss << "colType got exception " << ex.what() << " with column oid " << oid; err = oss.str(); } catch (...) { rc = 1; ostringstream oss; oss << "colType got unknown exception with column oid " << oid; err = oss.str(); } if (rc != 0) return rc; preBlkNums[j] = -1; } for (unsigned int j = 0; j < columnsUpdated.size(); j++) { WriteEngine::ColTupleList colTupleList; CalpontSystemCatalog::ColType colType = colTypes[j]; oid = oids[j]; colStruct.dataOid = oid; colStruct.colDataType = colType.colDataType; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; tableColName.column = columnsUpdated[j]->get_Name(); if (!ridsFetched) { // querystats uint64_t relativeRID = 0; for (unsigned i = 0; i < rowsThisRowgroup; i++) { rowGroups[txnId]->getRow(i, &row); rid = row.getRid(); relativeRID = rid - rowGroups[txnId]->getBaseRid(); rid = relativeRID; convertToRelativeRid(rid, extentNum, blockNum); rowIDLists.push_back(rid); uint32_t colWidth = colTypes[j].colWidth; if (colWidth > 8 && !(colTypes[j].colDataType == CalpontSystemCatalog::DECIMAL || colTypes[j].colDataType == CalpontSystemCatalog::UDECIMAL)) { colWidth = 8; } else if (colWidth >= datatypes::MAXDECIMALWIDTH) { colWidth = datatypes::MAXDECIMALWIDTH; } int rrid = (int)relativeRID / (BYTE_PER_BLOCK / colWidth); // populate stats.blocksChanged if (rrid > preBlkNums[j]) { preBlkNums[j] = rrid; blocksChanged++; } } ridsFetched = true; } bool pushWarn = false; bool nameNeeded = false; if (isDictCol(colType)) { colStruct.colWidth = 8; colStruct.tokenFlag = true; dctnryStruct.dctnryOid = colType.ddn.dictOID; dctnryStruct.columnOid = colStruct.dataOid; dctnryStruct.fCompressionType = colType.compressionType; dctnryStruct.fCharsetNumber = colType.charsetNumber; dctnryStruct.colWidth = colType.colWidth; if (NO_ERROR != (error = fWEWrapper.openDctnry(txnId, dctnryStruct, false))) // @bug 5572 HDFS tmp file { WErrorCodes ec; err = ec.errorString(error); rc = error; return rc; } ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStruct.dctnryOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { if ((it->dbRoot == dctnryStruct.fColDbRoot) && (it->partNum == dctnryStruct.fColPartition) && (it->segNum == dctnryStruct.fColSegment)) break; it++; } if (it == aColExtsInfo.end()) // add this one to the list { ColExtInfo aExt; aExt.dbRoot = dctnryStruct.fColDbRoot; aExt.partNum = dctnryStruct.fColPartition; aExt.segNum = dctnryStruct.fColSegment; aExt.compType = dctnryStruct.fCompressionType; aExt.isDict = true; aColExtsInfo.push_back(aExt); } aTableMetaData->setColExtsInfo(dctnryStruct.dctnryOid, aColExtsInfo); if (columnsUpdated[j]->get_isFromCol()) { for (unsigned i = 0; i < rowsThisRowgroup; i++) { value.dropString(); rowGroups[txnId]->getRow(i, &row); if (row.isNullValue(fetchColPos)) { if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { rc = 1; Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); return rc; } else if (colType.defaultValue.length() > 0) { value = colType.defaultValue; if (value.length() > (unsigned int)colType.colWidth) { value.assign(value.safeString("").substr(0, colType.colWidth)); pushWarn = true; if (!pushWarning) { pushWarning = true; } if (pushWarn) nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; dctTuple.sigValue = (unsigned char*)value.str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); if (error != NO_ERROR) { fWEWrapper.closeDctnry(txnId, colType.compressionType); return false; } colTuple.data = dctTuple.token; colTupleList.push_back(colTuple); } else { WriteEngine::Token nullToken; colTuple.data = nullToken; colTupleList.push_back(colTuple); } continue; } switch (fetchColTypes[fetchColPos]) { case CalpontSystemCatalog::DATE: { intColVal = row.getUintField<4>(fetchColPos); value.assign(DataConvert::dateToString(intColVal)); break; } case CalpontSystemCatalog::DATETIME: { intColVal = row.getUintField<8>(fetchColPos); value.assign(DataConvert::datetimeToString(intColVal, colType.precision)); break; } case CalpontSystemCatalog::TIMESTAMP: { intColVal = row.getUintField<8>(fetchColPos); value.assign(DataConvert::timestampToString(intColVal, timeZone, colType.precision)); break; } case CalpontSystemCatalog::TIME: { intColVal = row.getIntField<8>(fetchColPos); value.assign(DataConvert::timeToString(intColVal, colType.precision)); break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: { value = row.getStringField(fetchColPos); if (!value.isNull()) { unsigned i = strlen(value.str()); value.assign(value.unsafeStringRef().substr(0, i)); } break; } case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: { value.assign(row.getVarBinaryField(fetchColPos), row.getVarBinaryLength(fetchColPos)); break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH) { datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos]); value.assign(dec.toString(true)); break; } } /* fall through */ case CalpontSystemCatalog::BIGINT: case CalpontSystemCatalog::UBIGINT: case CalpontSystemCatalog::INT: case CalpontSystemCatalog::UINT: case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::SMALLINT: case CalpontSystemCatalog::USMALLINT: case CalpontSystemCatalog::TINYINT: case CalpontSystemCatalog::UTINYINT: { { intColVal = row.getIntField(fetchColPos); if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0) { intColVal = 0; } if (fetchColScales[fetchColPos] <= 0) { ostringstream os; if (isUnsigned(fetchColTypes[fetchColPos])) os << static_cast(intColVal); else os << intColVal; value.assign(os.str()); } else { datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos]); value.assign(dec.toString()); } } break; } // In this case, we're trying to load a double output column with float data. This is the // case when you do sum(floatcol), e.g. case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { float dl = row.getFloatField(fetchColPos); if (dl == std::numeric_limits::infinity()) continue; if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0) { dl = 0.0; } ostringstream os; //@Bug 3350 fix the precision. os << setprecision(7) << dl; value.assign(os.str()); break; } case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: { double dl = row.getDoubleField(fetchColPos); if (dl == std::numeric_limits::infinity()) continue; if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0) { dl = 0.0; } ostringstream os; //@Bug 3350 fix the precision. os << setprecision(16) << dl; value.assign(os.str()); break; } case CalpontSystemCatalog::LONGDOUBLE: { long double dll = row.getLongDoubleField(fetchColPos); if (dll == std::numeric_limits::infinity()) continue; ostringstream os; //@Bug 3350 fix the precision. os << setprecision(19) << dll; value.assign(os.str()); break; } default: // treat as int64 { ostringstream os; intColVal = row.getUintField<8>(fetchColPos); os << intColVal; value.assign(os.str()); break; } } uint32_t funcScale = columnsUpdated[j]->get_funcScale(); if (funcScale != 0) { string str = value.safeString(""); string::size_type pos = str.find_first_of("."); // decimal point if (pos >= str.length()) str.insert(str.length(), "."); // padding 0 if needed pos = str.find_first_of("."); uint32_t digitsAfterPoint = str.length() - pos - 1; if (digitsAfterPoint < funcScale) { for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++) str += "0"; } value.assign(str); } // check data length // trim the string if needed if (value.length() > (unsigned int)colType.colWidth) { value.assign(value.unsafeStringRef().substr(0, colType.colWidth)); if (!pushWarn) pushWarn = true; if (!pushWarning) pushWarning = true; if (pushWarn) nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; dctTuple.sigValue = (unsigned char*)value.str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); if (error != NO_ERROR) { fWEWrapper.closeDctnry(txnId, colType.compressionType); rc = error; WErrorCodes ec; err = ec.errorString(error); return rc; } colTuple.data = dctTuple.token; colTupleList.push_back(colTuple); } if (colType.compressionType == 0) fWEWrapper.closeDctnry(txnId, colType.compressionType, true); else fWEWrapper.closeDctnry(txnId, colType.compressionType, false); fetchColPos++; } else // constant { if (columnsUpdated[j]->get_isnull()) { if ((colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { rc = 1; Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); return rc; } else if (colType.defaultValue.length() > 0) { value = colType.defaultValue; if (value.length() > (unsigned int)colType.colWidth) { value.assign(value.safeString("").substr(0, colType.colWidth)); pushWarn = true; if (!pushWarning) { pushWarning = true; } if (pushWarn) nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; dctTuple.sigValue = (unsigned char*)value.str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); if (error != NO_ERROR) { fWEWrapper.closeDctnry(txnId, colType.compressionType); rc = error; WErrorCodes ec; err = ec.errorString(error); return rc; } colTuple.data = dctTuple.token; if (colType.compressionType == 0) fWEWrapper.closeDctnry(txnId, colType.compressionType, true); else fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once. } else { WriteEngine::Token nullToken; colTuple.data = nullToken; } } else { idbassert(!columnsUpdated[j]->get_DataVector()[0].isNull()); value = columnsUpdated[j]->get_DataVector()[0]; if (value.length() > (unsigned int)colType.colWidth) { value.assign(value.unsafeStringRef().substr(0, colType.colWidth)); pushWarn = true; if (!pushWarning) { pushWarning = true; } if (pushWarn) nameNeeded = true; } WriteEngine::DctnryTuple dctTuple; dctTuple.sigValue = (unsigned char*)value.str(); dctTuple.sigSize = value.length(); dctTuple.isNull = false; error = fWEWrapper.tokenize(txnId, dctTuple, colType.compressionType); if (error != NO_ERROR) { fWEWrapper.closeDctnry(txnId, colType.compressionType); rc = error; WErrorCodes ec; err = ec.errorString(error); return rc; } colTuple.data = dctTuple.token; if (colType.compressionType == 0) fWEWrapper.closeDctnry(txnId, colType.compressionType, true); else fWEWrapper.closeDctnry(txnId, colType.compressionType, false); // Constant only need to tokenize once. } for (unsigned row = 0; row < rowsThisRowgroup; row++) colTupleList.push_back(colTuple); } } else // Non dictionary column { colStruct.colWidth = colType.colWidth; if (columnsUpdated[j]->get_isFromCol()) { for (unsigned i = 0; i < rowsThisRowgroup; i++) { rowGroups[txnId]->getRow(i, &row); if (row.isNullValue(fetchColPos)) { isNull = true; value.dropString(); } else { isNull = false; switch (fetchColTypes[fetchColPos]) { case CalpontSystemCatalog::DATE: { intColVal = row.getUintField<4>(fetchColPos); value.assign(DataConvert::dateToString(intColVal)); break; } case CalpontSystemCatalog::DATETIME: { intColVal = row.getUintField<8>(fetchColPos); value.assign(DataConvert::datetimeToString(intColVal, colType.precision)); break; } case CalpontSystemCatalog::TIMESTAMP: { intColVal = row.getUintField<8>(fetchColPos); value.assign(DataConvert::timestampToString(intColVal, timeZone, colType.precision)); break; } case CalpontSystemCatalog::TIME: { intColVal = row.getIntField<8>(fetchColPos); value.assign(DataConvert::timeToString(intColVal, colType.precision)); break; } case CalpontSystemCatalog::CHAR: case CalpontSystemCatalog::VARCHAR: { value = row.getStringField(fetchColPos); if (!value.isNull()) { unsigned i = strlen(value.str()); value.assign(value.safeString().substr(0, i)); // XXX: why??? } break; } case CalpontSystemCatalog::VARBINARY: case CalpontSystemCatalog::BLOB: case CalpontSystemCatalog::TEXT: { value.assign(row.getVarBinaryField(fetchColPos), row.getVarBinaryLength(fetchColPos)); break; } case CalpontSystemCatalog::DECIMAL: case CalpontSystemCatalog::UDECIMAL: { if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH) { datatypes::Decimal dec(row.getTSInt128Field(fetchColPos), fetchColScales[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos]); value.assign(dec.toString(true)); break; } } /* fall through */ case CalpontSystemCatalog::BIGINT: case CalpontSystemCatalog::UBIGINT: case CalpontSystemCatalog::INT: case CalpontSystemCatalog::UINT: case CalpontSystemCatalog::MEDINT: case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::SMALLINT: case CalpontSystemCatalog::USMALLINT: case CalpontSystemCatalog::TINYINT: case CalpontSystemCatalog::UTINYINT: { { intColVal = row.getIntField(fetchColPos); if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDECIMAL && intColVal < 0) { intColVal = 0; } if (fetchColScales[fetchColPos] <= 0) { ostringstream os; if (isUnsigned(fetchColTypes[fetchColPos])) os << static_cast(intColVal); else os << intColVal; value.assign(os.str()); } else { datatypes::Decimal dec(intColVal, fetchColScales[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos]); value.assign(dec.toString()); } } break; } // In this case, we're trying to load a double output column with float data. This is the // case when you do sum(floatcol), e.g. case CalpontSystemCatalog::FLOAT: case CalpontSystemCatalog::UFLOAT: { float dl = row.getFloatField(fetchColPos); if (dl == std::numeric_limits::infinity()) continue; if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UFLOAT && dl < 0.0) { dl = 0.0; } ostringstream os; //@Bug 3350 fix the precision. os << setprecision(7) << dl; value.assign(os.str()); break; } case CalpontSystemCatalog::DOUBLE: case CalpontSystemCatalog::UDOUBLE: { double dl = row.getDoubleField(fetchColPos); if (dl == std::numeric_limits::infinity()) continue; if (fetchColTypes[fetchColPos] == CalpontSystemCatalog::UDOUBLE && dl < 0.0) { dl = 0.0; } ostringstream os; //@Bug 3350 fix the precision. os << setprecision(16) << dl; value.assign(os.str()); break; } case CalpontSystemCatalog::LONGDOUBLE: { long double dll = row.getLongDoubleField(fetchColPos); if (dll == std::numeric_limits::infinity()) continue; ostringstream os; //@Bug 3350 fix the precision. os << setprecision(19) << dll; value.assign(os.str()); break; } default: // treat as int64 { ostringstream os; intColVal = row.getUintField<8>(fetchColPos); os << intColVal; value.assign(os.str()); break; } } } uint32_t funcScale = columnsUpdated[j]->get_funcScale(); if (funcScale != 0) { string str = value.safeString(""); string::size_type pos = str.find_first_of("."); // decimal point if (pos >= str.length()) str.insert(str.length(), "."); // padding 0 if needed pos = str.find_first_of("."); uint32_t digitsAfterPoint = str.length() - pos - 1; if (digitsAfterPoint < funcScale) { for (uint32_t i = 0; i < (funcScale - digitsAfterPoint); i++) str += "0"; } value.assign(str); } // Check NOT NULL constraint and default value if ((isNull) && (colType.defaultValue.isNull()) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { rc = 1; Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); return rc; } else if ((isNull) && (!colType.defaultValue.isNull())) { isNull = false; bool oneWarn = false; try { datavalue = colType.convertColumnData(colType.defaultValue.safeString(""), pushWarn, timeZone, isNull, false, false); } catch (exception&) { //@Bug 2624. Error out on conversion failure rc = 1; Message::Args args; args.add(string("'") + colType.defaultValue.safeString() + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); } if ((pushWarn) && (!oneWarn)) oneWarn = true; colTuple.data = datavalue; colTupleList.push_back(colTuple); if (oneWarn) pushWarn = true; if (!pushWarning) { pushWarning = pushWarn; } if (pushWarn) nameNeeded = true; } else { try { datavalue = colType.convertColumnData(value.safeString(""), pushWarn, timeZone, isNull, false, false); } catch (exception&) { //@Bug 2624. Error out on conversion failure rc = 1; Message::Args args; args.add(string("'") + value.safeString() + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); return rc; } colTuple.data = datavalue; colTupleList.push_back(colTuple); if (!pushWarning) { pushWarning = pushWarn; } if (pushWarn) nameNeeded = true; } } fetchColPos++; } else // constant column { if (columnsUpdated[j]->get_isnull()) { isNull = true; } else { isNull = false; } NullString inData; if (!isNull) { inData = columnsUpdated[j]->get_DataVector()[0]; } uint64_t nextVal = 0; if (colType.autoincrement) { try { nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } } if (colType.autoincrement && (isNull || (inData.safeString("").compare("0") == 0))) { // reserve nextVal try { bool reserved = fDbrm.getAIRange(oid, rowsThisRowgroup, &nextVal); if (!reserved) { err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); rc = 1; return rc; } } catch (std::exception& ex) { err = ex.what(); rc = 1; return rc; } isNull = false; bool oneWarn = false; for (unsigned row = 0; row < rowsThisRowgroup; row++) { ostringstream oss; oss << nextVal++; inData.assign(oss.str()); try { datavalue = colType.convertColumnData(inData, pushWarn, timeZone, false, false); } catch (exception&) { //@Bug 2624. Error out on conversion failure rc = 1; Message::Args args; args.add(string("'") + inData.safeString() + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); } if ((pushWarn) && (!oneWarn)) oneWarn = true; colTuple.data = datavalue; colTupleList.push_back(colTuple); } if (oneWarn) pushWarn = true; if (!pushWarning) { pushWarning = pushWarn; } if (pushWarn) nameNeeded = true; } else if (isNull && (colType.defaultValue.length() <= 0) && (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT)) { rc = 1; Message::Args args; args.add(tableColName.column); err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); return rc; } else if (isNull && (colType.defaultValue.length() > 0)) { isNull = false; bool oneWarn = false; for (unsigned row = 0; row < rowsThisRowgroup; row++) { try { datavalue = colType.convertColumnData(colType.defaultValue.safeString(), pushWarn, timeZone, isNull, false, false); } catch (exception&) { //@Bug 2624. Error out on conversion failure rc = 1; Message::Args args; args.add(string("'") + colType.defaultValue.safeString() + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); } if ((pushWarn) && (!oneWarn)) oneWarn = true; colTuple.data = datavalue; colTupleList.push_back(colTuple); } if (oneWarn) pushWarn = true; if (!pushWarning) { pushWarning = pushWarn; } if (pushWarn) nameNeeded = true; } else { try { datavalue = colType.convertColumnData(inData, pushWarn, timeZone, false, true); } catch (exception& ex) { //@Bug 2624. Error out on conversion failure rc = 1; cout << ex.what() << endl; Message::Args args; args.add(string("'") + inData.safeString() + string("'")); err = IDBErrorInfo::instance()->errorMsg(ERR_NON_NUMERIC_DATA, args); return rc; } colTuple.data = datavalue; if (!pushWarning) { pushWarning = pushWarn; } if (pushWarn) nameNeeded = true; for (unsigned row = 0; row < rowsThisRowgroup; row++) colTupleList.push_back(colTuple); } } } if (nameNeeded) { colNames.push_back(tableColName.column); } colStructList.push_back(colStruct); colValueList.push_back(colTupleList); cscColTypeList.push_back(colType); } // end of bulding values and column structure. // timer.stop("fetch values"); if (rowIDLists.size() > 0) { error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists, tableRO.objnum); } if (error != NO_ERROR) { rc = error; WErrorCodes ec; err = ec.errorString(error); if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } } if (pushWarning) { rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; Message::Args args; string cols = "'" + colNames[0] + "'"; for (unsigned i = 1; i < colNames.size(); i++) { cols = cols + ", " + "'" + colNames[i] + "'"; } args.add(cols); err = IDBErrorInfo::instance()->errorMsg(WARN_DATA_TRUNC, args); } return rc; } uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { uint8_t rc = 0; uint32_t txnId; vector lbidList; bs >> txnId; std::tr1::unordered_map m_txnLBIDMap = fWEWrapper.getTxnMap(); try { auto mapIter = m_txnLBIDMap.find(txnId); if (mapIter != m_txnLBIDMap.end()) { SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second; lbidList = spTxnLBIDRec->m_LBIDs; } } catch (...) { } bs.restart(); try { serializeInlineVector(bs, lbidList); } catch (exception& ex) { // Append to errmsg in case we already have an error if (err.length() > 0) err += "; "; err += ex.what(); rc = 1; return rc; } return rc; } uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; uint32_t flushCode, txnId, tableOid; int error; bs >> flushCode; bs >> txnId; bs >> tableOid; std::map oids; CalpontSystemCatalog::TableName aTableName; CalpontSystemCatalog::RIDList ridList; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(txnId); // execplan::CalpontSystemCatalog::ColType colType; CalpontSystemCatalog::DictOIDList dictOids; if (tableOid >= 3000) { try { aTableName = systemCatalogPtr->tableName(tableOid); } catch (...) { err = std::string("Systemcatalog error for tableoid ") + std::to_string(tableOid); rc = 1; return rc; } dictOids = systemCatalogPtr->dictOIDs(aTableName); for (unsigned i = 0; i < dictOids.size(); i++) { oids[dictOids[i].dictOID] = dictOids[i].dictOID; } // if (dictOids.size() > 0) // colType = systemCatalogPtr->colTypeDct(dictOids[0].dictOID); } fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); vector lbidList; if (idbdatafile::IDBPolicy::useHdfs()) { // XXX THIS IS WRONG!!! // save the extent info to mark them invalid, after flush, the meta file will be gone. std::tr1::unordered_map m_txnLBIDMap = fWEWrapper.getTxnMap(); try { auto mapIter = m_txnLBIDMap.find(txnId); if (mapIter != m_txnLBIDMap.end()) { SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second; lbidList = spTxnLBIDRec->m_LBIDs; } } catch (...) { } } error = fWEWrapper.flushDataFiles(flushCode, txnId, oids); // No need to close files, flushDataFile will close them. // if (((colType.compressionType > 0 ) && (dictOids.size() > 0)) || (idbdatafile::IDBPolicy::useHdfs())) // fWEWrapper.closeDctnry(txnId, colType.compressionType, true); if (error != NO_ERROR) { rc = error; WErrorCodes ec; err = ec.errorString(error); } // erase rowgroup from the rowGroup map if (rowGroups[txnId]) { delete rowGroups[txnId]; rowGroups[txnId] = 0; } TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; std::vector files; std::vector oidsToFlush; BRM::FileInfo aFile; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aFile.oid = it->first; oidsToFlush.push_back(aFile.oid); while (aIt != (it->second).end()) { aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; files.push_back(aFile); // cout <<"Added to files oid:dbroot:part:seg:compType = " << // aFile.oid<<":"<> PMId; bs >> sessionID; bs >> tmp32; txnId = tmp32; string schema, tableName; bs >> schema; bs >> tableName; fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.setTransId(txnId); if (!rowGroups[txnId]) // meta data { rowGroups[txnId] = new rowgroup::RowGroup(); rowGroups[txnId]->deserialize(bs); // If hdfs, call chunkmanager to set up rc = fWEWrapper.startTransaction(txnId); if (rc != NO_ERROR) { WErrorCodes ec; err = ec.errorString(rc); } return rc; } rowgroup::RGData rgData; rgData.deserialize(bs); rowGroups[txnId]->setData(&rgData); // rowGroups[txnId]->setData(const_cast(bs.buf())); // get row ids rowgroup::Row row; rowGroups[txnId]->initRow(&row); WriteEngine::RIDList rowIDList; CalpontSystemCatalog::RID rid; uint32_t rowsThisRowgroup = rowGroups[txnId]->getRowCount(); uint16_t dbRoot, segment, blockNum; uint32_t partition; uint8_t extentNum; CalpontSystemCatalog::TableName aTableName; aTableName.schema = schema; aTableName.table = tableName; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::OID tableAUXColOid; CalpontSystemCatalog::RIDList tableRidList; try { roPair = systemCatalogPtr->tableRID(aTableName); tableRidList = systemCatalogPtr->columnRIDs(aTableName, true); tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName); } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } // querystats uint64_t relativeRID = 0; boost::scoped_array preBlkNums(new int[row.getColumnCount()]); boost::scoped_array colWidth(new uint32_t[row.getColumnCount()]); // initialize for (uint32_t j = 0; j < row.getColumnCount(); j++) { preBlkNums[j] = -1; colWidth[j] = row.getColumnWidth(j); execplan::CalpontSystemCatalog::ColDataType colDataType = row.getColType(j); if (colWidth[j] >= 8 && !(colDataType == execplan::CalpontSystemCatalog::DECIMAL || colDataType == execplan::CalpontSystemCatalog::UDECIMAL)) { colWidth[j] = 8; } else if (colWidth[j] >= datatypes::MAXDECIMALWIDTH) { colWidth[j] = datatypes::MAXDECIMALWIDTH; } } // Get the file information from rowgroup dbRoot = rowGroups[txnId]->getDBRoot(); rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum); WriteEngine::ColStructList colStructList; WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColStruct colStruct; colStruct.fColPartition = partition; colStruct.fColSegment = segment; colStruct.fColDbRoot = dbRoot; for (unsigned i = 0; i < rowsThisRowgroup; i++) { rowGroups[txnId]->getRow(i, &row); rid = row.getRid(); relativeRID = rid - rowGroups[txnId]->getBaseRid(); rid = relativeRID; convertToRelativeRid(rid, extentNum, blockNum); rowIDList.push_back(rid); // populate stats.blocksChanged for (uint32_t j = 0; j < row.getColumnCount(); j++) { if ((int)(relativeRID / (BYTE_PER_BLOCK / colWidth[j])) > preBlkNums[j]) { blocksChanged++; preBlkNums[j] = relativeRID / (BYTE_PER_BLOCK / colWidth[j]); } } } // MCOL-5021 Valid AUX column OID for a table is > 3000 // Tables that were created before this feature was added will have // tableAUXColOid = 0 bool hasAUXCol = tableAUXColOid > 3000; try { for (unsigned i = 0; i < tableRidList.size(); i++) { CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(tableRidList[i].objnum); colStruct.dataOid = tableRidList[i].objnum; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; if (colType.colWidth > 8 && !(colType.colDataType == CalpontSystemCatalog::DECIMAL || colType.colDataType == CalpontSystemCatalog::UDECIMAL)) // token { colStruct.colWidth = 8; colStruct.tokenFlag = true; } else { colStruct.colWidth = colType.colWidth; } colStruct.colDataType = colType.colDataType; colStructList.push_back(colStruct); cscColTypeList.push_back(colType); } } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } if (hasAUXCol) { CalpontSystemCatalog::ColType colType; colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE; colType.colWidth = execplan::AUX_COL_WIDTH; colType.colDataType = execplan::AUX_COL_DATATYPE; colStruct.dataOid = tableAUXColOid; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; colStruct.colWidth = colType.colWidth; colStruct.colDataType = colType.colDataType; colStructList.push_back(colStruct); cscColTypeList.push_back(colType); } std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colOldValueList; std::vector ridLists; colExtentsStruct.push_back(colStructList); colExtentsColType.push_back(cscColTypeList); ridLists.push_back(rowIDList); int error = 0; error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists, roPair.objnum, hasAUXCol); if (error != NO_ERROR) { rc = error; // cout << "WE Error code " << error << endl; WErrorCodes ec; err = ec.errorString(error); if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } } return rc; } uint8_t WE_DMLCommandProc::processRemoveMeta(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; uint32_t tableOID; try { bs >> tableOID; // std::cout << ": tableOID-" << tableOID << std::endl; BulkRollbackMgr::deleteMetaFile(tableOID); } catch (exception& ex) { err = ex.what(); rc = 1; } return rc; } //------------------------------------------------------------------------------ // Process bulk rollback command //------------------------------------------------------------------------------ uint8_t WE_DMLCommandProc::processBulkRollback(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; err.clear(); try { uint32_t tableOID; uint64_t tableLockID; std::string tableName; std::string appName; // May want to eventually comment out this logging to stdout, // but it shouldn't hurt to keep in here. std::cout << "processBulkRollback"; bs >> tableLockID; // std::cout << ": tableLock-"<< tableLockID; bs >> tableOID; // std::cout << "; tableOID-" << tableOID; bs >> tableName; if (tableName.length() == 0) { boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(0); CalpontSystemCatalog::TableName aTableName = systemCatalogPtr->tableName(tableOID); tableName = aTableName.toString(); } std::cout << "; table-" << tableName; bs >> appName; std::cout << "; app-" << appName << std::endl; int we_rc = fWEWrapper.bulkRollback(tableOID, tableLockID, tableName, appName, false, // no extra debug logging to the console err); if (we_rc != NO_ERROR) rc = 2; } catch (exception& ex) { std::cout << "processBulkRollback: exception-" << ex.what() << std::endl; err = ex.what(); rc = 1; } return rc; } //------------------------------------------------------------------------------ // Process bulk rollback cleanup command (deletes bulk rollback meta data files) //------------------------------------------------------------------------------ uint8_t WE_DMLCommandProc::processBulkRollbackCleanup(messageqcpp::ByteStream& bs, std::string& err) { uint8_t rc = 0; err.clear(); try { uint32_t tableOID; // May want to eventually comment out this logging to stdout, // but it shouldn't hurt to keep in here. std::cout << "processBulkRollbackCleanup"; bs >> tableOID; std::cout << ": tableOID-" << tableOID << std::endl; BulkRollbackMgr::deleteMetaFile(tableOID); } catch (exception& ex) { std::cout << "processBulkRollbackCleanup: exception-" << ex.what() << std::endl; err = ex.what(); rc = 1; } return rc; } uint8_t WE_DMLCommandProc::updateSyscolumnNextval(ByteStream& bs, std::string& err) { uint32_t columnOid, sessionID; uint64_t nextVal; int rc = 0; bs >> columnOid; bs >> nextVal; bs >> sessionID; uint16_t dbRoot; std::map oids; // std::vector oidsToFlush; oids[columnOid] = columnOid; // oidsToFlush.push_back(columnOid); BRM::OID_t oid = 1021; fDbrm.getSysCatDBRoot(oid, dbRoot); fWEWrapper.setTransId(sessionID); fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(sessionID); // cout << "updateSyscolumnNextval startTransaction id " << sessionID << endl; rc = fWEWrapper.updateNextValue(sessionID, columnOid, nextVal, sessionID, dbRoot); if (rc != 0) { err = "Error in WE::updateNextValue"; rc = 1; } if (idbdatafile::IDBPolicy::useHdfs()) { cout << "updateSyscolumnNextval flushDataFiles " << endl; int rc1 = fWEWrapper.flushDataFiles(rc, sessionID, oids); if ((rc == 0) && (rc1 == 0)) { cout << "updateSyscolumnNextval confirmTransaction rc =0 " << endl; rc1 = fWEWrapper.confirmTransaction(sessionID); cout << "updateSyscolumnNextval confirmTransaction return code is " << rc1 << endl; if (rc1 == NO_ERROR) rc1 = fWEWrapper.endTransaction(sessionID, true); else fWEWrapper.endTransaction(sessionID, false); } else { cout << "updateSyscolumnNextval endTransaction with error " << endl; fWEWrapper.endTransaction(sessionID, false); } if (rc == NO_ERROR) rc = rc1; } // if (idbdatafile::IDBPolicy::useHdfs()) // cacheutils::flushOIDsFromCache(oidsToFlush); return rc; } uint8_t WE_DMLCommandProc::processPurgeFDCache(ByteStream& bs, std::string& err) { int rc = 0; uint32_t tableOid; bs >> tableOid; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); ColsExtsInfoMap colsExtsInfoMap = aTbaleMetaData->getColsExtsInfoMap(); ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); ColExtsInfo::iterator aIt; std::vector files; BRM::FileInfo aFile; while (it != colsExtsInfoMap.end()) { aIt = (it->second).begin(); aFile.oid = it->first; while (aIt != (it->second).end()) { aFile.partitionNum = aIt->partNum; aFile.dbRoot = aIt->dbRoot; aFile.segmentNum = aIt->segNum; aFile.compType = aIt->compType; files.push_back(aFile); // cout <<"Added to files oid:dbroot:part:seg:compType = " << // aFile.oid<<":"< 0)) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); TableMetaData::removeTableMetaData(tableOid); return rc; } uint8_t WE_DMLCommandProc::processFixRows(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { uint8_t rc = 0; // cout << " In processFixRows" << endl; uint32_t tmp32; uint64_t sessionID; uint16_t dbRoot, segment; uint32_t partition; string schema, tableName; TxnID txnId; uint8_t tmp8; bool firstBatch = false; WriteEngine::RIDList rowIDList; bs >> PMId; bs >> tmp8; firstBatch = (tmp8 != 0); bs >> sessionID; bs >> tmp32; txnId = tmp32; bs >> schema; bs >> tableName; bs >> dbRoot; bs >> partition; bs >> segment; deserializeInlineVector(bs, rowIDList); // Need to identify whether this is the first batch to start transaction. if (firstBatch) { rc = fWEWrapper.startTransaction(txnId); if (rc != NO_ERROR) { WErrorCodes ec; err = ec.errorString(rc); return rc; } fWEWrapper.setIsInsert(false); fWEWrapper.setBulkFlag(false); fWEWrapper.setTransId(txnId); fWEWrapper.setFixFlag(true); } CalpontSystemCatalog::TableName aTableName; aTableName.schema = schema; aTableName.table = tableName; boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); CalpontSystemCatalog::ROPair roPair; CalpontSystemCatalog::RIDList tableRidList; try { roPair = systemCatalogPtr->tableRID(aTableName); tableRidList = systemCatalogPtr->columnRIDs(aTableName, true); } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } WriteEngine::ColStructList colStructList; WriteEngine::ColStruct colStruct; WriteEngine::DctnryStructList dctnryStructList; /* colStruct.fColPartition = partition; colStruct.fColSegment = segment; colStruct.fColDbRoot = dbRoot; */ colStruct.fColPartition = 0; colStruct.fColSegment = 0; colStruct.fColDbRoot = 3; // should we always scan dictionary store files? try { for (unsigned i = 0; i < tableRidList.size(); i++) { CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(tableRidList[i].objnum); colStruct.dataOid = tableRidList[i].objnum; colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; WriteEngine::DctnryStruct dctnryStruct; dctnryStruct.fColDbRoot = colStruct.fColDbRoot; dctnryStruct.fColPartition = colStruct.fColPartition; dctnryStruct.fColSegment = colStruct.fColSegment; dctnryStruct.fCompressionType = colStruct.fCompressionType; dctnryStruct.dctnryOid = 0; dctnryStruct.fCharsetNumber = colType.charsetNumber; if (colType.colWidth > 8) // token { colStruct.colWidth = 8; colStruct.tokenFlag = true; dctnryStruct.dctnryOid = colType.ddn.dictOID; } else { colStruct.colWidth = colType.colWidth; } colStruct.colDataType = colType.colDataType; colStructList.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); } } catch (exception& ex) { err = ex.what(); rc = 1; return rc; } int error = 0; try { error = fWEWrapper.deleteBadRows(txnId, colStructList, rowIDList, dctnryStructList); if (error != NO_ERROR) { rc = error; // cout << "WE Error code " << error << endl; WErrorCodes ec; err = ec.errorString(error); if (error == ERR_BRM_DEAD_LOCK) { rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; } else if (error == ERR_BRM_VB_OVERFLOW) { rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); } } } catch (std::exception& ex) { err = ex.what(); rc = 1; } // cout << "WES return rc " << (int)rc << " with msg " << err << endl; return rc; } uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& err) { int rc = 0; ByteStream::byte tmp8; bool success; uint32_t txnid; bs >> txnid; bs >> tmp8; success = (tmp8 != 0); rc = fWEWrapper.endTransaction(txnid, success); if (rc != NO_ERROR) { WErrorCodes ec; err = ec.errorString(rc); } return rc; } //------------------------------------------------------------------------------ // Validates the correctness of the current HWMs for this table. // The HWMs for all the 1 byte columns should be identical. Same goes // for all the 2 byte columns, etc. The 2 byte column HWMs should be // "roughly" (but not necessarily exactly) twice that of a 1 byte column. // Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc. // ridList - columns oids to be used to get column width on to use with validation. // segFileInfo - Vector of File objects carrying current DBRoot, partition, // HWM, etc to be validated for the columns belonging to jobTable. // stage - Current stage we are validating. "Starting" or "Ending". //------------------------------------------------------------------------------ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, const std::vector& segFileInfo, const char* stage, bool hasAuxCol) { int rc = NO_ERROR; if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0)) return rc; // Used to track first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns in table int byte1First = -1; int byte2First = -1; int byte4First = -1; int byte8First = -1; int byte16First = -1; // Make sure the HWMs for all 1-byte columns match; same for all 2-byte, // 4-byte, 8-byte, and 16-byte columns as well. CalpontSystemCatalog::ColType colType; Convertor convertor; uint32_t colWidth; for (unsigned k = 0; k < segFileInfo.size(); k++) { unsigned k1 = 0; // Find out column width if (hasAuxCol && (k == segFileInfo.size() - 1)) { colWidth = execplan::AUX_COL_WIDTH; } else { colType = systemCatalogPtr->colType(ridList[k].objnum); colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); } // Find the first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns. // Use those as our reference HWM for the respective column widths. switch (colWidth) { case 1: { if (byte1First == -1) byte1First = k; k1 = byte1First; break; } case 2: { if (byte2First == -1) byte2First = k; k1 = byte2First; break; } case 4: { if (byte4First == -1) byte4First = k; k1 = byte4First; break; } case 16: { if (byte16First == -1) byte16First = k; k1 = byte16First; break; } case 8: default: { if (byte8First == -1) byte8First = k; k1 = byte8First; break; } } // end of switch based on column width (1,2,4,8 or 16) // std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 << // "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm << // " col-" << k << // "; wid-" << jobColK.width << " ; hwm-"<colType(ridList[k1].objnum); colWidth2 = colType2.colWidth; } ostringstream oss; oss << stage << " HWMs do not match for" " OID1-" << ridList[k1].objnum << "; DBRoot-" << segFileInfo[k1].fDbRoot << "; partition-" << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment << "; hwm-" << segFileInfo[k1].fLocalHwm << "; width-" << colWidth2 << ':' << std::endl << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth; fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR); return ERR_BRM_HWMS_NOT_EQUAL; } // HWM DBRoot, partition, and segment number should match for all // columns; so compare DBRoot, part#, and seg# with first column. if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) || (segFileInfo[0].fPartition != segFileInfo[k].fPartition) || (segFileInfo[0].fSegment != segFileInfo[k].fSegment)) { CalpontSystemCatalog::ColType colType2; colType2 = systemCatalogPtr->colType(ridList[0].objnum); ostringstream oss; oss << stage << " HWM DBRoot,Part#, or Seg# do not match for" " OID1-" << ridList[0].objnum << "; DBRoot-" << segFileInfo[0].fDbRoot << "; partition-" << segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment << "; hwm-" << segFileInfo[0].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth; fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR); return ERR_BRM_HWMS_NOT_EQUAL; } } // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc. // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc. // Without knowing the exact row count, we can't extrapolate the exact HWM // for the wider column, but we can narrow it down to an expected range. unsigned refCol = 0; unsigned colIdx = 0; // Validate/compare HWMs given a 1-byte column as a starting point if (byte1First >= 0) { refCol = byte1First; if (byte2First >= 0) { HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 2; HWM hwmHi = hwmLo + 1; if ((segFileInfo[byte2First].fLocalHwm < hwmLo) || (segFileInfo[byte2First].fLocalHwm > hwmHi)) { colIdx = byte2First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } if (byte4First >= 0) { HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 4; HWM hwmHi = hwmLo + 3; if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi)) { colIdx = byte4First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } if (byte8First >= 0) { HWM hwmLo = segFileInfo[byte1First].fLocalHwm * 8; HWM hwmHi = hwmLo + 7; if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi)) { colIdx = byte8First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } } // Validate/compare HWMs given a 2-byte column as a starting point if (byte2First >= 0) { refCol = byte2First; if (byte4First >= 0) { HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 2; HWM hwmHi = hwmLo + 1; if ((segFileInfo[byte4First].fLocalHwm < hwmLo) || (segFileInfo[byte4First].fLocalHwm > hwmHi)) { colIdx = byte4First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } if (byte8First >= 0) { HWM hwmLo = segFileInfo[byte2First].fLocalHwm * 4; HWM hwmHi = hwmLo + 3; if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi)) { colIdx = byte8First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } } // Validate/compare HWMs given a 4-byte column as a starting point if (byte4First >= 0) { refCol = byte4First; if (byte8First >= 0) { HWM hwmLo = segFileInfo[byte4First].fLocalHwm * 2; HWM hwmHi = hwmLo + 1; if ((segFileInfo[byte8First].fLocalHwm < hwmLo) || (segFileInfo[byte8First].fLocalHwm > hwmHi)) { colIdx = byte8First; rc = ERR_BRM_HWMS_OUT_OF_SYNC; goto errorCheck; } } } // To avoid repeating this message 6 times in the preceding source code, we // use the "dreaded" goto to branch to this single place for error handling. errorCheck: if (rc != NO_ERROR) { uint32_t colWidth1, colWidth2; if (hasAuxCol && (refCol == ridList.size() - 1)) { colWidth1 = execplan::AUX_COL_WIDTH; } else { CalpontSystemCatalog::ColType colType1 = systemCatalogPtr->colType(ridList[refCol].objnum); colWidth1 = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth); } if (hasAuxCol && (colIdx == ridList.size() - 1)) { colWidth2 = execplan::AUX_COL_WIDTH; } else { CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum); colWidth2 = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth); } ostringstream oss; oss << stage << " HWMs are not in sync for" " OID1-" << ridList[refCol].objnum << "; DBRoot-" << segFileInfo[refCol].fDbRoot << "; partition-" << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment << "; hwm-" << segFileInfo[refCol].fLocalHwm << "; width-" << colWidth1 << ':' << std::endl << " and OID2-" << ridList[colIdx].objnum << "; DBRoot-" << segFileInfo[colIdx].fDbRoot << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-" << segFileInfo[colIdx].fSegment << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colWidth2; fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR); } return rc; } } // namespace WriteEngine