/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id: we_columninfocompressed.cpp 4726 2013-08-07 03:38:36Z bwilkinson $
 *
 ******************************************************************************/

#include "we_columninfocompressed.h"

#include "we_define.h"
#include "we_log.h"
#include "we_type.h"
#include "we_rbmetawriter.h"
#include "we_stats.h"
#include "we_tableinfo.h"

#include "idbcompress.h"
using namespace compress;

#include "IDBFileSystem.h"
#include "IDBPolicy.h"

namespace WriteEngine
{
//------------------------------------------------------------------------------
// ColumnInfoCompressed constructor
//------------------------------------------------------------------------------
ColumnInfoCompressed::ColumnInfoCompressed(Log* logger, int idIn, const JobColumn& columnIn,
                                           DBRootExtentTracker* pDBRootExtTrk, TableInfo* pTableInfo)
  :  // RBMetaWriter* rbMetaWriter) :
  ColumnInfo(logger, idIn, columnIn, pDBRootExtTrk, pTableInfo)
  , fRBMetaWriter(pTableInfo->rbMetaWriter())
{
}

//------------------------------------------------------------------------------
// ColumnInfoCompressed destructor
//------------------------------------------------------------------------------
ColumnInfoCompressed::~ColumnInfoCompressed()
{
}

//------------------------------------------------------------------------------
// Close the current compressed column file, after first compressing/flushing
// any remaining data and rewriting the headers as well.
//------------------------------------------------------------------------------
int ColumnInfoCompressed::closeColumnFile(bool bCompletingExtent, bool bAbort)
{
  int rc = NO_ERROR;

  if (curCol.dataFile.pFile)
  {
    if (!bAbort)
    {
      // If we are opening and closing a file in order to add an extent as
      // part of preliminary block skipping, then we won't have a
      // ColumnBufferManager object yet.  One will be created when the file
      // is reopened to begin importing.
      if (fColBufferMgr)
      {
        rc = fColBufferMgr->finishFile(bCompletingExtent);

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          std::ostringstream oss;
          oss << "Error closing compressed file; OID-" << curCol.dataFile.fid << "; DBRoot-"
              << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
              << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          bAbort = true;
        }
      }
    }

    ColumnInfo::closeColumnFile(bCompletingExtent, bAbort);
  }

  return rc;
}

//------------------------------------------------------------------------------
// Prepare the initial compressed column segment file for import.
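// Reads the compression headers from the existing segment file, attaches the
// applicable ColumnBufferManager (dictionary vs. non-dictionary), checks for
// an abbreviated extent that may need expanding, and initializes the raw
// (uncompressed) offset bookkeeping used during the import.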
//------------------------------------------------------------------------------
int ColumnInfoCompressed::setupInitialColumnFile(HWM oldHwm, HWM hwm)
{
  char hdr[compress::CompressInterface::HDR_BUF_LEN * 2];
  RETURN_ON_ERROR(colOp->readHeaders(curCol.dataFile.pFile, hdr));

  // Initialize the output buffer manager for the column.
  WriteEngine::ColumnBufferManager* mgr;

  if (column.colType == COL_TYPE_DICT)
  {
    mgr = new ColumnBufferManagerDctnry(this, 8, fLog, column.compressionType);
    RETURN_ON_ERROR(mgr->setDbFile(curCol.dataFile.pFile, hwm, hdr));
  }
  else
  {
    mgr = new ColumnBufferManager(this, column.width, fLog, column.compressionType);
    RETURN_ON_ERROR(mgr->setDbFile(curCol.dataFile.pFile, hwm, hdr));
  }

  fColBufferMgr = mgr;

  int abbrevFlag =
      (compress::CompressInterface::getBlockCount(hdr) ==
       uint64_t(INITIAL_EXTENT_ROWS_TO_DISK * column.width / BYTE_PER_BLOCK));
  setFileSize(hwm, abbrevFlag);

  // See if we are dealing with an abbreviated extent that will need expanding.
  // This only applies to the first extent of the first segment file.
  setAbbrevExtentCheck();

  // If we are dealing with the initial extent, see if block skipping has
  // exceeded the disk allocation, in which case we expand to a full extent.
  if (isAbbrevExtent())
  {
    unsigned int numBlksForFirstExtent = (INITIAL_EXTENT_ROWS_TO_DISK * column.width) / BYTE_PER_BLOCK;

    if (((oldHwm + 1) <= numBlksForFirstExtent) && ((hwm + 1) > numBlksForFirstExtent))
    {
      RETURN_ON_ERROR(expandAbbrevExtent(false));
    }
  }

  // Store the current allocated file size in availFileSize.
  // Keep in mind, these are raw uncompressed offsets.
  // NOTE: We don't call setFileOffset() to set the file position in the
  // column segment file at this point; we wait till we load the compressed
  // buffer later on in ColumnBufferCompressed::initToBeCompressedBuffer().
  long long byteOffset = (long long)hwm * (long long)BYTE_PER_BLOCK;
  fSizeWritten = byteOffset;
  fSizeWrittenStart = fSizeWritten;
  availFileSize = fileSize - fSizeWritten;

  if (fLog->isDebug(DEBUG_1))
  {
    std::ostringstream oss;
    oss << "Init raw data offsets in compressed column file OID-" << curCol.dataFile.fid << "; DBRoot-"
        << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
        << curCol.dataFile.fSegment << "; abbrev-" << abbrevFlag << "; begByte-" << fSizeWritten
        << "; endByte-" << fileSize << "; freeBytes-" << availFileSize;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Reinitializes the ColBuf buffer, and resets the file offset data member
// attributes to where the new extent will start.
//------------------------------------------------------------------------------
int ColumnInfoCompressed::resetFileOffsetsNewExtent(const char* hdr)
{
  setFileSize(curCol.dataFile.hwm, false);
  long long byteOffset = (long long)curCol.dataFile.hwm * (long long)BYTE_PER_BLOCK;
  fSizeWritten = byteOffset;
  fSizeWrittenStart = fSizeWritten;
  availFileSize = fileSize - fSizeWritten;

  // If we are adding an extent as part of preliminary block skipping, then
  // we won't have a ColumnBufferManager object yet, but that's okay, because
  // we are only adding the empty extent at this point.
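  // (Illustration of the offset math above, assuming the typical 8192-byte
  // block size: an HWM of 1024 maps to a raw resume offset of 1024 * 8192 =
  // 8,388,608 bytes.  These are uncompressed offsets; the compressed on-disk
  // position is established below from the chunk pointers in hdr.)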
  if (fColBufferMgr)
  {
    RETURN_ON_ERROR(fColBufferMgr->setDbFile(curCol.dataFile.pFile, curCol.dataFile.hwm, hdr));

    // Reinitialize ColBuf for the next extent
    long long startFileOffset;
    RETURN_ON_ERROR(fColBufferMgr->resetToBeCompressedColBuf(startFileOffset));

    // Set the file offset to point to the chunk we are adding or updating
    RETURN_ON_ERROR(colOp->setFileOffset(curCol.dataFile.pFile, startFileOffset));
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Save the HWM chunk for a compressed dictionary store file, so that the HWM
// chunk can be restored by bulk rollback if an error should occur.
//------------------------------------------------------------------------------
// @bug 5572 - HDFS usage: add flag used to control *.tmp file usage
int ColumnInfoCompressed::saveDctnryStoreHWMChunk(bool& needBackup)
{
#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_COMPRESS_DCT_BACKUP_CHUNK);
#endif
  needBackup = false;

  int rc = NO_ERROR;

  try
  {
    needBackup = fRBMetaWriter->backupDctnryHWMChunk(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot,
                                                     curCol.dataFile.fPartition, curCol.dataFile.fSegment);
  }
  catch (WeException& ex)
  {
    fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
    rc = ex.errorCode();
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_BACKUP_CHUNK);
#endif

  return rc;
}

//------------------------------------------------------------------------------
// Truncate the specified dictionary store file for this column.
// Only applies to compressed columns.
//
// This function may logically belong in a dictionary-related class, but I did
// not particularly want to put a bulk-import-specific function in
// DctnryCompress1 (a wrapper class shared with DML/DDL) or Dctnry, so I put
// it here.  May change my mind later.
//
// dmc-Not the most efficient implementation.  We are reopening
// the file to perform the truncation, instead of truncating the file before
// we close it.  This is done because we need to first flush the compressed
// chunks before we can determine the truncation file size.  But the
// ChunkManager flushChunks() function immediately closes the file and clears
// itself after it flushes the data.  So by the time we get back to the
// application code, it's too late to truncate the file.  At some point, we
// could look at adding or changing the ChunkManager API to support a flush
// w/o a close.  That would be more optimal than having to reopen the file
// for truncation.
//------------------------------------------------------------------------------
int ColumnInfoCompressed::truncateDctnryStore(OID dctnryOid, uint16_t root, uint32_t pNum,
                                              uint16_t sNum) const
{
  int rc = NO_ERROR;

  // @bug5769 Don't initialize extents or truncate db files on HDFS
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    std::ostringstream oss1;
    oss1 << "Finished writing dictionary file"
            ": OID-"
         << dctnryOid << "; DBRoot-" << root << "; part-" << pNum << "; seg-" << sNum;

    // Have to rework this logging if we want to keep it.
    // The file size is not correct when adding data to an "existing" file,
    // since in the case of HDFS, we are writing to a *.cdf.tmp file.
    // char dctnryFileName[FILE_NAME_SIZE];
    // if (colOp->getFileName(dctnryOid, dctnryFileName,
    //                        root, pNum, sNum) == NO_ERROR)
    //{
    //    off64_t dctnryFileSize = idbdatafile::IDBFileSystem::getFs(
    //        IDBDataFile::HDFS).size(dctnryFileName);
    //    if (dctnryFileSize != -1)
    //    {
    //        oss1 << "; size-" << dctnryFileSize;
    //    }
    //}
    fLog->logMsg(oss1.str(), MSGLVL_INFO2);
  }
  else
  {
    // See if the relevant dictionary store file can/should be truncated
    // (to the nearest extent)
    std::string segFile;
    IDBDataFile* dFile = fTruncateDctnryFileOp.openFile(dctnryOid, root, pNum, sNum, segFile);

    if (dFile == 0)
    {
      rc = ERR_FILE_OPEN;

      std::ostringstream oss;
      oss << "Error opening compressed dictionary store segment "
             "file for truncation"
          << ": OID-" << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-"
          << sNum;
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    char controlHdr[CompressInterface::HDR_BUF_LEN];
    rc = fTruncateDctnryFileOp.readFile(dFile, (unsigned char*)controlHdr,
                                        CompressInterface::HDR_BUF_LEN);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error reading compressed dictionary store control hdr "
             "for truncation"
          << ": OID-" << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-"
          << sNum << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      fTruncateDctnryFileOp.closeFile(dFile);
      return rc;
    }

    int rc1 = compress::CompressInterface::verifyHdr(controlHdr);

    if (rc1 != 0)
    {
      rc = ERR_COMP_VERIFY_HDRS;

      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error verifying compressed dictionary store ptr hdr "
             "for truncation"
          << ": OID-" << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-"
          << sNum << "; (" << rc1 << ")";
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      fTruncateDctnryFileOp.closeFile(dFile);
      return rc;
    }

    // No need to perform file truncation if the dictionary file just contains
    // a single abbreviated extent.  Truncating up to the nearest extent would
    // actually grow the file (something we don't want to do), because we have
    // not yet reserved a full extent (on disk) for this dictionary store file.
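    // (For example, under typical build values -- 256K initial rows written
    // to disk, the 8-byte dictionary pseudo column width, and 8192-byte
    // blocks -- an abbreviated dictionary extent is only 256 blocks, whereas
    // rounding up to a full 8M-row extent boundary would call for 8192
    // blocks, far more than the file currently holds.)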
    const int PSEUDO_COL_WIDTH = 8;
    uint64_t numBlocks = compress::CompressInterface::getBlockCount(controlHdr);

    if (numBlocks == uint64_t(INITIAL_EXTENT_ROWS_TO_DISK * PSEUDO_COL_WIDTH / BYTE_PER_BLOCK))
    {
      std::ostringstream oss1;
      oss1 << "Skip truncating abbreviated dictionary file"
              ": OID-"
           << dctnryOid << "; DBRoot-" << root << "; part-" << pNum << "; seg-" << sNum << "; blocks-"
           << numBlocks;
      fLog->logMsg(oss1.str(), MSGLVL_INFO2);
      fTruncateDctnryFileOp.closeFile(dFile);
      return NO_ERROR;
    }

    uint64_t hdrSize = compress::CompressInterface::getHdrSize(controlHdr);
    uint64_t ptrHdrSize = hdrSize - CompressInterface::HDR_BUF_LEN;
    char* pointerHdr = new char[ptrHdrSize];

    rc = fTruncateDctnryFileOp.readFile(dFile, (unsigned char*)pointerHdr, ptrHdrSize);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error reading compressed dictionary store pointer hdr "
             "for truncation"
          << ": OID-" << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-"
          << sNum << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      fTruncateDctnryFileOp.closeFile(dFile);
      delete[] pointerHdr;
      return rc;
    }

    CompChunkPtrList chunkPtrs;
    rc1 = compress::CompressInterface::getPtrList(pointerHdr, ptrHdrSize, chunkPtrs);
    delete[] pointerHdr;

    if (rc1 != 0)
    {
      rc = ERR_COMP_PARSE_HDRS;

      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error parsing compressed dictionary store ptr hdr "
             "for truncation"
          << ": OID-" << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-"
          << sNum << "; (" << rc1 << ")";
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      fTruncateDctnryFileOp.closeFile(dFile);
      return rc;
    }

    // Truncate the relevant dictionary store file to the nearest extent
    if (chunkPtrs.size() > 0)
    {
      long long dataByteLength =
          chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second - hdrSize;

      long long extentBytes = fRowsPerExtent * PSEUDO_COL_WIDTH;
      long long rem = dataByteLength % extentBytes;

      if (rem > 0)
      {
        dataByteLength = dataByteLength - rem + extentBytes;
      }

      long long truncateFileSize = dataByteLength + hdrSize;

      std::ostringstream oss1;
      oss1 << "Truncating dictionary file"
              ": OID-"
           << dctnryOid << "; DBRoot-" << root << "; part-" << pNum << "; seg-" << sNum << "; size-"
           << truncateFileSize;
      fLog->logMsg(oss1.str(), MSGLVL_INFO2);

      if (truncateFileSize > 0)
        rc = fTruncateDctnryFileOp.truncateFile(dFile, truncateFileSize);
      else
        rc = ERR_COMP_TRUNCATE_ZERO;  // @bug3913 - Catch truncate to 0 bytes

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        std::ostringstream oss;
        oss << "Error truncating compressed dictionary store file"
               ": OID-"
            << dctnryOid << "; DbRoot-" << root << "; partition-" << pNum << "; segment-" << sNum
            << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        fTruncateDctnryFileOp.closeFile(dFile);
        return rc;
      }
    }

    fTruncateDctnryFileOp.closeFile(dFile);
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Fill out an existing partial extent to the extent boundary, so that we can
// resume inserting rows on an extent-boundary basis.  This use case should
// only take place when a DBRoot with a partial extent has been moved from one
// PM to another.
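// For example, assuming 8M rows per extent and 8192-byte blocks, a 1-byte
// column has 1024 blocks per extent; an incoming HWM of 1500 would be rounded
// up to 2047 (the last block of its extent), and the intervening blocks are
// filled with the column's empty value.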
//------------------------------------------------------------------------------
int ColumnInfoCompressed::extendColumnOldExtent(uint16_t dbRootNext, uint32_t partitionNext,
                                                uint16_t segmentNext, HWM hwmNextIn)
{
  const unsigned int BLKS_PER_EXTENT = (fRowsPerExtent * column.width) / BYTE_PER_BLOCK;

  // Round up the HWM to the end of the current extent
  unsigned int nBlks = hwmNextIn + 1;
  unsigned int nRem = nBlks % BLKS_PER_EXTENT;
  HWM hwmNext = 0;

  if (nRem > 0)
    hwmNext = nBlks - nRem + BLKS_PER_EXTENT - 1;
  else
    hwmNext = nBlks - 1;

  std::ostringstream oss;
  oss << "Padding compressed partial extent to extent boundary in OID-" << curCol.dataFile.fid
      << "; DBRoot-" << dbRootNext << "; part-" << partitionNext << "; seg-" << segmentNext << "; hwm-"
      << hwmNext;
  fLog->logMsg(oss.str(), MSGLVL_INFO2);

  curCol.dataFile.pFile = 0;
  curCol.dataFile.fDbRoot = dbRootNext;
  curCol.dataFile.fPartition = partitionNext;
  curCol.dataFile.fSegment = segmentNext;
  curCol.dataFile.hwm = hwmNext;
  curCol.dataFile.fSegFileName.clear();

  std::string segFileName;
  std::string errTask;
  int rc = colOp->fillCompColumnExtentEmptyChunks(
      curCol.dataFile.fid, curCol.colWidth, column.emptyVal, curCol.dataFile.fDbRoot,
      curCol.dataFile.fPartition, curCol.dataFile.fSegment, curCol.colDataType, curCol.dataFile.hwm,
      segFileName, errTask);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream oss;
    oss << "extendColumnOldExtent: error padding extent (" << errTask << "); "
        << "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
        << curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; newHwm-"
        << curCol.dataFile.hwm << "; " << ec.errorString(rc);
    fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
    fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
    return rc;
  }

  addToSegFileList(curCol.dataFile, hwmNext);

  return NO_ERROR;
}

}  // namespace WriteEngine