/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/******************************************************************************
 * $Id: we_columninfo.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 *
 *******************************************************************************/

// NOTE(review): the header names inside <...> were missing from this copy of
// the file. The system headers below were chosen from what the code in this
// file uses (isxdigit, std::ostringstream, std::string, boost::scoped_ptr);
// confirm against the upstream source.
#include <cctype>
#include <sstream>
#include <string>
//#define NDEBUG
//#include <cassert>
#include <boost/scoped_ptr.hpp>

#include "we_columninfo.h"
#include "we_log.h"
#include "we_stats.h"
#include "we_colopbulk.h"
#include "brmtypes.h"
#include "we_columnautoinc.h"
#include "we_dbrootextenttracker.h"
#include "we_brmreporter.h"
#include "we_tableinfo.h"

#include "IDBDataFile.h"
using namespace idbdatafile;

namespace
{
//------------------------------------------------------------------------------
// Do a fast ascii-hex-string to binary data conversion.  This is done in-place.
// We take bytes 1 and 2 and put them back into byte 1; 3 and 4 into 2; etc.
// The length is adjusted by 1/2 and returned to the caller as the new length.
// If any invalid hex characters are present in the string (not 0-9,A-F, or
// a-f), then the string is considered invalid, and a null token will be used.
//------------------------------------------------------------------------------ unsigned int compactVarBinary(char* charTmpBuf, int fieldLength) { unsigned char* p = reinterpret_cast(charTmpBuf); char* f = charTmpBuf; char v = '\0'; for (int i = 0; i < fieldLength / 2; i++, p++) { // Store even number byte in high order 4 bits of next output byte v = *f; if (!isxdigit(v)) return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET; if (v <= '9') *p = v - '0'; else if (v <= 'F') *p = v - 'A' + 10; else // if (v <= 'f') *p = v - 'a' + 10; *p <<= 4; f++; // Store odd number byte in low order 4 bite of next output byte v = *f; if (!isxdigit(v)) return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET; if (v <= '9') *p |= v - '0'; else if (v <= 'F') *p |= v - 'A' + 10; else // if (v <= 'f') *p |= v - 'a' + 10; f++; } // Changed our mind and decided to have the read thread reject rows with // incomplete (odd length) varbinary fields, so the following check is not // necessary. We should only get to this function with an even fieldLength. #if 0 // Handle case where input data field has "odd" byte length. // Store last input byte in high order 4 bits of additional output byte, // and leave the low order bits set to 0. 
if ((fieldLength & 1) == 1) { v = *f; if (!isxdigit(v)) return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET; if (v <= '9') *p = v - '0'; else if (v <= 'F') *p = v - 'A' + 10; else //if (v <= 'f') *p = v - 'a' + 10; *p <<= 4; fieldLength++; } #endif return (fieldLength / 2); } } // namespace namespace WriteEngine { //------------------------------------------------------------------------------ // ColumnInfo constructor //------------------------------------------------------------------------------ ColumnInfo::ColumnInfo(Log* logger, int idIn, const JobColumn& columnIn, DBRootExtentTracker* pDBRootExtTrk, TableInfo* pTableInfo) : id(idIn) , lastProcessingTime(0) , #ifdef PROFILE totalProcessingTime(0) , #endif fColBufferMgr(0) , availFileSize(0) , fileSize(0) , fLog(logger) , fDelayedFileStartBlksSkipped(0) , fSavedLbid(0) , fLastUpdatedLbid(0) , fSizeWrittenStart(0) , fSizeWritten(0) , fLastInputRowInCurrentExtent(0) , fLoadingAbbreviatedExtent(false) , fColExtInf(0) , fMaxNumRowsPerSegFile(0) , fStore(0) , fAutoIncLastValue(0) , fSaturatedRowCnt(0) , fpTableInfo(pTableInfo) , fAutoIncMgr(0) , fDbRootExtTrk(pDBRootExtTrk) , fColWidthFactor(1) , fDelayedFileCreation(INITIAL_DBFILE_STAT_FILE_EXISTS) , fRowsPerExtent(0) { column = columnIn; fRowsPerExtent = BRMWrapper::getInstance()->getExtentRows(); // Allocate a ColExtInfBase object for those types that won't track // min/max CasualPartition info; this is a stub class that won't do // anything. 
switch (column.weType) { case WriteEngine::WR_FLOAT: case WriteEngine::WR_DOUBLE: case WriteEngine::WR_VARBINARY: // treat like char dictionary for now case WriteEngine::WR_TOKEN: { fColExtInf = new ColExtInfBase(); break; } case WriteEngine::WR_CHAR: { if (column.colType == COL_TYPE_DICT) { fColExtInf = new ColExtInfBase(); } else { fColExtInf = new ColExtInf(column.mapOid, logger); } break; } case WriteEngine::WR_SHORT: case WriteEngine::WR_BYTE: case WriteEngine::WR_LONGLONG: case WriteEngine::WR_MEDINT: case WriteEngine::WR_INT: case WriteEngine::WR_USHORT: case WriteEngine::WR_UBYTE: case WriteEngine::WR_ULONGLONG: case WriteEngine::WR_UMEDINT: case WriteEngine::WR_UINT: case WriteEngine::WR_BINARY: default: { fColExtInf = new ColExtInf(column.mapOid, logger); break; } } colOp.reset(new ColumnOpBulk(logger, column.compressionType)); fMaxNumRowsPerSegFile = fRowsPerExtent * Config::getExtentsPerSegmentFile(); // Create auto-increment object to manage auto-increment next-value if (column.autoIncFlag) { fAutoIncMgr = new ColumnAutoIncIncremental(logger); // formerly used ColumnAutoIncJob for Shared Everything // fAutoIncMgr = new ColumnAutoIncJob(logger); } } //------------------------------------------------------------------------------ // ColumnInfo destructor //------------------------------------------------------------------------------ ColumnInfo::~ColumnInfo() { clearMemory(); // Closing dictionary file also updates the extent map; which we // don't want to do if we are aborting the job. Besides, the // application code should be closing the dictionary as needed, // instead of relying on the destructor, so disabled this code. // if(fStore != NULL) //{ // fStore->closeDctnryStore(); // delete fStore; //} if (fColExtInf) delete fColExtInf; if (fAutoIncMgr) delete fAutoIncMgr; if (fDbRootExtTrk) delete fDbRootExtTrk; } //------------------------------------------------------------------------------ // Clear memory consumed by this ColumnInfo object. 
//------------------------------------------------------------------------------ void ColumnInfo::clearMemory() { if (fColBufferMgr) { delete fColBufferMgr; fColBufferMgr = 0; } fDictBlocks.clear(); } //------------------------------------------------------------------------------ // If at the start of the job, We have encountered a PM that has no DB file for // this column, or whose HWM extent is disabled; then this function is called // to setup delayed file creation. // A starting DB file will be created if/when we determine that we have rows // to be processed. //------------------------------------------------------------------------------ void ColumnInfo::setupDelayedFileCreation(uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM hwm, bool bEmptyPM) { if (bEmptyPM) fDelayedFileCreation = INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY; else fDelayedFileCreation = INITIAL_DBFILE_STAT_CREATE_FILE_ON_DISABLED; fDelayedFileStartBlksSkipped = hwm; fSavedLbid = INVALID_LBID; colOp->initColumn(curCol); colOp->setColParam(curCol, id, column.width, column.dataType, column.weType, column.mapOid, column.compressionType, dbRoot, partition, segment); colOp->findTypeHandler(column.width, column.dataType); } //------------------------------------------------------------------------------ // Create a DB file as part of delayed file creation. See setupDelayedFile- // Creation for an explanation. //------------------------------------------------------------------------------ int ColumnInfo::createDelayedFileIfNeeded(const std::string& tableName) { int rc = NO_ERROR; // For optimization sake, we use a separate mutex (fDelayedFileCreateMutex) // exclusively reserved to be used as the gatekeeper to this function. // No sense in waiting for a fColMutex lock, when 99.99% of the time, // all we need to do is check fDelayedFileCreation, see that it's value // is INITIAL_DBFILE_STAT_FILE_EXISTS, and exit the function. 
boost::mutex::scoped_lock lock(fDelayedFileCreateMutex); if (fDelayedFileCreation == INITIAL_DBFILE_STAT_FILE_EXISTS) return NO_ERROR; // Don't try creating extent again if we are already in error state with a // previous thread failing to create this extent. if (fDelayedFileCreation == INITIAL_DBFILE_STAT_ERROR_STATE) { rc = ERR_FILE_CREATE; std::ostringstream oss; oss << "Previous attempt failed to create initial dbroot" << curCol.dataFile.fDbRoot << " extent for column file OID-" << column.mapOid << "; dbroot-" << curCol.dataFile.fDbRoot << "; partition-" << curCol.dataFile.fPartition; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } // Once we get this far, we go ahead and acquire a fColMutex lock. The // fDelayedFileCreateMutex lock might suffice, but better to explicitly // lock fColMutex since we are modifying attributes that we typically // change within the scope of a fColMutex lock. boost::mutex::scoped_lock lock2(fColMutex); uint16_t dbRoot = curCol.dataFile.fDbRoot; uint32_t partition = curCol.dataFile.fPartition; // We don't have a file on this PM, so we create an initial file ColumnOpBulk tempColOp(fLog, column.compressionType); bool createLeaveFileOpen = false; IDBDataFile* createPFile = 0; uint16_t createDbRoot = dbRoot; uint32_t createPartition = partition; uint16_t createSegment = 0; std::string createSegFile; HWM createHwm = 0; // output BRM::LBID_t createStartLbid = 0; // output bool createNewFile = true; // output int createAllocSize = 0; // output char* createHdrs = 0; // output std::string allocErrMsg; rc = fpTableInfo->allocateBRMColumnExtent(curCol.dataFile.fid, createDbRoot, createPartition, createSegment, createStartLbid, createAllocSize, createHwm, allocErrMsg); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error creating initial dbroot" << dbRoot << " BRM extent for OID-" << column.mapOid << "; dbroot-" << dbRoot << "; partition-" << partition << "; " << ec.errorString(rc) << "; " << allocErrMsg; 
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE; return rc; } uint16_t segment = createSegment; partition = createPartition; // update our partition variable in // case extent was added to a different // partition than we intended BRM::LBID_t lbid = createStartLbid; rc = tempColOp.extendColumn(curCol, createLeaveFileOpen, createHwm, createStartLbid, createAllocSize, createDbRoot, createPartition, createSegment, createSegFile, createPFile, createNewFile, createHdrs); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error adding initial dbroot" << dbRoot << " extent to column file OID-" << column.mapOid << "; dbroot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE; return rc; } // We don't have a file on this PM (or HWM extent is disabled), so we // create a new file to load std::ostringstream oss1; if (fDelayedFileCreation == INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY) oss1 << "PM empty; Creating starting column extent"; else oss1 << "HWM extent disabled; Creating starting column extent"; oss1 << " on DBRoot-" << createDbRoot << " for OID-" << column.mapOid << "; part-" << createPartition << "; seg-" << createSegment << "; hwm-" << createHwm << "; LBID-" << createStartLbid << "; file-" << createSegFile; fLog->logMsg(oss1.str(), MSGLVL_INFO2); // Create corresponding dictionary store file if applicable if (column.colType == COL_TYPE_DICT) { std::ostringstream oss; oss << "Creating starting dictionary extent on dbroot" << dbRoot << " (segment " << segment << ") for dictionary OID " << column.dctnry.dctnryOid; fLog->logMsg(oss.str(), MSGLVL_INFO2); BRM::LBID_t dLbid; Dctnry* tempD = 0; if (column.dctnry.fCompressionType != 0) { DctnryCompress1* tempD1; tempD1 = new DctnryCompress1(column.dctnry.fCompressionType); tempD1->setMaxActiveChunkNum(1); 
tempD1->setBulkFlag(true); tempD = tempD1; } else { tempD = new DctnryCompress0; } boost::scoped_ptr refDctnry(tempD); // MCOL-4328 Define a file owner uid and gid refDctnry->setUIDGID(this); rc = tempD->createDctnry(column.dctnry.dctnryOid, column.dctnryWidth, dbRoot, partition, segment, dLbid, true); // creating the store file if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error creating initial dbroot" << dbRoot << " extent for dictionary file OID-" << column.dctnry.dctnryOid << "; dbroot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE; return rc; } rc = tempD->closeDctnry(); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error creating/closing initial dbroot" << dbRoot << " extent for dictionary file OID-" << column.dctnry.dctnryOid << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE; return rc; } } // end of dictionary column processing // Check for special case: where we skip initial blk(s) at the start of // the "very" 1st file on each PM. // We are checking to see if the PM is empty, "and" if the partition is 0. // The PM could be empty if all the existing files on the PM were dropped // or disabled, but we don't want/need to do block skipping in this case; // so we also check to see if the partition number is 0, denoting the 1st // extent for the PM. // (The reason we are skipping blocks in partition 0, is because import // does this with the partition 0, segment 0 file created by DDL. // We skip blocks on the other PMs, so that the 1st file created on each // PM will employ the same block skipping.) 
HWM hwm = 0; if ((fDelayedFileCreation == INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY) && (partition == 0)) { hwm = fDelayedFileStartBlksSkipped; } rc = setupInitialColumnExtent(dbRoot, partition, segment, tableName, lbid, hwm, hwm, false, true); if (rc == NO_ERROR) fDelayedFileCreation = INITIAL_DBFILE_STAT_FILE_EXISTS; else fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE; return rc; } //------------------------------------------------------------------------------ // Add an extent for this column. The next segment file in the DBRoot, // partition, segment number rotation will be selected for the extent. // // NOTE: no mutex lock is employed here. It is assumed that the calling // application code is taking care of this, if it is needed. //------------------------------------------------------------------------------ int ColumnInfo::extendColumn(bool saveLBIDForCP) { //..We assume the applicable file is already open, so... // the HWM of the current segment file should be set to reference the // last block in the current file (as specified in curCol.dataFile.pFile). // // Prior to adding compression, we used ftell() to set HWM, but that // would not work for compressed data. Code now assumes that if we // are adding an extent, that fSizeWritten is a multiple of blksize, // which it should be. If we are adding an extent, fSizeWritten should // point to the last byte of a full extent boundary. HWM hwm = (fSizeWritten / BYTE_PER_BLOCK) - 1; //..Save info about the current segment column file, and close that file. 
addToSegFileList(curCol.dataFile, hwm); // Close current segment column file prior to adding extent to next seg file int rc = closeColumnFile(true, false); if (rc != NO_ERROR) { std::ostringstream oss; oss << "extendColumn: error closing extent in " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << hwm; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } // Call Config::initConfigCache() to force the Config class // to reload config cache "if" the config file has changed. Config::initConfigCache(); bool bChangeFlag = Config::hasLocalDBRootListChanged(); // if (fLog->isDebug( DEBUG_1 )) //{ // std::ostringstream oss; // oss << "Checking DBRootListChangeFlag: " << bChangeFlag; // fLog->logMsg( oss.str(), MSGLVL_INFO2 ); //} if (bChangeFlag) { rc = ERR_BULK_DBROOT_CHANGE; WErrorCodes ec; std::ostringstream oss; oss << "extendColumn: DBRoots changed; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } //..Declare variables used to advance to the next extent uint16_t dbRootNext = 0; uint32_t partitionNext = 0; uint16_t segmentNext = 0; HWM hwmNext = 0; BRM::LBID_t startLbid; //..When we finish an extent, we typically should be advancing to the next // DBRoot to create a "new" extent. But "if" the user has moved a DBRoot // from another PM to this PM, then we may have a partial extent that we // need to fill up. Here's where we just fill out such partially filled // extents with empty values, until we can get back to a "normal" full // extent boundary case. bool bAllocNewExtent = false; while (!bAllocNewExtent) { //..If we have a DBRoot Tracker, then use that to determine next DBRoot // to rotate to, else the old legacy BRM extent allocator will assign, // if we pass in a dbroot of 0. 
bAllocNewExtent = true; if (fDbRootExtTrk) { bAllocNewExtent = fDbRootExtTrk->nextSegFile(dbRootNext, partitionNext, segmentNext, hwmNext, startLbid); } // If our next extent is a partial extent, then fill out that extent // to the next full extent boundary, and round up HWM accordingly. if (!bAllocNewExtent) { rc = extendColumnOldExtent(dbRootNext, partitionNext, segmentNext, hwmNext); if (rc != NO_ERROR) return rc; } } // Once we are back on a "normal" full extent boundary, we add a new extent // to resume adding rows. rc = extendColumnNewExtent(saveLBIDForCP, dbRootNext, partitionNext); return rc; } //------------------------------------------------------------------------------ // Add a new extent to this column, at the specified DBRoot. Partition may be // used if DBRoot is empty. //------------------------------------------------------------------------------ int ColumnInfo::extendColumnNewExtent(bool saveLBIDForCP, uint16_t dbRootNew, uint32_t partitionNew) { //..Declare variables used to advance to the next extent IDBDataFile* pFileNew = 0; HWM hwmNew = 0; bool newFile = false; std::string segFileNew; uint16_t segmentNew = 0; BRM::LBID_t startLbid; char hdr[compress::CompressInterface::HDR_BUF_LEN * 2]; // Extend the column by adding an extent to the next // DBRoot, partition, and segment file in the rotation int allocsize = 0; std::string allocErrMsg; int rc = fpTableInfo->allocateBRMColumnExtent(curCol.dataFile.fid, dbRootNew, partitionNew, segmentNew, startLbid, allocsize, hwmNew, allocErrMsg); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "extendColumnNewExtent: error creating BRM extent after " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment; oss << "; newDBRoot-" << dbRootNew << "; newpart-" << partitionNew << "; " << ec.errorString(rc) << "; " << allocErrMsg; fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); 
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); return rc; } rc = colOp->extendColumn(curCol, true, // leave file open hwmNew, startLbid, allocsize, dbRootNew, partitionNew, segmentNew, segFileNew, pFileNew, newFile, hdr); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "extendColumnNewExtent: error adding file extent after " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment; oss << "; newDBRoot-" << dbRootNew << "; newpart-" << partitionNew << "; newseg-" << segmentNew << "; fbo-" << hwmNew << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); if (pFileNew) colOp->closeFile(pFileNew); // clean up loose ends return rc; } std::ostringstream oss; oss << "Add column extent OID-" << curCol.dataFile.fid << "; DBRoot-" << dbRootNew << "; part-" << partitionNew << "; seg-" << segmentNew << "; hwm-" << hwmNew << "; LBID-" << startLbid << "; file-" << segFileNew; fLog->logMsg(oss.str(), MSGLVL_INFO2); // Update lbid. fLastUpdatedLbid = startLbid; // Save the LBID with our CP extent info, so that we can update extent map if (saveLBIDForCP) { int rcLBID = fColExtInf->updateEntryLbid(startLbid); // If error occurs, we log WARNING, but we don't fail the job. if (rcLBID != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "updateEntryLbid failed for OID-" << curCol.dataFile.fid << "; LBID-" << startLbid << "; CasualPartition info may become invalid; " << ec.errorString(rcLBID); fLog->logMsg(oss.str(), rcLBID, MSGLVL_WARNING); } } //..Reset data members to reflect where we are in the newly // opened column segment file. The file may be a new file, or we may // be adding an extent to an existing column segment file. 
curCol.dataFile.hwm = hwmNew; curCol.dataFile.pFile = pFileNew; curCol.dataFile.fPartition = partitionNew; curCol.dataFile.fSegment = segmentNew; curCol.dataFile.fDbRoot = dbRootNew; curCol.dataFile.fSegFileName = segFileNew; rc = resetFileOffsetsNewExtent(hdr); if (rc != NO_ERROR) { std::ostringstream oss; oss << "extendColumnNewExtent: error moving to new extent in " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << curCol.dataFile.hwm; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); if (pFileNew) closeColumnFile(false, true); // clean up loose ends return rc; } if (fLog->isDebug(DEBUG_1)) { std::ostringstream oss2; oss2 << "Extent added to column OID-" << curCol.dataFile.fid << "; DBRoot-" << dbRootNew << "; part-" << partitionNew << "; seg-" << segmentNew << "; begByte-" << fSizeWritten << "; endByte-" << fileSize << "; freeBytes-" << availFileSize; fLog->logMsg(oss2.str(), MSGLVL_INFO2); } return NO_ERROR; } //------------------------------------------------------------------------------ // Fill out existing partial extent to extent boundary, so that we can resume // inserting rows on an extent boundary basis. This use case should only take // place when a DBRoot with a partial extent has been moved from one PM to // another. 
//------------------------------------------------------------------------------ int ColumnInfo::extendColumnOldExtent(uint16_t dbRootNext, uint32_t partitionNext, uint16_t segmentNext, HWM hwmNext) { const unsigned int BLKS_PER_EXTENT = (fRowsPerExtent * column.width) / BYTE_PER_BLOCK; HWM hwmNextExtentBoundary = hwmNext; // Round up HWM to the end of the current extent unsigned int nBlks = hwmNext + 1; unsigned int nRem = nBlks % BLKS_PER_EXTENT; if (nRem > 0) hwmNextExtentBoundary = nBlks - nRem + BLKS_PER_EXTENT - 1; else hwmNextExtentBoundary = nBlks - 1; std::ostringstream oss; oss << "Padding partial extent to extent boundary in OID-" << curCol.dataFile.fid << "; DBRoot-" << dbRootNext << "; part-" << partitionNext << "; seg-" << segmentNext << "; oldhwm-" << hwmNext << "; newhwm-" << hwmNextExtentBoundary; fLog->logMsg(oss.str(), MSGLVL_INFO2); long long fileSizeBytes; int rc = colOp->getFileSize(curCol.dataFile.fid, dbRootNext, partitionNext, segmentNext, fileSizeBytes); if (rc != NO_ERROR) { std::ostringstream oss; oss << "extendColumnOldExtent: error padding partial extent for " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << curCol.dataFile.hwm; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } curCol.dataFile.pFile = 0; curCol.dataFile.fDbRoot = dbRootNext; curCol.dataFile.fPartition = partitionNext; curCol.dataFile.fSegment = segmentNext; curCol.dataFile.hwm = hwmNextExtentBoundary; curCol.dataFile.fSegFileName.clear(); // See if we have an abbreviated extent that needs to be expanded on disk if (fileSizeBytes == (long long)INITIAL_EXTENT_ROWS_TO_DISK * column.width) { std::string segFile; // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag IDBDataFile* pFile = colOp->openFile(curCol, dbRootNext, partitionNext, segmentNext, segFile, true); if (!pFile) { std::ostringstream oss; rc = ERR_FILE_OPEN; oss << 
"extendColumnOldExtent: error padding partial extent for " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << curCol.dataFile.hwm; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } rc = colOp->expandAbbrevColumnExtent(pFile, dbRootNext, column.emptyVal, column.width, column.dataType); if (rc != NO_ERROR) { std::ostringstream oss; oss << "extendColumnOldExtent: error padding partial extent for " << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << curCol.dataFile.hwm; fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); colOp->closeFile(pFile); return rc; } colOp->closeFile(pFile); } addToSegFileList(curCol.dataFile, hwmNextExtentBoundary); return NO_ERROR; } //------------------------------------------------------------------------------ // Either add or update the File object, so that it has the updated HWM. // We will access this info to update the HWM in the ExtentMap at the end // of the import. // dmc-could optimize later by changing fSegFileUpdateList from a vector // to a map or hashtable with a key consisting of partition and segment. 
//------------------------------------------------------------------------------ void ColumnInfo::addToSegFileList(File& dataFile, HWM hwm) { bool foundFlag = false; for (unsigned int i = 0; i < fSegFileUpdateList.size(); i++) { if ((fSegFileUpdateList[i].fPartition == dataFile.fPartition) && (fSegFileUpdateList[i].fSegment == dataFile.fSegment)) { if (fLog->isDebug(DEBUG_1)) { std::ostringstream oss3; oss3 << "Updating HWM list" "; column OID-" << dataFile.fid << "; DBRoot-" << dataFile.fDbRoot << "; part-" << dataFile.fPartition << "; seg-" << dataFile.fSegment << "; oldhwm-" << fSegFileUpdateList[i].hwm << "; newhwm-" << hwm; fLog->logMsg(oss3.str(), MSGLVL_INFO2); } fSegFileUpdateList[i].hwm = hwm; foundFlag = true; break; } } if (!foundFlag) { if (fLog->isDebug(DEBUG_1)) { std::ostringstream oss3; oss3 << "Adding to HWM list" << "; column OID-" << dataFile.fid << "; DBRoot-" << dataFile.fDbRoot << "; part-" << dataFile.fPartition << "; seg-" << dataFile.fSegment << "; hwm-" << hwm; fLog->logMsg(oss3.str(), MSGLVL_INFO2); } dataFile.hwm = hwm; fSegFileUpdateList.push_back(dataFile); } } //------------------------------------------------------------------------------ // Reset file offset data member attributes when we start working on the next // extent. //------------------------------------------------------------------------------ int ColumnInfo::resetFileOffsetsNewExtent(const char* /*hdr*/) { setFileSize(curCol.dataFile.hwm, false); long long byteOffset = (long long)curCol.dataFile.hwm * (long long)BYTE_PER_BLOCK; fSizeWritten = byteOffset; fSizeWrittenStart = fSizeWritten; availFileSize = fileSize - fSizeWritten; // If we are adding an extent as part of preliminary block skipping, then // we won't have a ColumnBufferManager object yet, but that's okay, because // we are only adding the empty extent at this point. 
if (fColBufferMgr) { RETURN_ON_ERROR(fColBufferMgr->setDbFile(curCol.dataFile.pFile, curCol.dataFile.hwm, 0)); RETURN_ON_ERROR(colOp->setFileOffset(curCol.dataFile.pFile, byteOffset)); } return NO_ERROR; } //------------------------------------------------------------------------------ // Set current size of file in raw (uncompressed) bytes, given the specified // hwm. abbrevFlag indicates whether this is a fixed size abbreviated extent. // For unabbreviated extents the "logical" file size is calculated by rounding // the hwm up to the nearest multiple of the extent size. //------------------------------------------------------------------------------ void ColumnInfo::setFileSize(HWM hwm, int abbrevFlag) { // Must be an abbreviated extent if there is only 1 compressed chunk in // the db file. Even a 1-byte column would have 2 4MB chunks for an 8M // row column extent. if (abbrevFlag) { fileSize = (INITIAL_EXTENT_ROWS_TO_DISK * curCol.colWidth); } else { const unsigned int ROWS_PER_EXTENT = fRowsPerExtent; long long nRows = ((long long)(hwm + 1) * (long long)BYTE_PER_BLOCK) / (long long)curCol.colWidth; long long nRem = nRows % ROWS_PER_EXTENT; if (nRem == 0) { fileSize = nRows * curCol.colWidth; } else { fileSize = (nRows - nRem + ROWS_PER_EXTENT) * curCol.colWidth; } } } //------------------------------------------------------------------------------ // If we are dealing with the first extent in the first segment file for this // column, and the segment file is still equal to 256K rows, then we set the // fLoadingAbbreviatedExtent flag. This tells us (later on) that we are dealing // with an abbreviated extent that still needs to be expanded and filled, before // we start adding new extents. //------------------------------------------------------------------------------ void ColumnInfo::setAbbrevExtentCheck() { // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? 
if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0)) { if (fileSize == (INITIAL_EXTENT_ROWS_TO_DISK * curCol.colWidth)) { fLoadingAbbreviatedExtent = true; if (fLog->isDebug(DEBUG_1)) { std::ostringstream oss; oss << "Importing into abbreviated extent, column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; fileSize-" << fileSize; fLog->logMsg(oss.str(), MSGLVL_INFO2); } } } } //------------------------------------------------------------------------------ // If this is an abbreviated extent, we expand the extent to a full extent on // disk, by initializing the necessary number of remaining blocks. // bRetainFilePos flag controls whether the current file position is retained // upon return from this function; else the file will be positioned at the end // of the file. //------------------------------------------------------------------------------ int ColumnInfo::expandAbbrevExtent(bool bRetainFilePos) { if (fLoadingAbbreviatedExtent) { off64_t oldOffset = 0; if (bRetainFilePos) { oldOffset = curCol.dataFile.pFile->tell(); } colOp->setFileOffset(curCol.dataFile.pFile, 0, SEEK_END); std::ostringstream oss; oss << "Expanding first extent to column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; file-" << curCol.dataFile.fSegFileName; fLog->logMsg(oss.str(), MSGLVL_INFO2); int rc = colOp->expandAbbrevExtent(curCol); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "expandAbbrevExtent: error expanding extent for " << "OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); return rc; 
} // Update available file size to reflect disk space added by expanding // the extent. long long fileSizeBeforeExpand = fileSize; setFileSize((fileSizeBeforeExpand / BYTE_PER_BLOCK), false); availFileSize += (fileSize - fileSizeBeforeExpand); // Restore offset back to where we were before expanding the extent if (bRetainFilePos) { rc = colOp->setFileOffset(curCol.dataFile.pFile, oldOffset, SEEK_SET); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "expandAbbrevExtent: error seeking to new extent for " << "OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); return rc; } } // We only use abbreviated extents for the very first extent. So after // expanding a col's abbreviated extent, we should disable this check. fLoadingAbbreviatedExtent = false; } return NO_ERROR; } //------------------------------------------------------------------------------ // Close the current Column file. //------------------------------------------------------------------------------ int ColumnInfo::closeColumnFile(bool /*bCompletingExtent*/, bool /*bAbort*/) { if (curCol.dataFile.pFile) { colOp->closeFile(curCol.dataFile.pFile); curCol.dataFile.pFile = 0; } return NO_ERROR; } //------------------------------------------------------------------------------ // Initialize fLastInputRowInCurrentExtent used in detecting when a Read Buffer // is crossing an extent boundary, so that we can accurately track the min/max // for each extent as the Read buffers are parsed. 
//------------------------------------------------------------------------------ void ColumnInfo::lastInputRowInExtentInit(bool bIsNewExtent) { // Reworked initial block skipping for compression: const unsigned int ROWS_PER_EXTENT = fRowsPerExtent; RID numRowsLeftInExtent = 0; RID numRowsWritten = fSizeWritten / curCol.colWidth; if ((numRowsWritten % ROWS_PER_EXTENT) != 0) numRowsLeftInExtent = ROWS_PER_EXTENT - (numRowsWritten % ROWS_PER_EXTENT); bool bRoomToAddToOriginalExtent = true; if (fSizeWritten > 0) { // Handle edge case; if numRowsLeftInExtent comes out to be 0, then // current extent is full. In this case we first bump up row count // by a full extent before we subtract by 1 to get the last row number // in extent. if (numRowsLeftInExtent == 0) { numRowsLeftInExtent = ROWS_PER_EXTENT; ; bRoomToAddToOriginalExtent = false; } } else { // Starting new file with empty extent, so set row count to full extent numRowsLeftInExtent = ROWS_PER_EXTENT; } fLastInputRowInCurrentExtent = numRowsLeftInExtent - 1; // If we have a pre-existing extent that we are going to add rows to, // then we need to add that extent to our ColExtInf object, so that we // can update the CP min/max at the end of the bulk load job. if (bRoomToAddToOriginalExtent) { fColExtInf->addFirstEntry(fLastInputRowInCurrentExtent, fSavedLbid, bIsNewExtent); } } //------------------------------------------------------------------------------ // Increment fLastRIDInExtent to the end of the next extent. //------------------------------------------------------------------------------ void ColumnInfo::lastInputRowInExtentInc() { fLastInputRowInCurrentExtent += fRowsPerExtent; } //------------------------------------------------------------------------------ // Parsing is complete for this column. Flush pending data. Close the current // segment file, and corresponding dictionary store file (if applicable). Also // clears memory taken up by this ColumnInfo object. 
//------------------------------------------------------------------------------ int ColumnInfo::finishParsing() { int rc = NO_ERROR; // Close the dctnry file handle. if (fStore) { rc = closeDctnryStore(false); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "finishParsing: close dictionary file error with column " << column.colName << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } } // We don't need the mutex to protect against concurrent access by other // threads, since by the time we get to this point, this is the last // thread working on this column. But, we use the mutex to insure that // we see the latest state that may have been set by another parsing thread // working with the same column. boost::mutex::scoped_lock lock(fColMutex); // Force the flushing of remaining data in the output buffer if (fColBufferMgr) { rc = fColBufferMgr->flush(); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "finishParsing: flush error with column " << column.colName << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } } // Close the column file rc = closeColumnFile(false, false); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "finishParsing: close column file error with column " << column.colName << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } clearMemory(); return NO_ERROR; } //------------------------------------------------------------------------------ // Store updated column information in BRMReporter for this column at EOJ; // so that Extent Map CP information and HWM's can be updated. // Bug2117-Src code from this function was factored over from we_tableinfo.cpp. // // We use mutex because this function is called by "one" of the parsing threads // when parsing is complete for all the columns from this column's table. 
// We use the mutex to insure that this parsing thread, which ends up being // responsible for updating BRM for this column, is getting the most up to // date values in fSegFileUpdateList, fSizeWritten, etc which may have been // set by another parsing thread. //------------------------------------------------------------------------------ void ColumnInfo::getBRMUpdateInfo(BRMReporter& brmReporter) { boost::mutex::scoped_lock lock(fColMutex); // Useful for debugging // printCPInfo(column); int entriesAdded = getHWMInfoForBRM(brmReporter); // If we added any rows (HWM update count > 0), then update corresponding CP if (entriesAdded > 0) getCPInfoForBRM(brmReporter); } //------------------------------------------------------------------------------ // Get updated Casual Partition (CP) information for BRM for this column at EOJ. //------------------------------------------------------------------------------ void ColumnInfo::getCPInfoForBRM(BRMReporter& brmReporter) { fColExtInf->getCPInfoForBRM(column, brmReporter); } //------------------------------------------------------------------------------ // Get updated HWM information for BRM for this column at EOJ. // Returns count of the number of HWM entries added to the BRMReporter. //------------------------------------------------------------------------------ int ColumnInfo::getHWMInfoForBRM(BRMReporter& brmReporter) { //..If we wrote out any data to the last segment file, then // update HWM for the current (last) segment file we were writing to. // Bug1374 - Update HWM when data added to file if (fSizeWritten > fSizeWrittenStart) { // Bug1372. 
HWM hwm = (fSizeWritten - 1) / BYTE_PER_BLOCK; addToSegFileList(curCol.dataFile, hwm); } int entriesAdded = 0; //..Update HWM for each segment file we touched, including the last one for (unsigned int iseg = 0; iseg < fSegFileUpdateList.size(); iseg++) { // Log for now; may control with debug flag later // if (fLog->isDebug( DEBUG_1 )) { std::ostringstream oss; oss << "Saving HWM update for OID-" << fSegFileUpdateList[iseg].fid << "; hwm-" << fSegFileUpdateList[iseg].hwm << "; DBRoot-" << fSegFileUpdateList[iseg].fDbRoot << "; partition-" << fSegFileUpdateList[iseg].fPartition << "; segment-" << fSegFileUpdateList[iseg].fSegment; fLog->logMsg(oss.str(), MSGLVL_INFO2); } BRM::BulkSetHWMArg hwmArg; hwmArg.oid = fSegFileUpdateList[iseg].fid; hwmArg.partNum = fSegFileUpdateList[iseg].fPartition; hwmArg.segNum = fSegFileUpdateList[iseg].fSegment; hwmArg.hwm = fSegFileUpdateList[iseg].hwm; brmReporter.addToHWMInfo(hwmArg); // Save list of modified db column files BRM::FileInfo aFile; aFile.oid = fSegFileUpdateList[iseg].fid; aFile.partitionNum = fSegFileUpdateList[iseg].fPartition; aFile.segmentNum = fSegFileUpdateList[iseg].fSegment; aFile.dbRoot = fSegFileUpdateList[iseg].fDbRoot; aFile.compType = curCol.compressionType; brmReporter.addToFileInfo(aFile); // Save list of corresponding modified db dictionary store files if (column.colType == COL_TYPE_DICT) { BRM::FileInfo dFile; dFile.oid = column.dctnry.dctnryOid; dFile.partitionNum = fSegFileUpdateList[iseg].fPartition; dFile.segmentNum = fSegFileUpdateList[iseg].fSegment; dFile.dbRoot = fSegFileUpdateList[iseg].fDbRoot; dFile.compType = curCol.compressionType; brmReporter.addToDctnryFileInfo(dFile); } entriesAdded++; } fSegFileUpdateList.clear(); // don't need vector anymore, so release memory return entriesAdded; } // Returns last updated LBID. 
BRM::LBID_t ColumnInfo::getLastUpdatedLBID() const { return fLastUpdatedLbid; } //------------------------------------------------------------------------------ // Setup initial extent we will begin loading at start of import. // DBRoot, partition, segment, etc for the starting extent are specified. // If block skipping is causing us to advance to the next extent, then we // set things up to point to the last block in the current extent. When we // start adding rows, we will automatically advance to the next extent. //------------------------------------------------------------------------------ int ColumnInfo::setupInitialColumnExtent(uint16_t dbRoot, // dbroot of starting extent uint32_t partition, // partition number of starting extent uint16_t segment, // segment number of starting extent const std::string& tblName, // name of table containing this column BRM::LBID_t lbid, // starting LBID for starting extent HWM oldHwm, // original HWM HWM hwm, // new projected HWM after block skipping bool bSkippedToNewExtent, // blk skipping to next extent bool bIsNewExtent) // treat as new extent (for CP updates) { // Init the ColumnInfo object colOp->initColumn(curCol); colOp->setColParam(curCol, id, column.width, column.dataType, column.weType, column.mapOid, column.compressionType, dbRoot, partition, segment); colOp->findTypeHandler(column.width, column.dataType); // Open the column file if (!colOp->exists(column.mapOid, dbRoot, partition, segment)) { std::ostringstream oss; oss << "Column file does not exist for OID-" << column.mapOid << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment; fLog->logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR); return ERR_FILE_NOT_EXIST; } std::string segFile; bool useTmpSuffix = false; if (!bIsNewExtent) useTmpSuffix = true; // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag int rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream 
oss; oss << "Error opening column file for OID-" << column.mapOid << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; filename-" << segFile << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR); return ERR_FILE_OPEN; } std::ostringstream oss1; oss1 << "Initializing import: " << "Table-" << tblName << "; Col-" << column.colName; if (curCol.compressionType) oss1 << " (compressed)"; oss1 << "; OID-" << column.mapOid << "; hwm-" << hwm; if (bSkippedToNewExtent) oss1 << " (full; load into next extent)"; oss1 << "; file-" << curCol.dataFile.fSegFileName; fLog->logMsg(oss1.str(), MSGLVL_INFO2); if (column.colType == COL_TYPE_DICT) { RETURN_ON_ERROR(openDctnryStore(true)); } fSavedLbid = lbid; fLastUpdatedLbid = lbid; if (bSkippedToNewExtent) oldHwm = hwm; rc = setupInitialColumnFile(oldHwm, hwm); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error reading/positioning column file for OID-" << column.mapOid << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; filename-" << segFile << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } // Reworked initial block skipping for compression: // Block skipping is causing us to wrap up this extent. We consider // the current extent to be full, so we "pretend" to fill out the // last block by adding 8192 bytes to the bytes written count. // This will help trigger the addition of a new extent when we // try to store the first section of rows to the db. if (bSkippedToNewExtent) { updateBytesWrittenCounts(BYTE_PER_BLOCK); fSizeWrittenStart = fSizeWritten; } // Reworked initial block skipping for compression: // This initializes CP stats for first extent regardless of whether // we end up adding rows to this extent, or initial block skipping // ultimately causes us to start with a new extent. 
lastInputRowInExtentInit(bIsNewExtent); return NO_ERROR; } //------------------------------------------------------------------------------ // Prepare the initial column segment file for import. //------------------------------------------------------------------------------ int ColumnInfo::setupInitialColumnFile(HWM oldHwm, HWM hwm) { // Initialize the output buffer manager for the column. if (column.colType == COL_TYPE_DICT) { fColBufferMgr = new ColumnBufferManagerDctnry(this, 8, fLog, 0); } else { fColBufferMgr = new ColumnBufferManager(this, column.width, fLog, 0); } RETURN_ON_ERROR(fColBufferMgr->setDbFile(curCol.dataFile.pFile, hwm, 0)); RETURN_ON_ERROR(colOp->getFileSize(curCol.dataFile.pFile, fileSize)); // See if dealing with abbreviated extent that will need expanding. // This only applies to the first extent of the first segment file. setAbbrevExtentCheck(); // If we are dealing with initial extent, see if block skipping has // exceeded disk allocation, in which case we expand to a full extent. if (isAbbrevExtent()) { unsigned int numBlksForFirstExtent = (INITIAL_EXTENT_ROWS_TO_DISK * column.width) / BYTE_PER_BLOCK; if (((oldHwm + 1) <= numBlksForFirstExtent) && ((hwm + 1) > numBlksForFirstExtent)) { RETURN_ON_ERROR(expandAbbrevExtent(false)); } } // Seek till the HWM lbid. // Store the current allocated file size in availFileSize. 
long long byteOffset = (long long)hwm * (long long)BYTE_PER_BLOCK; RETURN_ON_ERROR(colOp->setFileOffset(curCol.dataFile.pFile, byteOffset)); fSizeWritten = byteOffset; fSizeWrittenStart = fSizeWritten; availFileSize = fileSize - fSizeWritten; if (fLog->isDebug(DEBUG_1)) { std::ostringstream oss; oss << "Init raw data offsets in column file OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; begByte-" << fSizeWritten << "; endByte-" << fileSize << "; freeBytes-" << availFileSize; fLog->logMsg(oss.str(), MSGLVL_INFO2); } return NO_ERROR; } //------------------------------------------------------------------------------ // Update the number of bytes in the file, and the free space still remaining. //------------------------------------------------------------------------------ void ColumnInfo::updateBytesWrittenCounts(unsigned int numBytesWritten) { availFileSize = availFileSize - numBytesWritten; fSizeWritten = fSizeWritten + numBytesWritten; } //------------------------------------------------------------------------------ // Tell whether the current column segment file being managed by ColumnInfo, // has filled up all its extents with data. //------------------------------------------------------------------------------ bool ColumnInfo::isFileComplete() const { if ((fSizeWritten / column.width) >= fMaxNumRowsPerSegFile) return true; return false; } //------------------------------------------------------------------------------ // Initialize last used auto-increment value from the current "next" // auto-increment value taken from the system catalog (or BRM). 
//------------------------------------------------------------------------------ int ColumnInfo::initAutoInc(const std::string& fullTableName) { int rc = fAutoIncMgr->init(fullTableName, this); return rc; } //------------------------------------------------------------------------------ // Reserves the requested number of auto-increment numbers (autoIncCount). // The starting value of the reserved block of numbers is returned in nextValue. //------------------------------------------------------------------------------ int ColumnInfo::reserveAutoIncNums(uint32_t autoIncCount, uint64_t& nextValue) { int rc = fAutoIncMgr->reserveNextRange(autoIncCount, nextValue); return rc; } //------------------------------------------------------------------------------ // Finished using auto-increment. Current value can be committed back to the // system catalog (or BRM). //------------------------------------------------------------------------------ int ColumnInfo::finishAutoInc() { int rc = fAutoIncMgr->finish(); return rc; } //------------------------------------------------------------------------------ // Get current dbroot, partition, segment, and HWM for this column. // // We use mutex because this function is called by "one" of the parsing threads // when parsing is complete for all the columns from this column's table. // We use the mutex to insure that this parsing thread, which ends up being // responsible for wrapping up this column, is getting the most up to // date values for dbroot, partition, segment, and HWM which may have been // set by another parsing thread. 
//------------------------------------------------------------------------------ void ColumnInfo::getSegFileInfo(DBRootExtentInfo& fileInfo) { boost::mutex::scoped_lock lock(fColMutex); fileInfo.fDbRoot = curCol.dataFile.fDbRoot; fileInfo.fPartition = curCol.dataFile.fPartition; fileInfo.fSegment = curCol.dataFile.fSegment; if (fSizeWritten > 0) fileInfo.fLocalHwm = (fSizeWritten - 1) / BYTE_PER_BLOCK; else fileInfo.fLocalHwm = 0; } //------------------------------------------------------------------------------ // Open a new or existing Dictionary store file based on the DBRoot, // partition, and segment settings in curCol.dataFile. //------------------------------------------------------------------------------ int ColumnInfo::openDctnryStore(bool bMustExist) { int rc = NO_ERROR; if (column.dctnry.fCompressionType != 0) { DctnryCompress1* dctnryCompress1 = new DctnryCompress1(column.dctnry.fCompressionType); dctnryCompress1->setMaxActiveChunkNum(1); dctnryCompress1->setBulkFlag(true); fStore = dctnryCompress1; } else { fStore = new DctnryCompress0; } fStore->setLogger(fLog); fStore->setColWidth(column.dctnryWidth); fStore->setUIDGID(this); if (column.fWithDefault) fStore->setDefault(column.fDefaultChr); fStore->setImportDataMode(fpTableInfo->getImportDataMode()); // If we are in the process of adding an extent to this column, // and the extent we are adding is the first extent for the // relevant column segment file, then the corresponding dictionary // store file will not exist, in which case we must create // the store file, else we open the applicable store file. 
if ((bMustExist) || (colOp->exists(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment))) { // Save HWM chunk (for compressed files) if this seg file calls for it // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag bool useTmpSuffixDctnry = false; RETURN_ON_ERROR(saveDctnryStoreHWMChunk(useTmpSuffixDctnry)); // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag rc = fStore->openDctnry(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment, useTmpSuffixDctnry); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "openDctnryStore: error opening existing store file for " << "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; tmpFlag-" << useTmpSuffixDctnry << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); // Ignore return code from closing file; already in error state closeDctnryStore(true); // clean up loose ends return rc; } if (INVALID_LBID != fStore->getCurLbid()) fDictBlocks.push_back(fStore->getCurLbid()); std::ostringstream oss; oss << "Opening existing store file for " << column.colName << "; OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << fStore->getHWM() << "; file-" << fStore->getFileName(); fLog->logMsg(oss.str(), MSGLVL_INFO2); } else { BRM::LBID_t startLbid; rc = fStore->createDctnry(column.dctnry.dctnryOid, column.dctnryWidth, //@bug 3313 - pass string col width curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment, startLbid); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "openDctnryStore: error creating new store file for " << "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << 
curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); // Ignore return code from closing file; already in error state closeDctnryStore(true); // clean up loose ends return rc; } rc = fStore->openDctnry(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment, false); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "openDctnryStore: error opening new store file for " << "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); // Ignore return code from closing file; already in error state closeDctnryStore(true); // clean up loose ends return rc; } std::ostringstream oss; oss << "Opening new store file for " << column.colName << "; OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; file-" << fStore->getFileName(); fLog->logMsg(oss.str(), MSGLVL_INFO2); } return rc; } //------------------------------------------------------------------------------ // Close the current Dictionary store file. 
//------------------------------------------------------------------------------ int ColumnInfo::closeDctnryStore(bool bAbort) { int rc = NO_ERROR; if (fStore) { if (bAbort) rc = fStore->closeDctnryOnly(); else rc = fStore->closeDctnry(); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "closeDctnryStore: error closing store file for " << "OID-" << column.dctnry.dctnryOid << "; file-" << fStore->getFileName() << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); } delete fStore; fStore = 0; } return rc; } //------------------------------------------------------------------------------ // Update dictionary store file with specified strings, and return the assigned // tokens (tokenbuf) to be stored in the corresponding column token file. //------------------------------------------------------------------------------ int ColumnInfo::updateDctnryStore(char* buf, ColPosPair** pos, const int totalRow, char* tokenBuf) { long long truncCount = 0; // No. of rows with truncated values // If this is a VARBINARY column; convert the ascii hex string into binary // data and fix the length (it's now only half as long). // Should be safe to modify pos and buf arrays outside a mutex, as no other // thread should be accessing the strings from the same buffer, for this // column. // This only applies to default text mode. This step is bypassed for // binary imports, because in that case, the data is already true binary. 
if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB && fpTableInfo->readFromSTDIN())) && (fpTableInfo->getImportDataMode() == IMPORT_DATA_TEXT)) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPACT_VARBINARY); #endif for (int i = 0; i < totalRow; i++) { pos[i][id].offset = compactVarBinary(buf + pos[i][id].start, pos[i][id].offset); } #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPACT_VARBINARY); #endif } #ifdef PROFILE Stats::startParseEvent(WE_STATS_WAIT_TO_PARSE_DCT); #endif boost::mutex::scoped_lock lock(fDictionaryMutex); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT); #endif int rc = fStore->insertDctnry(buf, pos, totalRow, id, tokenBuf, truncCount, column.cs, column.weType); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "updateDctnryStore: error adding rows to store file for " << "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL); fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); return rc; } incSaturatedCnt(truncCount); return NO_ERROR; } //------------------------------------------------------------------------------ // No action necessary for uncompressed dictionary files //------------------------------------------------------------------------------ // @bug 5572 - HDFS usage: add flag used to control *.tmp file usage int ColumnInfo::saveDctnryStoreHWMChunk(bool& needBackup) { needBackup = false; return NO_ERROR; } //------------------------------------------------------------------------------ // Truncate specified dictionary store file for this column. // Only applies to compressed columns. 
//------------------------------------------------------------------------------ int ColumnInfo::truncateDctnryStore(OID /*dctnryOid*/, uint16_t /*root*/, uint32_t /*pNum*/, uint16_t /*sNum*/) const { return NO_ERROR; } //------------------------------------------------------------------------------ // utility to convert a Status enumeration to a string //------------------------------------------------------------------------------ /* static */ void ColumnInfo::convertStatusToString(WriteEngine::Status status, std::string& statusString) { static std::string statusStringParseComplete("PARSE_COMPLETE"); static std::string statusStringReadComplete("READ_COMPLETE"); static std::string statusStringReadProgress("READ_PROGRESS"); static std::string statusStringNew("NEW"); static std::string statusStringErr("ERR"); static std::string statusStringUnknown("OTHER"); switch (status) { case PARSE_COMPLETE: statusString = statusStringParseComplete; break; case READ_COMPLETE: statusString = statusStringReadComplete; break; case READ_PROGRESS: statusString = statusStringReadProgress; break; case NEW: statusString = statusStringNew; break; case ERR: statusString = statusStringErr; break; default: statusString = statusStringUnknown; break; } } } // namespace WriteEngine