/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*****************************************************************************
 * $Id: we_colbufcompressed.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 *
 ****************************************************************************/

/** @file
 * Implementation of the ColumnBufferCompressed class
 *
 */

#include "we_colbufcompressed.h"

#include <cstdio>    // SEEK_SET
#include <cstring>   // memcpy
#include <iostream>
#include <sstream>   // std::ostringstream
#include <vector>    // std::vector

#include <boost/scoped_array.hpp>

#include "we_define.h"
#include "we_config.h"
#include "we_convertor.h"
#include "we_columninfo.h"
#include "we_fileop.h"
#include "we_log.h"
#include "we_stats.h"

#include "IDBDataFile.h"
using namespace idbdatafile;

#include "idbcompress.h"
using namespace compress;

namespace WriteEngine
{
//------------------------------------------------------------------------------
// Constructor
//
// Starts with no to-be-compressed buffer (it is allocated lazily on first
// use), arms the one-time preload of the starting HWM chunk, and records that
// the starting HWM chunk has not yet been flushed. The user padding is the
// configured number of compressed pad blocks expressed in bytes; it is also
// used to initialize the compressor pool.
//------------------------------------------------------------------------------
ColumnBufferCompressed::ColumnBufferCompressed(ColumnInfo* pColInfo, Log* logger)
 : ColumnBuffer(pColInfo, logger)
 , fToBeCompressedBuffer(0)
 , fToBeCompressedCapacity(0)
 , fNumBytes(0)
 , fPreLoadHWMChunk(true)
 , fFlushedStartHwmChunk(false)
{
  fUserPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
  compress::initializeCompressorPool(fCompressorPool, fUserPaddingBytes);
}
Destructor //------------------------------------------------------------------------------ ColumnBufferCompressed::~ColumnBufferCompressed() { if (fToBeCompressedBuffer) delete[] fToBeCompressedBuffer; fToBeCompressedBuffer = 0; fToBeCompressedCapacity = 0; fNumBytes = 0; } //------------------------------------------------------------------------------ // Reset "this" ColumnBufferCompressed object to read a different file, by // resetting the FILE*, starting HWM, and the chunk pointers. //------------------------------------------------------------------------------ int ColumnBufferCompressed::setDbFile(IDBDataFile* f, HWM startHwm, const char* hdrs) { fFile = f; fStartingHwm = startHwm; if (compress::CompressInterface::getPtrList(hdrs, fChunkPtrs) != 0) { return ERR_COMP_PARSE_HDRS; } // If we have any orphaned chunk pointers (ex: left over after a DML // rollback), that fall after the HWM, then drop those trailing ptrs. unsigned int chunkIndex = 0; unsigned int blockOffsetWithinChunk = 0; auto compressor = compress::getCompressorByType(fCompressorPool, fColInfo->column.compressionType); if (!compressor) { return ERR_COMP_WRONG_COMP_TYPE; } compressor->locateBlock(fStartingHwm, chunkIndex, blockOffsetWithinChunk); if ((chunkIndex + 1) < fChunkPtrs.size()) { fChunkPtrs.resize(chunkIndex + 1); } return NO_ERROR; } //------------------------------------------------------------------------------ // Reinitialize to-be-compressed column buffer (to empty chunk) prior to // importing the first chunk of the next extent. Returns startFileOffset // which indicates file offset (in bytes) where next extent will be starting. 
//------------------------------------------------------------------------------ int ColumnBufferCompressed::resetToBeCompressedColBuf(long long& startFileOffset) { // Don't load chunk, once we go to next extent fPreLoadHWMChunk = false; // Lazy creation of to-be-compressed buffer if (!fToBeCompressedBuffer) { fToBeCompressedBuffer = new unsigned char[CompressInterface::UNCOMPRESSED_INBUF_LEN]; } BlockOp::setEmptyBuf(fToBeCompressedBuffer, CompressInterface::UNCOMPRESSED_INBUF_LEN, fColInfo->column.emptyVal, fColInfo->column.width); if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Initializing empty chunk for next extent: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; hwm-" << fStartingHwm; fLog->logMsg(oss.str(), MSGLVL_INFO2); } fToBeCompressedCapacity = CompressInterface::UNCOMPRESSED_INBUF_LEN; // Set file offset past end of last chunk startFileOffset = CompressInterface::HDR_BUF_LEN * 2; if (fChunkPtrs.size() > 0) startFileOffset = fChunkPtrs[fChunkPtrs.size() - 1].first + fChunkPtrs[fChunkPtrs.size() - 1].second; // Positition ourselves to start of empty to-be-compressed buffer fNumBytes = 0; return NO_ERROR; } //------------------------------------------------------------------------------ // Intercept data being copied from the raw-data output buffer to the output // file, and instead buffer up the data to be compressed in 4M chunks before // writing it out. 
//------------------------------------------------------------------------------ int ColumnBufferCompressed::writeToFile(int startOffset, int writeSize, bool fillUpWEmpties) { if (writeSize == 0) // skip unnecessary write, if 0 bytes given return NO_ERROR; int fillUpWEmptiesWriteSize = 0; if (fillUpWEmpties) fillUpWEmptiesWriteSize = BYTE_PER_BLOCK - writeSize % BYTE_PER_BLOCK; // If we are starting a new file, we need to reinit the buffer and // find out what our file offset should be set to. if (!fToBeCompressedCapacity) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF); #endif long long startFileOffset; int rc = initToBeCompressedBuffer(startFileOffset); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "writeToFile: error initializing to-be-compressed buffer " "for OID " << fColInfo->curCol.dataFile.fid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } rc = fColInfo->colOp->setFileOffset(fFile, startFileOffset, SEEK_SET); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "writeToFile: error init compressed file offset for " << "OID " << fColInfo->curCol.dataFile.fid << "; " << startFileOffset << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_BUF); #endif } unsigned char* bufOffset = fToBeCompressedBuffer + fNumBytes; // Expand the compression buffer size if working with an abbrev extent, and // the bytes we are about to add will overflow the abbreviated extent. 
if ((fToBeCompressedCapacity < CompressInterface::UNCOMPRESSED_INBUF_LEN) && ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) > fToBeCompressedCapacity)) { std::ostringstream oss; oss << "Expanding abbrev to-be-compressed buffer for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment; fLog->logMsg(oss.str(), MSGLVL_INFO2); fToBeCompressedCapacity = CompressInterface::UNCOMPRESSED_INBUF_LEN; } if ((fNumBytes + writeSize + fillUpWEmptiesWriteSize) <= fToBeCompressedCapacity) { if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Buffering data to-be-compressed for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; addBytes-" << writeSize << "; extraBytes-" << fillUpWEmptiesWriteSize << "; totBytes-" << (fNumBytes + writeSize); fLog->logMsg(oss.str(), MSGLVL_INFO2); } memcpy(bufOffset, (fBuffer + startOffset), writeSize); fNumBytes += writeSize; fNumBytes += fillUpWEmptiesWriteSize; } else // Not enough room to add all the data to the to-be-compressed buffer { int startOffsetX = startOffset; int writeSizeX = writeSize; // The number of bytes (in fBuffer) to be written, could be larger than // our to-be-compressed buffer, so we require a loop to potentially // iterate thru all the bytes to be compresssed and written from fBuffer while (writeSizeX > 0) { idbassert((fNumBytes <= fToBeCompressedCapacity)); // DMC-temp debug size_t writeSizeOut = 0; if ((fNumBytes + writeSizeX) > fToBeCompressedCapacity) { writeSizeOut = fToBeCompressedCapacity - fNumBytes; if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Buffering data (full) to-be-compressed for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << 
fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; addBytes-" << writeSizeOut << "; totBytes-" << (fNumBytes + writeSizeOut); fLog->logMsg(oss.str(), MSGLVL_INFO2); } if (writeSizeOut > 0) { memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut); fNumBytes += writeSizeOut; } // char resp; // std::cout << "dbg: before writeToFile->compressAndFlush" << // std::endl; // std::cin >> resp; int rc = compressAndFlush(false); // std::cout << "dbg: after writeToFile->compressAndFlush" << // std::endl; // std::cin >> resp; if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "writeToFile: error compressing and writing chunk " "for OID " << fColInfo->curCol.dataFile.fid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } // Start over again loading a new to-be-compressed buffer BlockOp::setEmptyBuf(fToBeCompressedBuffer, CompressInterface::UNCOMPRESSED_INBUF_LEN, fColInfo->column.emptyVal, fColInfo->column.width); fToBeCompressedCapacity = CompressInterface::UNCOMPRESSED_INBUF_LEN; bufOffset = fToBeCompressedBuffer; fNumBytes = 0; } else { writeSizeOut = writeSizeX; if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Buffering data (new) to-be-compressed for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; addBytes-" << writeSizeOut << "; totBytes-" << (fNumBytes + writeSizeOut); fLog->logMsg(oss.str(), MSGLVL_INFO2); } memcpy(bufOffset, (fBuffer + startOffsetX), writeSizeOut); fNumBytes += writeSizeOut; fNumBytes += fillUpWEmptiesWriteSize; } startOffsetX += writeSizeOut; writeSizeX -= writeSizeOut; } // end of while loop } return NO_ERROR; } //------------------------------------------------------------------------------ // Compress and write out the data in the to-be-compressed buffer. 
// Also may write out the compression header. // // bFinishingFile indicates whether we are finished working with this file, // either because we are completing an extent or because we have reached the // end of the input data. In either case, if bFinishingFile is true, then // in addition to flushing the current working chunk to disk, this function // will also write out the updated compression header to match the data. // // This function will also write out the compression header if we are writing // out the first (starting HWM) chunk for this import. We do this to keep the // compression header in sync with the data, in case PrimProc is trying to read // the db file. It is not necessary to immediately update the header for the // remaining chunks as they are written out, because PrimProc will not be try- // ing to access those chunk until we update the extentmap HWM at the end of // this import. It's only the starting HWM chunk that may cause a problem and // requires the immediate rewriting of the header, because we are modifying // that chunk and adding rows to it. 
//------------------------------------------------------------------------------ int ColumnBufferCompressed::compressAndFlush(bool bFinishingFile) { auto compressor = compress::getCompressorByType(fCompressorPool, fColInfo->column.compressionType); if (!compressor) { return ERR_COMP_WRONG_COMP_TYPE; } const size_t OUTPUT_BUFFER_SIZE = compressor->maxCompressedSize(fToBeCompressedCapacity) + fUserPaddingBytes + // Padded len = len + COMPRESSED_SIZE_INCREMENT_CHUNK - (len % // COMPRESSED_SIZE_INCREMENT_CHUNK) + usePadding compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE; unsigned char* compressedOutBuf = new unsigned char[OUTPUT_BUFFER_SIZE]; boost::scoped_array compressedOutBufPtr(compressedOutBuf); size_t outputLen = OUTPUT_BUFFER_SIZE; #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPRESS_COL_COMPRESS); #endif int rc = compressor->compressBlock(reinterpret_cast(fToBeCompressedBuffer), fToBeCompressedCapacity, compressedOutBuf, outputLen); if (rc != 0) { return ERR_COMP_COMPRESS; } // Round up the compressed chunk size rc = compressor->padCompressedChunks(compressedOutBuf, outputLen, OUTPUT_BUFFER_SIZE); if (rc != 0) { return ERR_COMP_PAD_DATA; } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_COL_COMPRESS); Stats::startParseEvent(WE_STATS_WRITE_COL); #endif off64_t fileOffset = fFile->tell(); size_t nitems = fFile->write(compressedOutBuf, outputLen) / outputLen; if (nitems != 1) return ERR_FILE_WRITE; CompChunkPtr compChunk((uint64_t)fileOffset, (uint64_t)outputLen); fChunkPtrs.push_back(compChunk); if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Writing compressed data for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; bytes-" << outputLen << "; fileOffset-" << fileOffset; fLog->logMsg(oss.str(), MSGLVL_INFO2); } // We write out the compression headers if we are finished with 
this file // (either because we are through with the extent or the data), or because // this is the first HWM chunk that we may be modifying. // See the description that precedes this function for more details. if (bFinishingFile || !fFlushedStartHwmChunk) { fileOffset = fFile->tell(); RETURN_ON_ERROR(saveCompressionHeaders()); // If we just updated the chunk header for the starting HWM chunk, // then we flush our output, to synchronize with compressed chunks, if (!fFlushedStartHwmChunk) { // char resp; // std::cout << "dbg: before fflush of hdrs" << std::endl; // std::cin >> resp; if (fFile->flush() != 0) return ERR_FILE_FLUSH; // std::cout << "dbg: after fflush of hdrs" << std::endl; // std::cin >> resp; fFlushedStartHwmChunk = true; } // After seeking to the top of the file to write the headers, // we restore the file offset to continue adding more chunks, // if we are not through with this file. if (!bFinishingFile) { RETURN_ON_ERROR(fColInfo->colOp->setFileOffset(fFile, fileOffset, SEEK_SET)); } } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_WRITE_COL); #endif return NO_ERROR; } //------------------------------------------------------------------------------ // Final flushing of data and headers prior to closing the file. // File is also truncated if applicable. //------------------------------------------------------------------------------ int ColumnBufferCompressed::finishFile(bool bTruncFile) { // If capacity is 0, we never got far enough to read in the HWM chunk for // the current column segment file, so no need to update the file contents. // But we do continue in case we need to truncate the file before exiting. // This could happen if our initial block skipping finished an extent. 
if (fToBeCompressedCapacity > 0) { // char resp; // std::cout << "dbg: before finishFile->compressAndFlush" << std::endl; // std::cin >> resp; // Write out any data still waiting to be compressed RETURN_ON_ERROR(compressAndFlush(true)); // std::cout << "dbg: after finishFile->compressAndFlush" << std::endl; // std::cin >> resp; } #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT); #endif // Truncate file (if applicable) based on offset and size of last chunk if (bTruncFile && (fChunkPtrs.size() > 0)) { long long truncateFileSize = fChunkPtrs[fChunkPtrs.size() - 1].first + fChunkPtrs[fChunkPtrs.size() - 1].second; // @bug5769 Don't initialize extents or truncate db files on HDFS if (idbdatafile::IDBPolicy::useHdfs()) { std::ostringstream oss1; oss1 << "Finished writing column file" ": OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; size-" << truncateFileSize; fLog->logMsg(oss1.str(), MSGLVL_INFO2); } else { std::ostringstream oss1; oss1 << "Truncating column file" ": OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; size-" << truncateFileSize; fLog->logMsg(oss1.str(), MSGLVL_INFO2); int rc = NO_ERROR; if (truncateFileSize > 0) rc = fColInfo->colOp->truncateFile(fFile, truncateFileSize); else rc = ERR_COMP_TRUNCATE_ZERO; //@bug3913-Catch truncate to 0 bytes if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss2; oss2 << "finishFile: error truncating file for " << "OID " << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; size-" << truncateFileSize << "; " << ec.errorString(rc); 
fLog->logMsg(oss2.str(), rc, MSGLVL_ERROR); return rc; } } } // Nothing more to do if we are not updating the file contents. if (fToBeCompressedCapacity == 0) { #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT); #endif return NO_ERROR; } fToBeCompressedCapacity = 0; fNumBytes = 0; fChunkPtrs.clear(); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_COL_FINISH_EXTENT); #endif return NO_ERROR; } //------------------------------------------------------------------------------ // Write out the updated compression headers. //------------------------------------------------------------------------------ int ColumnBufferCompressed::saveCompressionHeaders() { // Construct the header records char hdrBuf[CompressInterface::HDR_BUF_LEN * 2]; RETURN_ON_ERROR(fColInfo->colOp->readHeaders(fFile, hdrBuf)); BRM::LBID_t lbid = compress::CompressInterface::getLBIDByIndex(hdrBuf, 0); compress::CompressInterface::initHdr(hdrBuf, fColInfo->column.width, fColInfo->column.dataType, fColInfo->column.compressionType); compress::CompressInterface::setBlockCount(hdrBuf, (fColInfo->getFileSize() / BYTE_PER_BLOCK)); // If lbid written in the header is not 0 and not equal to `lastupdatedlbid` - we are running // for the next extent for column segment file. const auto lastUpdatedLbid = fColInfo->getLastUpdatedLBID(); if (lbid && lastUpdatedLbid != lbid) { // Write back lbid, after header initialization. 
compress::CompressInterface::setLBIDByIndex(hdrBuf, lbid, 0); compress::CompressInterface::setLBIDByIndex(hdrBuf, lastUpdatedLbid, 1); } else compress::CompressInterface::setLBIDByIndex(hdrBuf, fColInfo->getLastUpdatedLBID(), 0); std::vector ptrs; for (unsigned i = 0; i < fChunkPtrs.size(); i++) { ptrs.push_back(fChunkPtrs[i].first); } unsigned lastIdx = fChunkPtrs.size() - 1; ptrs.push_back(fChunkPtrs[lastIdx].first + fChunkPtrs[lastIdx].second); compress::CompressInterface::storePtrs(ptrs, hdrBuf); // Write out the header records // char resp; // std::cout << "dbg: before writeHeaders" << std::endl; // std::cin >> resp; RETURN_ON_ERROR(fColInfo->colOp->writeHeaders(fFile, hdrBuf)); // std::cout << "dbg: after writeHeaders" << std::endl; // std::cin >> resp; return NO_ERROR; } //------------------------------------------------------------------------------ // Allocates to-be-compressed buffer if it has not already been allocated. // Initializes to-be-compressed buffer with the contents of the chunk containing // the fStartingHwm block, as long as that chunk is in the pointer list. // If the chunk is not in the list, then we must be adding a new chunk, in // which case we just initialize an empty chunk. // Returns startFileOffset which indicates file offset (in bytes) where the // next chunk will be starting. 
//------------------------------------------------------------------------------ int ColumnBufferCompressed::initToBeCompressedBuffer(long long& startFileOffset) { bool bNewBuffer = false; // Lazy initialization of to-be-compressed buffer if (!fToBeCompressedBuffer) { fToBeCompressedBuffer = new unsigned char[CompressInterface::UNCOMPRESSED_INBUF_LEN]; BlockOp::setEmptyBuf(fToBeCompressedBuffer, CompressInterface::UNCOMPRESSED_INBUF_LEN, fColInfo->column.emptyVal, fColInfo->column.width); bNewBuffer = true; } // Find the chunk containing the starting HWM, as long as our initial // block skipping has not caused us to exit the HWM chunk; in which // case we start a new empty chunk. unsigned int chunkIndex = 0; unsigned int blockOffsetWithinChunk = 0; bool bSkipStartingBlks = false; auto compressor = compress::getCompressorByType(fCompressorPool, fColInfo->column.compressionType); if (!compressor) { return ERR_COMP_WRONG_COMP_TYPE; } if (fPreLoadHWMChunk) { if (fChunkPtrs.size() > 0) { compressor->locateBlock(fStartingHwm, chunkIndex, blockOffsetWithinChunk); if (chunkIndex < fChunkPtrs.size()) startFileOffset = fChunkPtrs[chunkIndex].first; else fPreLoadHWMChunk = false; } // If we are at the start of the job, fPreLoadHWMChunk will be true, // to preload the old HWM chunk. But if we have no chunk ptrs, then // we are starting on an empty PM. In this case, we skip starting // blks if fStartingHwm has been set. 
else { fPreLoadHWMChunk = false; bSkipStartingBlks = true; } } // Preload (read and uncompress) the chunk for the starting HWM extent only if (fPreLoadHWMChunk) { fPreLoadHWMChunk = false; // only preload HWM chunk in the first extent std::ostringstream oss; oss << "Reading HWM chunk for: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; hwm-" << fStartingHwm << "; chunk#-" << chunkIndex << "; blkInChunk-" << blockOffsetWithinChunk; fLog->logMsg(oss.str(), MSGLVL_INFO2); // Read the chunk RETURN_ON_ERROR(fColInfo->colOp->setFileOffset(fFile, startFileOffset, SEEK_SET)); char* compressedOutBuf = new char[fChunkPtrs[chunkIndex].second]; boost::scoped_array compressedOutBufPtr(compressedOutBuf); size_t itemsRead = fFile->read(compressedOutBuf, fChunkPtrs[chunkIndex].second) / fChunkPtrs[chunkIndex].second; if (itemsRead != 1) { std::ostringstream oss; oss << "Error reading HWM chunk for: " << "OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; hwm-" << fStartingHwm; fLog->logMsg(oss.str(), ERR_COMP_READ_BLOCK, MSGLVL_ERROR); return ERR_COMP_READ_BLOCK; } // Uncompress the chunk into our 4MB buffer size_t outLen = CompressInterface::UNCOMPRESSED_INBUF_LEN; int rc = compressor->uncompressBlock(compressedOutBuf, fChunkPtrs[chunkIndex].second, fToBeCompressedBuffer, outLen); if (rc) { WErrorCodes ec; std::ostringstream oss; oss << "Error uncompressing HWM chunk for: " << "OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; hwm-" << fStartingHwm << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return 
ERR_COMP_UNCOMPRESS; } fToBeCompressedCapacity = outLen; // Positition ourselves to start adding data to the HWM block fNumBytes = blockOffsetWithinChunk * BYTE_PER_BLOCK; // We are going to add data to, and thus re-add, the last chunk; so we // drop it from our list. fChunkPtrs.resize(fChunkPtrs.size() - 1); } else // We have left the HWM chunk; just position file offset, // without reading anything { // If it's not a new buffer, we need to initialize, since we won't be // reading in anything to overlay what's in the to-be-compressed buffer. if (!bNewBuffer) { BlockOp::setEmptyBuf(fToBeCompressedBuffer, CompressInterface::UNCOMPRESSED_INBUF_LEN, fColInfo->column.emptyVal, fColInfo->column.width); } if (fLog->isDebug(DEBUG_2)) { std::ostringstream oss; oss << "Initializing new empty chunk: OID-" << fColInfo->curCol.dataFile.fid << "; DBRoot-" << fColInfo->curCol.dataFile.fDbRoot << "; part-" << fColInfo->curCol.dataFile.fPartition << "; seg-" << fColInfo->curCol.dataFile.fSegment << "; hwm-" << fStartingHwm; fLog->logMsg(oss.str(), MSGLVL_INFO2); } fToBeCompressedCapacity = CompressInterface::UNCOMPRESSED_INBUF_LEN; // Set file offset to start after last current chunk startFileOffset = CompressInterface::HDR_BUF_LEN * 2; if (fChunkPtrs.size() > 0) startFileOffset = fChunkPtrs[fChunkPtrs.size() - 1].first + fChunkPtrs[fChunkPtrs.size() - 1].second; // Position ourselves to start of empty to-be-compressed buffer. // If we are starting the first extent on a PM, we may employ blk // skipping at start of import; adjust fNumBytes accordingly. // (see ColumnInfo::createDelayedFileIfNeeded() for discussion) if (bSkipStartingBlks) fNumBytes = fStartingHwm * BYTE_PER_BLOCK; else fNumBytes = 0; } return NO_ERROR; } } // namespace WriteEngine