/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2019 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: we_chunkmanager.cpp 4737 2013-08-14 20:45:46Z bwilkinson $ #include #include #include #include #include #include //#define NDEBUG #include using namespace std; #include #include #include "logger.h" #include "cacheutils.h" #include "we_chunkmanager.h" #include "we_macro.h" #include "we_brm.h" #include "we_config.h" #include "we_confirmhdfsdbfile.h" #include "we_fileop.h" #include "../dictionary/we_dctnry.h" #include "we_stats.h" using namespace execplan; #include "IDBDataFile.h" #include "IDBPolicy.h" #include "cloudio/SMFileSystem.h" using namespace idbdatafile; namespace { // Function to compare 2 ChunkData pointers. bool chunkDataPtrLessCompare(WriteEngine::ChunkData* p1, WriteEngine::ChunkData* p2) { return (p1->fChunkId) < (p2->fChunkId); } } // namespace namespace WriteEngine { extern int NUM_BLOCKS_PER_INITIAL_EXTENT; // defined in we_dctnry.cpp extern WErrorCodes ec; // defined in we_log.cpp //------------------------------------------------------------------------------ // Search for the specified chunk in fChunkList. 
//------------------------------------------------------------------------------ ChunkData* CompFileData::findChunk(int64_t id) const { ChunkData* pChunkData = NULL; for (list::const_iterator lit = fChunkList.begin(); lit != fChunkList.end(); ++lit) { if ((*lit)->fChunkId == id) { pChunkData = *lit; break; } } return pChunkData; } //------------------------------------------------------------------------------ // ChunkManager constructor //------------------------------------------------------------------------------ ChunkManager::ChunkManager() : fMaxActiveChunkNum(100) , fLenCompressed(0) , fIsBulkLoad(false) , fDropFdCache(false) , fIsInsert(false) , fIsHdfs(IDBPolicy::useHdfs()) , fFileOp(0) , fSysLogger(NULL) , fTransId(-1) , fLocalModuleId(Config::getLocalModuleID()) , fFs(fIsHdfs ? IDBFileSystem::getFs(IDBDataFile::HDFS) : IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD) : IDBFileSystem::getFs(IDBDataFile::BUFFERED)) { fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK; compress::initializeCompressorPool(fCompressorPool, fUserPaddings); COMPRESSED_CHUNK_SIZE = compress::CompressInterface::getMaxCompressedSizeGeneric(UNCOMPRESSED_CHUNK_SIZE) + 64 + 3 + 8 * 1024; fMaxCompressedBufSize = COMPRESSED_CHUNK_SIZE + fUserPaddings; fBufCompressed = new char[fMaxCompressedBufSize]; fSysLogger = new logging::Logger(SUBSYSTEM_ID_WE); logging::MsgMap msgMap; msgMap[logging::M0080] = logging::Message(logging::M0080); fSysLogger->msgMap(msgMap); } //------------------------------------------------------------------------------ // ChunkManager destructor //------------------------------------------------------------------------------ ChunkManager::~ChunkManager() { std::map columnOids; cleanUp(columnOids); delete[] fBufCompressed; fBufCompressed = NULL; delete fSysLogger; fSysLogger = NULL; } //------------------------------------------------------------------------------ // Log a message into the DML recovery log. 
//------------------------------------------------------------------------------ int ChunkManager::writeLog(TxnID txnId, string backUpFileType, string filename, string& aDMLLogFileName, int64_t size, int64_t offset) const { // Get log file name if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR) return ERR_DML_LOG_NAME; // Open file boost::scoped_ptr aDMLLogFile; try { aDMLLogFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "a+b", 0)); if (!aDMLLogFile) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened (no exception thrown)"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_OPEN_DML_LOG; } } catch (exception& e) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened: " << e.what(); logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_OPEN_DML_LOG; } // Write the log ostringstream entry; entry << backUpFileType << '\n' << filename << '\n' << size << '\n' << offset << '\n'; string tmp = entry.str(); aDMLLogFile->seek(0, SEEK_END); aDMLLogFile->tell(); aDMLLogFile->write(tmp.c_str(), tmp.size()); return NO_ERROR; } int ChunkManager::removeBackups(TxnID txnId) { // HDFS update/delete is handled differently if (fIsHdfs || fIsBulkLoad) return NO_ERROR; string aDMLLogFileName; if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR) return ERR_DML_LOG_NAME; if (IDBPolicy::exists(aDMLLogFileName.c_str())) { boost::scoped_ptr aDMLLogFile(IDBDataFile::open( IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "r", 0)); if (aDMLLogFile) // need recover { ssize_t fileSize = aDMLLogFile->size(); boost::scoped_array buf(new char[fileSize]); if (aDMLLogFile->read(buf.get(), fileSize) != fileSize) return ERR_FILE_READ; std::istringstream strstream(string(buf.get(), fileSize)); std::string backUpFileType; std::string filename; int64_t size; int64_t offset; 
while (strstream >> backUpFileType >> filename >> size >> offset) { if (backUpFileType.compare("tmp") == 0) { filename += ".tmp"; IDBPolicy::remove(filename.c_str()); } else { std::string backFileName(filename); if (backUpFileType.compare("chk") == 0) backFileName += ".chk"; else backFileName += ".hdr"; IDBPolicy::remove(backFileName.c_str()); } } aDMLLogFile.reset(); // closes the file in IDBDataFile destructor. IDBPolicy::remove(aDMLLogFileName.c_str()); } else { return ERR_OPEN_DML_LOG; } } return NO_ERROR; } //------------------------------------------------------------------------------ // Get/Return IDBDataFile* for specified OID, root, partition, and segment. // Function is to be used to open column files. // If the IDBDataFile* is not found, then a segment file will be opened using the // mode (mode) and I/O buffer size (size) that is given. Name of the resulting // file is returned in filename. // // For Bulk HDFS usage: // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output. //------------------------------------------------------------------------------ // @bug 5572 - HDFS usage: add *.tmp file backup flag IDBDataFile* ChunkManager::getFilePtr(const Column& column, uint16_t root, uint32_t partition, uint16_t segment, string& filename, const char* mode, int size, bool useTmpSuffix, bool isReadOnly) const { CompFileData* fileData = getFileData(column.dataFile.fid, root, partition, segment, filename, mode, size, column.colDataType, column.colWidth, useTmpSuffix, false, isReadOnly); return (fileData ? fileData->fFilePtr : NULL); } //------------------------------------------------------------------------------ // Get/Return IDBDataFile* for specified OID, root, partition, and segment. // Function is to be used to open dictionary store files. // If the IDBDataFile* is not found, then a segment file will be opened using the // mode (mode) and I/O buffer size (size) that is given. Name of the resulting // file is returned in filename. 
// // For Bulk HDFS usage: // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output. //------------------------------------------------------------------------------ // @bug 5572 - HDFS usage: add *.tmp file backup flag IDBDataFile* ChunkManager::getFilePtr(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment, string& filename, const char* mode, int size, bool useTmpSuffix) const { CompFileData* fileData = getFileData(fid, root, partition, segment, filename, mode, size, CalpontSystemCatalog::VARCHAR, 8, useTmpSuffix, true); // hard code (varchar, 8) are dummy values for dictionary file return (fileData ? fileData->fFilePtr : NULL); } //------------------------------------------------------------------------------ // Get/Return IDBDataFile* by the given `filename`. // OID, partition and segment are needed for a file cache. // Function is to be used to open column/dict segment file. // If the IDBDataFile* is not found, then a segment file will be opened using // the mode (mode) and I/O buffer size (size) that is given. //------------------------------------------------------------------------------ IDBDataFile* ChunkManager::getFilePtrByName(const std::string& filename, FID& fid, uint16_t root, uint32_t partition, uint16_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, uint32_t colWidth, const char* mode, int32_t size, bool useTmpSuffix, bool isDict) const { CompFileData* fileData = getFileDataByName(filename, fid, root, partition, segment, mode, size, colDataType, colWidth, useTmpSuffix, isDict); return (fileData ? fileData->fFilePtr : NULL); } //------------------------------------------------------------------------------ // Get/Return CompFileData* for specified column OID, root, partition, and // segment. If the IDBDataFile* is not found, then a segment file will be opened // using the mode (mode) and I/O buffer size (size) that is given. Name of // the resulting file is returned in filename. 
// If the CompFileData* needs to be created, it will also be created and // inserted into the fFileMap and fFilePtrMap for later use. // // For Bulk HDFS usage: // If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output. //------------------------------------------------------------------------------ // @bug 5572 - HDFS usage: add *.tmp file backup flag CompFileData* ChunkManager::getFileData(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment, string& filename, const char* mode, int size, const CalpontSystemCatalog::ColDataType colDataType, int colWidth, bool useTmpSuffix, bool dctnry, bool isReadOnly) const { FileID fileID(fid, root, partition, segment); map::const_iterator mit = fFileMap.find(fileID); WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ") << "found." << endl;) // Get CompFileData pointer for existing Column or Dictionary store file if (mit != fFileMap.end()) { filename = mit->second->fFileName; return mit->second; } // New CompFileData pointer needs to be created char name[FILE_NAME_SIZE]; if (fFileOp->getFileName(fid, name, root, partition, segment) != NO_ERROR) return NULL; // Initialize the given `filename`. filename = name; return getFileData_(fileID, filename, mode, size, colDataType, colWidth, useTmpSuffix, dctnry, isReadOnly); } CompFileData* ChunkManager::getFileDataByName(const std::string& filename, const FID& fid, uint16_t root, uint32_t partition, uint16_t segment, const char* mode, int size, const CalpontSystemCatalog::ColDataType colDataType, int colWidth, bool useTmpSuffix, bool dctnry) const { FileID fileID(fid, root, partition, segment); map::const_iterator mit = fFileMap.find(fileID); WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ") << "found." 
<< endl;) // Get CompFileData pointer for existing Column or Dictionary store file if (mit != fFileMap.end()) return mit->second; return getFileData_(fileID, filename, mode, size, colDataType, colWidth, useTmpSuffix, dctnry); } CompFileData* ChunkManager::getFileData_(const FileID& fileID, const string& filename, const char* mode, int size, const CalpontSystemCatalog::ColDataType colDataType, int colWidth, bool useTmpSuffix, bool dctnry, bool isReadOnly) const { CompFileData* fileData = new CompFileData(fileID, fileID.fFid, colDataType, colWidth, isReadOnly); fileData->fFileName = filename; if (openFile(fileData, mode, colWidth, useTmpSuffix, __LINE__) != NO_ERROR) { WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;) delete fileData; return NULL; } fileData->fDctnryCol = dctnry; WE_COMP_DBG(cout << "open file* " << name << endl;) // get the control data in header. if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__) != NO_ERROR) { WE_COMP_DBG(cout << "Failed to read control header." << endl;) delete fileData; return NULL; } // make sure the header is valid if (compress::CompressInterface::verifyHdr(fileData->fFileHeader.fControlData) != 0) { WE_COMP_DBG(cout << "Invalid header." << endl;) delete fileData; return NULL; } const int32_t headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData); const int32_t ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT; // Save segment file compression type. 
fileData->fCompressionType = compress::CompressInterface::getCompressionType(fileData->fFileHeader.fControlData); if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT) { // >8K header, dictionary width > 128 fileData->fFileHeader.fPtrSection = new char[ptrSecSize]; fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection); } // read in the pointer section in header if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__) != NO_ERROR) { WE_COMP_DBG(cout << "Failed to read pointer header." << endl;) delete fileData; return NULL; } fFileMap.insert(make_pair(fileID, fileData)); // cout << "Insert into fFilemap root:partition:seg:fileID = " <fFilePtr, fileData)); return fileData; } //------------------------------------------------------------------------------ // Return new IDBDataFile* for specified dictionary OID, root, partition, segment, and // width. A new segment file will be opened using the mode (mode) and I/O // buffer size (size) that is given. Name of the resulting file is returned // in filename. // A corresponding CompFileData* is created and inserted into fFileMap and // fFilePtrMap for later use. //------------------------------------------------------------------------------ IDBDataFile* ChunkManager::createDctnryFile(const FID& fid, int64_t width, uint16_t root, uint32_t partition, uint16_t segment, const char* filename, const char* mode, int size, BRM::LBID_t lbid) { FileID fileID(fid, root, partition, segment); CompFileData* fileData = new CompFileData(fileID, fid, CalpontSystemCatalog::VARCHAR, width); fileData->fFileName = filename; if (openFile(fileData, mode, width, false, __LINE__) != NO_ERROR) // @bug 5572 HDFS tmp file { WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." 
<< endl;) delete fileData; return NULL; } fileData->fDctnryCol = true; WE_COMP_DBG(cout << "create file* " << filename << endl;) int hdrSize = calculateHeaderSize(width); int ptrSecSize = hdrSize - COMPRESSED_FILE_HEADER_UNIT; if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT) { // >8K header, dictionary width > 128 fileData->fFileHeader.fPtrSection = new char[ptrSecSize]; fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection); } // Dictionary store extent width == 0. See more details in function // `createDictStoreExtent`. compress::CompressInterface::initHdr(fileData->fFileHeader.fControlData, fileData->fFileHeader.fPtrSection, /*colWidth=*/0, fileData->fColDataType, fFileOp->compressionType(), hdrSize); compress::CompressInterface::setLBIDByIndex(fileData->fFileHeader.fControlData, lbid, 0); // Save compression type. fileData->fCompressionType = fFileOp->compressionType(); if (writeHeader(fileData, __LINE__) != NO_ERROR) { WE_COMP_DBG(cout << "Failed to write header." << endl;) delete fileData; return NULL; } //@Bug 4977 remove log file removeBackups(fTransId); fFileMap.insert(make_pair(fileID, fileData)); fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData)); return fileData->fFilePtr; } //------------------------------------------------------------------------------ // Read the block for the specified fbo, from pFile's applicable chunk, and // into readBuf. 
//------------------------------------------------------------------------------ int ChunkManager::readBlock(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo) { map::iterator fpIt = fFilePtrMap.find(pFile); if (fpIt == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } // find the chunk ID and offset in the chunk lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE); ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot); WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << offset.quot << " offset:" << offset.rem << " chunkData*:" << chunkData << endl;) int rc = NO_ERROR; // chunk is not already uncompressed if (chunkData == NULL) rc = fetchChunkFromFile(pFile, offset.quot, chunkData); if (rc == NO_ERROR) { // copy the data at fbo to readBuf memcpy(readBuf, chunkData->fBufUnCompressed + offset.rem, BYTE_PER_BLOCK); } return rc; } //------------------------------------------------------------------------------ // Write writeBuf to the block for the specified fbo, within pFile's applicable // chunk. 
//------------------------------------------------------------------------------ int ChunkManager::saveBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo) { WE_COMP_DBG(cout << "save block fbo:" << fbo << endl;) map::iterator fpIt = fFilePtrMap.find(pFile); if (fpIt == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } // find the chunk ID and offset in the chunk lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE); ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot); int rc = NO_ERROR; // chunk is not already read in if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, offset.quot, chunkData)) != NO_ERROR)) return rc; WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << offset.quot << " offset:" << offset.rem << " saved @" << (&(chunkData->fBufUnCompressed) + offset.rem) << endl;) memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK); chunkData->fWriteToFile = true; // if the chunk is full for insert, flush it // cout << "current offset.rem/8192 = " << offset.rem/8192 << endl; if (fIsInsert && (offset.rem == MAXOFFSET_PER_CHUNK)) { if (((rc = writeChunkToFile(fpIt->second, chunkData)) == NO_ERROR) && ((rc = writeHeader(fpIt->second, __LINE__)) == NO_ERROR)) { // cout << "saveblock flushed the full chunk"<flush(); //@Bug 4977 remove log file removeBackups(fTransId); } } return rc; } //------------------------------------------------------------------------------ // Flush all pending chunks to their corresponding segment files. //------------------------------------------------------------------------------ int ChunkManager::flushChunks(int rc, const std::map& columOids) { // shall fail the the statement if failed here WE_COMP_DBG(cout << "flushChunks." 
<< endl;) int k = fFilePtrMap.size(); std::map::const_iterator it; if ((rc == NO_ERROR) && fIsInsert) { while (k-- > 0 && rc == NO_ERROR) { map::iterator i = fFilePtrMap.begin(); // sort the chunk list first CompFileData* fileData = i->second; it = columOids.find(fileData->fFid); if (it != columOids.end()) { list& chunkList = fileData->fChunkList; chunkList.sort(chunkDataPtrLessCompare); list::iterator j = chunkList.begin(); while (j != chunkList.end()) { if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR) break; // write chunk to file removes the written chunk from the list j = chunkList.begin(); } if (rc != NO_ERROR) break; // finally update the header if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR) break; //@Bug 4977 remove log file removeBackups(fTransId); // closeFile invalidates the iterator closeFile(fileData); } } } else if (rc == NO_ERROR) { while (k-- > 0 && rc == NO_ERROR) { map::iterator i = fFilePtrMap.begin(); // sort the chunk list first CompFileData* fileData = i->second; list& chunkList = fileData->fChunkList; chunkList.sort(chunkDataPtrLessCompare); list::iterator j = chunkList.begin(); while (j != chunkList.end()) { if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR) break; // write chunk to file removes the written chunk from the list j = chunkList.begin(); } if (rc != NO_ERROR) break; // finally update the header if (!fileData->fReadOnly && (rc = writeHeader(fileData, __LINE__)) != NO_ERROR) break; //@Bug 4977 remove log file removeBackups(fTransId); // closeFile invalidates the iterator closeFile(fileData); } } if (rc != NO_ERROR) { cleanUp(columOids); return rc; } // fActiveChunks.clear(); // fFileMap.clear(); // fFilePtrMap.clear(); if (fDropFdCache) { cacheutils::dropPrimProcFdCache(); fDropFdCache = false; } return NO_ERROR; } //------------------------------------------------------------------------------ // Load and uncompress the requested chunk (id) for the specified file (pFile), // into chunkData. 
// id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE // If the active chunk list is already full, then we flush the oldest pending // chunk to disk, to make room for fetching the requested chunk. // If the header ptr for the requested chunk is 0 (or has length 0), then // chunkData is initialized with a new empty chunk. //------------------------------------------------------------------------------ int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*& chunkData) { // return value int rc = NO_ERROR; // remove the oldest one if the max active chunk number is reached. WE_COMP_DBG(cout << "fActiveChunks.size:" << fActiveChunks.size() << endl;) // cout << "fetchChunkFromFile1: pFile = " << pFile << endl; map::iterator fpIt = fFilePtrMap.find(pFile); if (fpIt == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } CompFileData* fileData = fpIt->second; if (fActiveChunks.size() >= fMaxActiveChunkNum) { list >::iterator lIt = fActiveChunks.begin(); if (!fIsBulkLoad && !(fpIt->second->fDctnryCol)) { while ((lIt->first == fpIt->second->fFileID) && (lIt != fActiveChunks.end())) lIt++; } if (lIt != fActiveChunks.end()) { map::iterator fIt = fFileMap.find(lIt->first); if (fIt == fFileMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } if ((rc = writeChunkToFile(fIt->second, lIt->second)) != NO_ERROR) { ostringstream oss; oss << "write inactive chunk to file failed:" << fIt->second->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } if (!fIt->second->fReadOnly && (rc = writeHeader(fIt->second, __LINE__)) != NO_ERROR) { // logged by writeHeader return rc; } //@Bug 4977 remove the log files removeBackups(fTransId); } } #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF); #endif // get a new ChunkData object chunkData = new ChunkData(id); pFile = 
fileData->fFilePtr; // update to get the reopened file ptr. fileData->fChunkList.push_back(chunkData); fActiveChunks.push_back(make_pair(fileData->fFileID, chunkData)); // read the compressed chunk from file uint64_t* ptrs = reinterpret_cast(fileData->fFileHeader.fPtrSection); if (ptrs[id] && ptrs[id + 1]) // compressed chunk data exists { // safety check if (ptrs[id] >= ptrs[id + 1]) { logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_WRONG_PTR; } unsigned int chunkSize = (ptrs[id + 1] - ptrs[id]); if ((rc = setFileOffset(pFile, fileData->fFileName, ptrs[id], __LINE__)) != NO_ERROR || (rc = readFile(pFile, fileData->fFileName, fBufCompressed, chunkSize, __LINE__)) != NO_ERROR) { // logged by setFileOffset/readFile return rc; } // uncompress the read in buffer size_t dataLen = sizeof(chunkData->fBufUnCompressed); auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType); if (!fCompressor) { return ERR_COMP_WRONG_COMP_TYPE; } if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize, (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0) { if (fIsFix) { uint64_t blocks = 512; if (id == 0) { char* hdr = fileData->fFileHeader.fControlData; if (compress::CompressInterface::getBlockCount(hdr) < 512) blocks = 256; } dataLen = 8192 * blocks; // load the uncompressed buffer with empty values. 
char* buf = chunkData->fBufUnCompressed; chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE; if (fileData->fDctnryCol) initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE); else initializeColumnChunk(buf, fileData); } else { logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_UNCOMPRESS; } } //@bug 3313-Remove validation that incorrectly fails for long string store files // WE_COMP_DBG(cout << "chunk uncompressed to " << dataLen << endl;) // if (dataLen < (id+1) * BYTE_PER_BLOCK) // { // logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__); // return ERR_COMP_UNCOMPRESS; // } chunkData->fLenUnCompressed = dataLen; } else // new chunk { if (id == 0 && ptrs[id] == 0) // if the 1st ptr is not set for new extent { ptrs[0] = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData); } // load the uncompressed buffer with empty values. char* buf = chunkData->fBufUnCompressed; chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE; if (fileData->fDctnryCol) initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE); else initializeColumnChunk(buf, fileData); } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF); #endif return NO_ERROR; } //------------------------------------------------------------------------------ // Initialize a column based chunk with the applicable empty values. //------------------------------------------------------------------------------ void ChunkManager::initializeColumnChunk(char* buf, CompFileData* fileData) { int size = UNCOMPRESSED_CHUNK_SIZE; const uint8_t* emptyVal = fFileOp->getEmptyRowValue(fileData->fColDataType, fileData->fColWidth); fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, fileData->fColWidth); } //------------------------------------------------------------------------------ // Initialize a dictionary based chunk with empty blocks. 
//------------------------------------------------------------------------------
// Zeroes `size` bytes of buf, then copies the dictionary block header (via
// copyDctnryHeader) to the start of every BYTE_PER_BLOCK-sized block.
void ChunkManager::initializeDctnryChunk(char* buf, int size)
{
  // NOTE(review): assumes fFileOp is a Dctnry here. dynamic_cast yields NULL
  // otherwise, and the loop below would dereference it. Confirm that callers
  // only reach this for dictionary files.
  Dctnry* dctnry = dynamic_cast<Dctnry*>(fFileOp);
  memset(buf, 0, size);
  char* end = buf + size;

  while (buf < end)
  {
    dctnry->copyDctnryHeader(buf);
    buf += BYTE_PER_BLOCK;
  }
}

//------------------------------------------------------------------------------
// Compress and write the requested chunk (id) for fileData, to disk.
// id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
// Returns ERR_COMP_CHUNK_NOT_FOUND if no chunk with this id is loaded.
//------------------------------------------------------------------------------
int ChunkManager::writeChunkToFile(CompFileData* fileData, int64_t id)
{
  ChunkData* chunkData = fileData->findChunk(id);

  if (!chunkData)
  {
    logMessage(ERR_COMP_CHUNK_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
    return ERR_COMP_CHUNK_NOT_FOUND;
  }

  return writeChunkToFile(fileData, chunkData);
}

//------------------------------------------------------------------------------
// Compress and write the given chunk for fileData to disk.
// If the chunk has been flagged for writing (fWriteToFile is true),
// then subsequent chunks in the file are shifted down as needed, if the
// compressed chunk will not fit in the currently available embedded free space.
// If no shift is needed (or the chunk was unmodified), the chunk is removed
// from fActiveChunks and fileData->fChunkList and deleted. If a shift is
// needed, the chunk is left in place and reallocateChunks(fileData) is called.
//------------------------------------------------------------------------------
int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
{
  WE_COMP_DBG(cout << "write chunk id=" << chunkData->fChunkId << " data "
                   << ((chunkData->fWriteToFile) ? "changed" : "NOT changed") << endl;)
  int rc = NO_ERROR;  // return value
  bool needReallocateChunks = false;
  int64_t spaceAvl = 0;

  // Unmodified chunks are not written; they are only released below.
  if (chunkData->fWriteToFile)
  {
#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
#endif
    // compress the chunk before writing it to file
    // (fLenCompressed goes in as the buffer capacity and comes back as the
    // compressed length)
    fLenCompressed = fMaxCompressedBufSize;

    auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType);

    if (!fCompressor)
    {
      return ERR_COMP_WRONG_COMP_TYPE;
    }

    if (fCompressor->compressBlock((char*)chunkData->fBufUnCompressed, chunkData->fLenUnCompressed,
                                   (unsigned char*)fBufCompressed, fLenCompressed) != 0)
    {
      logMessage(ERR_COMP_COMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
      return ERR_COMP_COMPRESS;
    }

    WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to " << fLenCompressed;)

    // Removed padding code here, will add padding for the last chunk.
    // The existing chunks are already correctly aligned; use the padding to absorb chunk
    // size increase when update. This improves the performance with less chunk shifting.

#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
#endif

    // need more work if the new compressed buffer is larger
    uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
    ChunkId chunkId = chunkData->fChunkId;

    // Bytes currently reserved for this chunk; stays 0 when the next pointer is unset.
    if (ptrs[chunkId + 1] > 0)
      spaceAvl = (ptrs[chunkId + 1] - ptrs[chunkId]);

    WE_COMP_DBG(cout << ", available space:" << spaceAvl;)

    bool lastChunk = true;
    // usable chunkIds are 0 .. POINTERS_IN_HEADER-2
    // [chunkId+0] is the start offset of current chunk.
    // [chunkId+1] is the start offset of next chunk, the offset diff is current chunk size.
    // [chunkId+2] is 0 or not indicates if the next chunk exists.
    int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
    int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
    int64_t usablePtrIds = (ptrSecSize / sizeof(uint64_t)) - 2;

    if (chunkId < usablePtrIds)  // make sure [chunkId+2] has valid value
      lastChunk = (ptrs[(chunkId + 2)] == 0);

    WE_COMP_DBG(cout << ", last chunk:" << (lastChunk ? "true" : "false") << endl;)

    // Negative only if the header pointers are out of order.
    if (spaceAvl < 0)
    {
      logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
      return ERR_COMP_WRONG_PTR;
    }

    if ((int64_t)fLenCompressed <= spaceAvl)
    {
      // There is enough space.
      if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
      {
        // log in writeCompressedChunk by setFileOffset and writeFile
        return rc;
      }
    }
    else if (lastChunk)
    {
      // add padding space if the chunk is written first time
      if (fCompressor->padCompressedChunks((unsigned char*)fBufCompressed, fLenCompressed,
                                           fMaxCompressedBufSize) != 0)
      {
        WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padding failed." << endl;)

        logMessage(ERR_COMP_PAD_DATA, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_PAD_DATA;
      }

      WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padded to " << fLenCompressed;)

      // This is the last chunk, safe to write any length of data.
      //@Bug 3888. Assign the error code
      if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
      {
        // log in writeCompressedChunk by setFileOffset and writeFile
        return rc;
      }

      // Update the current chunk size.
      ptrs[chunkId + 1] = ptrs[chunkId] + fLenCompressed;
    }
    else
    {
      // Does not fit and a following chunk exists: later chunks must move.
      needReallocateChunks = true;
    }
  }

  if (!needReallocateChunks)
  {
    fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
    fileData->fChunkList.remove(chunkData);
    delete chunkData;
  }
  else
  {
    ostringstream oss;
    oss << "Compressed data does not fit, caused a chunk shifting @line:" << __LINE__
        << " filename:" << fileData->fFileName << ", chunkId:" << chunkData->fChunkId
        << " data size:" << fLenCompressed << "/available:" << spaceAvl << " -- shifting ";

    if ((rc = reallocateChunks(fileData)) == NO_ERROR)
    {
      oss << "SUCCESS";
      logMessage(oss.str(), logging::LOG_TYPE_INFO);
    }
    else
    {
      oss << "FAILED";
      logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// Write the current compressed data in fBufCompressed to the specified segment
// file offset (offset) and file (fileData).  For DML usage, "size" specifies
// how many bytes to backup, for error recovery.  cpimport.bin does its own
// backup and error recovery, so "size" is not applicable for bulk import usage.
//------------------------------------------------------------------------------ int ChunkManager::writeCompressedChunk(CompFileData* fileData, int64_t offset, int64_t size) { int rc = NO_ERROR; if (!fIsBulkLoad && !fIsHdfs) { // backup current chunk to chk file string chkFileName(fileData->fFileName + ".chk"); string aDMLLogFileName; unsigned char* buf = new unsigned char[size]; if (((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__)) == NO_ERROR) && ((rc = readFile(fileData->fFilePtr, fileData->fFileName, buf, size, __LINE__)) == NO_ERROR)) { IDBDataFile* chkFilePtr = IDBDataFile::open( IDBPolicy::getType(chkFileName.c_str(), IDBPolicy::WRITEENG), chkFileName.c_str(), "w+b", 0); if (chkFilePtr) { rc = writeFile(chkFilePtr, chkFileName, buf, size, __LINE__); delete chkFilePtr; } delete[] buf; if (rc != NO_ERROR) { IDBPolicy::remove(chkFileName.c_str()); return rc; } // log the chunk information for recovery rc = writeLog(fTransId, "chk", fileData->fFileName, aDMLLogFileName, size, offset); if (rc != NO_ERROR) { ostringstream oss; oss << "log " << fileData->fFileName << ".chk to DML logfile failed."; logMessage(oss.str(), logging::LOG_TYPE_INFO); return rc; } } // write out the compressed data + padding if ((rc == NO_ERROR) && ((rc = writeCompressedChunk_(fileData, offset)) == NO_ERROR)) { if ((fileData->fFilePtr)->flush() != 0) //@Bug3162. 
{ rc = ERR_FILE_WRITE; ostringstream oss; oss << "Failed to flush " << fileData->fFileName << " @line: " << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } } } else { // write out the compressed data + padding rc = writeCompressedChunk_(fileData, offset); } return rc; } //------------------------------------------------------------------------------ // Actually write the current compressed data in fBufCompressed to the specified // segment file offset (offset) and file (fileData) //------------------------------------------------------------------------------ inline int ChunkManager::writeCompressedChunk_(CompFileData* fileData, int64_t offset) { int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__); if (rc != NO_ERROR) return rc; return writeFile(fileData->fFilePtr, fileData->fFileName, fBufCompressed, fLenCompressed, __LINE__); } //------------------------------------------------------------------------------ // Open the specified segment file (fileData) using the given mode. // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. // // useTmpSuffix controls whether HDFS file is opened with USE_TMPFILE bit set. // Typically set for bulk load, single insert, and batch insert, when adding // rows to an "existing" file. // Typically always set for DML update and delete. 
// // @bug 5572 - HDFS usage: add *.tmp file backup flag to API //------------------------------------------------------------------------------ int ChunkManager::openFile(CompFileData* fileData, const char* mode, int colWidth, bool useTmpSuffix, int ln) const { int rc = NO_ERROR; unsigned opts = IDBDataFile::USE_VBUF; IDBPolicy::Contexts ctxt = IDBPolicy::WRITEENG; if (fIsHdfs) { if (useTmpSuffix) { if (!fIsBulkLoad) { // keep a DML log for confirm or cleanup the .tmp file string aDMLLogFileName; if ((rc = writeLog(fTransId, "tmp", fileData->fFileName, aDMLLogFileName, 0)) != NO_ERROR) { ostringstream oss; oss << "Failed to put " << fileData->fFileName << " into DML log."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } } opts |= IDBDataFile::USE_TMPFILE; } } fileData->fFilePtr = IDBDataFile::open(IDBPolicy::getType(fileData->fFileName.c_str(), ctxt), fileData->fFileName.c_str(), mode, opts, colWidth); if (fileData->fFilePtr == NULL) { ostringstream oss; oss << "Failed to open compressed data file " << fileData->fFileName << " @line: " << ln; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_OPEN_FILE; } return rc; } //------------------------------------------------------------------------------ // Set the file offset for the specified segment file (fileData) using the // given offset. // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. Likewise, filename // is used for logging any error message. 
//------------------------------------------------------------------------------ int ChunkManager::setFileOffset(IDBDataFile* pFile, const string& fileName, off64_t offset, int ln) const { int rc = NO_ERROR; if (pFile->seek(offset, SEEK_SET) != 0) rc = ERR_COMP_SET_OFFSET; if (rc != NO_ERROR) { ostringstream oss; oss << "Failed to set offset in compressed data file " << fileName << " @line: " << ln << " offset:" << offset; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } return rc; } //------------------------------------------------------------------------------ // Read the requested number of bytes (size) from the specified file pFile. // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. Likewise, filename // is used for logging any error message. //------------------------------------------------------------------------------ int ChunkManager::readFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const { size_t bytes = pFile->read(buf, size); if (bytes != size) { ostringstream oss; oss << "Failed to read from compressed data file " << fileName << " @line: " << ln << " read/expect:" << bytes << "/" << size; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_COMP_READ_FILE; } return NO_ERROR; } //------------------------------------------------------------------------------ // Write the requested number of bytes (size) to the specified file pFile. // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. Likewise, filename // is used for logging any error message. 
//------------------------------------------------------------------------------ int ChunkManager::writeFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const { size_t bytes = pFile->write(buf, size); if (bytes != size) { ostringstream oss; oss << "Failed to write to compressed data file " << fileName << " @line: " << ln << " written/expect:" << bytes << "/" << size; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_COMP_WRITE_FILE; } return NO_ERROR; } //------------------------------------------------------------------------------ // Close the specified segment file (fileData), and remove the // corresponding CompFileData reference from fFileMap and fFilePtrMap. //------------------------------------------------------------------------------ int ChunkManager::closeFile(CompFileData* fileData) { int rc = NO_ERROR; WE_COMP_DBG(cout << "closing file:" << fileData->fFileName << endl;) fFileMap.erase(fileData->fFileID); fFilePtrMap.erase(fileData->fFilePtr); if (fileData->fFilePtr) delete fileData->fFilePtr; delete fileData; fileData = NULL; return rc; } //------------------------------------------------------------------------------ // Write the chunk pointers headers for the specified file (fileData). // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. For DML usage, // backup for recovery is also performed. This step is skipped for cpimport.bin // as bulk import performs its own backup and recovery operations. 
//------------------------------------------------------------------------------ int ChunkManager::writeHeader(CompFileData* fileData, int ln) { int rc = NO_ERROR; int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData); int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT; if (!fIsHdfs && !fIsBulkLoad) { // write a backup header string hdrFileName(fileData->fFileName + ".hdr"); string aDMLLogFileName; IDBDataFile* hdrFilePtr = IDBDataFile::open(IDBPolicy::getType(hdrFileName.c_str(), IDBPolicy::WRITEENG), hdrFileName.c_str(), "w+b", 0, fileData->fColWidth); if (hdrFilePtr) { rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__); if (rc == NO_ERROR) rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__); delete hdrFilePtr; } if (rc == NO_ERROR) { // log the header information for recovery rc = writeLog(fTransId, "hdr", fileData->fFileName, aDMLLogFileName, headerSize); if (rc != NO_ERROR) { ostringstream oss; oss << "log " << fileData->fFileName << ".hdr to DML logfile failed."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } if ((rc == NO_ERROR) && (rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR) { (fileData->fFilePtr)->flush(); } } else { IDBPolicy::remove(hdrFileName.c_str()); } } else { if ((rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR) { (fileData->fFilePtr)->flush(); } } if (rc != NO_ERROR) { ostringstream oss; oss << "write header failed: " << fileData->fFileName << "call from line:" << ln; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } return rc; } //------------------------------------------------------------------------------ // Write the chunk pointers headers for the specified file (fileData). // ln is the source code line number of the code invoking this operation // (ex __LINE__); this is used for logging error messages. For DML usage, // backup for recovery is also performed. 
This step is skipped for cpimport.bin // as bulk import performs its own backup and recovery operations. //------------------------------------------------------------------------------ inline int ChunkManager::writeHeader_(CompFileData* fileData, int ptrSecSize) { int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, 0, __LINE__); if (rc == NO_ERROR) rc = writeFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__); if (rc == NO_ERROR) rc = writeFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__); return rc; } //------------------------------------------------------------------------------ // For the specified segment file (pFile), read in an abbreviated/compressed // chunk extent, uncompress, and expand to a full chunk for a full extent. //------------------------------------------------------------------------------ int ChunkManager::expandAbbrevColumnExtent(IDBDataFile* pFile, const uint8_t* emptyVal, int width) { map::iterator i = fFilePtrMap.find(pFile); if (i == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } int rc = NO_ERROR; // fetch the initial chunk if not already done. 
ChunkData* chunkData = (i->second)->findChunk(0); if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR)) return rc; // buf points to the end of existing data char* buf = chunkData->fBufUnCompressed + chunkData->fLenUnCompressed; int size = UNCOMPRESSED_CHUNK_SIZE - chunkData->fLenUnCompressed; fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, width); chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE; chunkData->fWriteToFile = true; //(writeChunkToFile(i->second, chunkData)); //(writeHeader(i->second, __LINE__)); return NO_ERROR; } //------------------------------------------------------------------------------ // For column segment file: // Increment the block count stored in the chunk header used to track how many // blocks are allocated to the corresponding segment file. //------------------------------------------------------------------------------ // same here as for dict. int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount, int64_t lbid) { map::iterator i = fFilePtrMap.find(pFile); if (i == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } CompFileData* pFileData = i->second; if (!pFileData) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } int rc = NO_ERROR; char* hdr = pFileData->fFileHeader.fControlData; compress::CompressInterface::setBlockCount(hdr, compress::CompressInterface::getBlockCount(hdr) + addBlockCount); compress::CompressInterface::setLBIDByIndex(hdr, lbid, 1); ChunkData* chunkData = (pFileData)->findChunk(0); if (chunkData != NULL) { if ((rc = writeChunkToFile(pFileData, chunkData)) == NO_ERROR) { rc = writeHeader(pFileData, __LINE__); if (rc == NO_ERROR) { //@Bug 4977 remove log files removeBackups(fTransId); } } else { ostringstream oss; oss << "write chunk to file failed when updateColumnExtent: " << pFileData->fFileName; 
logMessage(oss.str(), logging::LOG_TYPE_ERROR); } } pFile->flush(); return rc; } //------------------------------------------------------------------------------ // For dictionary store segment file: // Increment the block count stored in the chunk header used to track how many // blocks are allocated to the corresponding segment file. //------------------------------------------------------------------------------ int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount, BRM::LBID_t lbid) { map::iterator i = fFilePtrMap.find(pFile); if (i == fFilePtrMap.end()) { logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__); return ERR_COMP_FILE_NOT_FOUND; } int rc = NO_ERROR; // fetch the initial chunk if not already done. ChunkData* chunkData = (i->second)->findChunk(0); if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR)) return rc; // logged by fetchChunkFromFile char* hdr = i->second->fFileHeader.fControlData; char* uncompressedBuf = chunkData->fBufUnCompressed; int currentBlockCount = compress::CompressInterface::getBlockCount(hdr); // Bug 3203, write out the compressed initial extent. 
if (currentBlockCount == 0) { int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK; initializeDctnryChunk(uncompressedBuf, initSize); chunkData->fWriteToFile = true; if ((rc = writeChunkToFile(i->second, chunkData)) == NO_ERROR) { rc = writeHeader(i->second, __LINE__); if (rc == NO_ERROR) { //@Bug 4977 remove the log file removeBackups(fTransId); } } else { ostringstream oss; oss << "write chunk to file failed when updateDctnryExtent: " << i->second->fFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } } else if (currentBlockCount == NUM_BLOCKS_PER_INITIAL_EXTENT) { int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK; int incrSize = UNCOMPRESSED_CHUNK_SIZE - initSize; initializeDctnryChunk(uncompressedBuf + initSize, incrSize); uint64_t* ptrs = reinterpret_cast(i->second->fFileHeader.fPtrSection); ptrs[1] = 0; // the compressed chunk size is unknown } if (rc == NO_ERROR) compress::CompressInterface::setBlockCount( hdr, compress::CompressInterface::getBlockCount(hdr) + addBlockCount); if (currentBlockCount) { // Append to the end. uint64_t lbidCount = compress::CompressInterface::getLBIDCount(hdr); compress::CompressInterface::setLBIDByIndex(hdr, lbid, lbidCount); } return rc; } //------------------------------------------------------------------------------ // Close any open segment files, and free up memory. //------------------------------------------------------------------------------ void ChunkManager::cleanUp(const std::map& columOids) { WE_COMP_DBG(cout << "cleanUp with " << fActiveChunks.size() << " active chunk(s)." 
<< endl;) std::map::const_iterator it; map::iterator i = fFilePtrMap.begin(); while (i != fFilePtrMap.end()) { CompFileData* fileData = i->second; it = columOids.find(fileData->fFid); if (fIsInsert && it != columOids.end()) { list& chunks = fileData->fChunkList; for (list::iterator j = chunks.begin(); j != chunks.end(); ++j) delete *j; delete fileData->fFilePtr; fFileMap.erase(fileData->fFileID); fFilePtrMap.erase(i++); delete fileData; } else if (!fIsInsert || (columOids.size() == 0)) { list& chunks = fileData->fChunkList; for (list::iterator j = chunks.begin(); j != chunks.end(); ++j) delete *j; delete fileData->fFilePtr; fFileMap.erase(fileData->fFileID); fFilePtrMap.erase(i++); delete fileData; } else { i++; } } if (fDropFdCache) { cacheutils::dropPrimProcFdCache(); fDropFdCache = false; } } //------------------------------------------------------------------------------ // Read "n" blocks from pFile starting at fbo, into readBuf. //------------------------------------------------------------------------------ int ChunkManager::readBlocks(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo, size_t n) { WE_COMP_DBG(cout << "backup blocks fbo:" << fbo << " num:" << n << " file:" << pFile << endl;) // safety check if (pFile == NULL || n < 1) { return -1; } map::iterator fpIt = fFilePtrMap.find(pFile); if (fpIt == fFilePtrMap.end()) { return -1; } // the n blocks may cross more than one chunk // find the chunk ID and offset of the 1st fbo lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE); int idx = offset.quot; // current chunk id int rem = offset.rem; // offset in current chunk int num = UNCOMPRESSED_CHUNK_SIZE - rem; // # of bytes available in current chunk int left = n * BYTE_PER_BLOCK; // # of bytest to be read // # of bytes to be read from current chunk num = (left > num) ? 
num : left; do { ChunkData* chunkData = (fpIt->second)->findChunk(idx); WE_COMP_DBG(cout << "id:" << idx << " ofst:" << rem << " num:" << num << " left:" << left << endl;) // chunk is not already uncompressed if (chunkData == NULL) { if (fetchChunkFromFile(pFile, idx, chunkData) != NO_ERROR) { return -1; } } // copy the data at fbo to readBuf memcpy(readBuf, chunkData->fBufUnCompressed + rem, num); // prepare for the next read readBuf += num; rem = 0; left -= num; num = (left > UNCOMPRESSED_CHUNK_SIZE) ? UNCOMPRESSED_CHUNK_SIZE : left; idx++; } while (left > 0); return n; } //------------------------------------------------------------------------------ // Write the a block (writeBuf) into the fbo block of the specified file. // Updated chunk is not flushed to disk but left pending in the applicable // CompFileData object. //------------------------------------------------------------------------------ int ChunkManager::restoreBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo) { WE_COMP_DBG(cout << "restore blocks fbo:" << fbo << " file:" << pFile << endl;) // safety check if (pFile == NULL) return -1; map::iterator fpIt = fFilePtrMap.find(pFile); if (fpIt == fFilePtrMap.end()) return -1; // the n blocks may cross more than one chunk // find the chunk ID and offset of the 1st fbo lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE); ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot); WE_COMP_DBG(cout << "id:" << offset.quot << " ofst:" << offset.rem << endl;) // chunk is not already uncompressed if (chunkData == NULL) { if (fetchChunkFromFile(pFile, offset.quot, chunkData) != NO_ERROR) return -1; } // copy the data to chunk buffer memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK); chunkData->fWriteToFile = true; return BYTE_PER_BLOCK; } //------------------------------------------------------------------------------ // Get the allocated block count from the header, for the specified file 
(pFile) //------------------------------------------------------------------------------ int ChunkManager::getBlockCount(IDBDataFile* pFile) { map::iterator fpIt = fFilePtrMap.find(pFile); idbassert(fpIt != fFilePtrMap.end()); return compress::CompressInterface::getBlockCount(fpIt->second->fFileHeader.fControlData); } //------------------------------------------------------------------------------ // Set the FileOp pointer and dictionary flag //------------------------------------------------------------------------------ void ChunkManager::fileOp(FileOp* fileOp) { fFileOp = fileOp; if (fileOp) { setTransId(fileOp->getTransId()); } } //------------------------------------------------------------------------------ // Calculate and return the size of the chunk pointer header for a column of the // specified width. //------------------------------------------------------------------------------ int ChunkManager::calculateHeaderSize(int width) { int headerUnits = 1; // dictionary columns may need variable length header if (width > 8) { int extentsPerFile = Config::getExtentsPerSegmentFile(); int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows(); int rowsPerFile = rowsPerExtent * extentsPerFile; int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12 // BLOB is 1 string per block if (stringsPerBlock == 0) stringsPerBlock = 1; int blocksNeeded = rowsPerFile / stringsPerBlock; int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK; lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk); int chunksNeeded = chunks.quot + (chunks.rem ? 1 : 0); // round up int ptrsNeeded = chunksNeeded + 1; // 1 more ptr for 0 ptr marking end int ptrsIn4K = (4 * 1024) / sizeof(uint64_t); lldiv_t hdrs = lldiv(ptrsNeeded, ptrsIn4K); headerUnits = hdrs.quot + (hdrs.rem ? 1 : 0); // round up // Always include odd number of 4K ptr headers, so that when we add the // single 4K control header, the cumulative header space will be an even // multiple of an 8K boundary. 
if ((headerUnits % 2) == 0) headerUnits++; } headerUnits++; // add the control data block return (headerUnits * COMPRESSED_FILE_HEADER_UNIT); } //------------------------------------------------------------------------------ // Reallocate the chunks in a file to account for an expanding chunk that will // not fit in the available embedded free space. //------------------------------------------------------------------------------ int ChunkManager::reallocateChunks(CompFileData* fileData) { WE_COMP_DBG(cout << "reallocate chunks in " << fileData->fFileName << " (" << fileData->fFilePtr << ")" << endl;) // return value int rc = NO_ERROR; // original file info string origFileName = fileData->fFileName; IDBDataFile* origFilePtr = fileData->fFilePtr; origFilePtr->flush(); // back out the current pointers int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData); int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT; compress::CompChunkPtrList origPtrs; if (compress::CompressInterface::getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0) { ostringstream oss; oss << "Chunk shifting failed, file:" << origFileName << " -- invalid header."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_COMP_PARSE_HDRS; } // get the chunks already in memory list& chunkList = fileData->fChunkList; chunkList.sort(chunkDataPtrLessCompare); list::iterator j = chunkList.begin(); int numOfChunks = origPtrs.size(); // number of chunks that contain user data vector chunksTouched; // chunk data is being modified, and in memory for (int i = 0; i < numOfChunks; i++) chunksTouched.push_back(NULL); // mark touched chunks while (j != chunkList.end()) { chunksTouched[(*j)->fChunkId] = *j; j++; } // new file name and pointer string rlcFileName(fileData->fFileName + ".rlc"); IDBDataFile* rlcFilePtr = IDBDataFile::open(IDBPolicy::getType(rlcFileName.c_str(), IDBPolicy::WRITEENG), rlcFileName.c_str(), "w+b", 0, fileData->fColWidth); if 
(!rlcFilePtr) { ostringstream oss; oss << "Chunk shifting failed, file:" << origFileName << " -- cannot open rlc file."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_FILE_OPEN; } // log the recover information here string aDMLLogFileName; rc = writeLog(fTransId, "rlc", fileData->fFileName, aDMLLogFileName); if (rc != NO_ERROR) { ostringstream oss; oss << "log " << fileData->fFileName << ".rlc to DML logfile failed."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); delete rlcFilePtr; rlcFilePtr = NULL; IDBPolicy::remove(rlcFileName.c_str()); return rc; } // !!! May conside to use mmap to speed up the copy !!! // !!! copy the whole file and update the shifting part !!! // store updated chunk pointers uint64_t* ptrs = reinterpret_cast(fileData->fFileHeader.fPtrSection); ptrs[0] = origPtrs[0].first; // the first chunk offset has no change. // bug3913, file size 0 after reallocate. // write the header, to be updated later, make sure there is someing in the file if ((rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__)) == NO_ERROR) rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__); int k = 0; for (; k < numOfChunks && rc == NO_ERROR; k++) { uint64_t chunkSize = 0; // size of current chunk unsigned char* buf = NULL; // output buffer // Find the current chunk size, and allocate the data -- buf point to the data. if (chunksTouched[k] == NULL) { // Chunks not touched will be copied to new file without being uncompressed first. 
// cout << "reallocateChunks: chunk has not been updated" << endl; chunkSize = origPtrs[k].second; // read disk data into compressed data buffer buf = (unsigned char*)fBufCompressed; if ((rc = setFileOffset(origFilePtr, origFileName, origPtrs[k].first, __LINE__)) != NO_ERROR) { ostringstream oss; oss << "set file offset failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << origFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); continue; } if ((rc = readFile(origFilePtr, origFileName, buf, chunkSize, __LINE__)) != NO_ERROR) { ostringstream oss; oss << "readfile failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << origFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); continue; } } else // chunksTouched[k] != NULL { // chunk has been updated, and in memory. // cout << "reallocateChunks: chunk has been updated" << endl; ChunkData* chunkData = chunksTouched[k]; fLenCompressed = fMaxCompressedBufSize; auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType); if (!fCompressor) { return ERR_COMP_WRONG_COMP_TYPE; } if ((rc = fCompressor->compressBlock((char*)chunkData->fBufUnCompressed, chunkData->fLenUnCompressed, (unsigned char*)fBufCompressed, fLenCompressed)) != 0) { ostringstream oss; oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << rlcFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_COMPRESS; continue; } WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to " << fLenCompressed;) // shifting chunk, add padding space if ((rc = fCompressor->padCompressedChunks((unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize)) != 0) { WE_COMP_DBG(cout << ", but padding failed." 
<< endl;) ostringstream oss; oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << rlcFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_PAD_DATA; continue; } WE_COMP_DBG(cout << ", and padded to " << fLenCompressed;) buf = (unsigned char*)fBufCompressed; chunkSize = fLenCompressed; } // write is in sequence, no need to call setFileOffset // cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << // endl; rc = writeFile(rlcFilePtr, rlcFileName, buf, chunkSize, __LINE__); if (rc != NO_ERROR) { // cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr << // " failed" << endl; ostringstream oss; oss << "write file failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << rlcFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); continue; } // Update the current chunk size. ptrs[k + 1] = ptrs[k] + chunkSize; } // up to now, everything OK: rc == NO_ERROR // remove all written chunks from active chunk list. j = chunkList.begin(); while (j != chunkList.end()) { ChunkData* chunkData = *j; fActiveChunks.remove(make_pair(fileData->fFileID, chunkData)); fileData->fChunkList.remove(chunkData); delete chunkData; j = chunkList.begin(); } // finally update the header if (rc == NO_ERROR) rc = setFileOffset(rlcFilePtr, rlcFileName, 0, __LINE__); if (rc == NO_ERROR) rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__); if (rc == NO_ERROR) rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__); if (rc != NO_ERROR) { struct timeval tv; gettimeofday(&tv, 0); struct tm ltm; localtime_r(reinterpret_cast(&tv.tv_sec), <m); char tmText[24]; // this snprintf call causes a compiler warning b/c buffer size is less // then maximum string size. 
#if defined(__GNUC__) && __GNUC__ >= 7 #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wformat-truncation=" snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1, ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec); #pragma GCC diagnostic pop #else snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1, ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec); #endif string dbgFileName(rlcFileName + tmText); ostringstream oss; oss << "Chunk shifting failed, file:" << origFileName; if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0) oss << ", rlc file is:" << dbgFileName; // write out the header for debugging in case the header in rlc file is bad or not updated. string rlcPtrFileName(dbgFileName + ".ptr"); IDBDataFile* rlcPtrFilePtr = IDBDataFile::open(IDBPolicy::getType(rlcPtrFileName.c_str(), IDBPolicy::WRITEENG), rlcPtrFileName.c_str(), "w+b", 0, fileData->fColWidth); if (rlcPtrFilePtr && (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) && (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__) == NO_ERROR)) { oss << ", rlc file header in memory: " << rlcPtrFileName; } else { oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName; } logMessage(oss.str(), logging::LOG_TYPE_ERROR); closeFile(fileData); delete rlcFilePtr; rlcFilePtr = NULL; if (rlcPtrFilePtr != NULL) { delete rlcPtrFilePtr; rlcPtrFilePtr = NULL; } return rc; } // update the file pointer map w/ new file pointer // cout << "realloc1: remove ptr = " << fileData->fFilePtr << endl; fFilePtrMap.erase(fileData->fFilePtr); delete fileData->fFilePtr; fileData->fFilePtr = NULL; delete rlcFilePtr; rlcFilePtr = NULL; // put reallocated file size here for logging purpose. 
uint64_t fileSize = 0; if (rc == NO_ERROR) { // @bug3913, keep the original file until the new file is properly renamed. // 1. check the new file size is NOT 0, matching ptr[k]. // 2. mv the current to be backup. // 3. rename the rlc file. // 4. check the file size again. // 5. verify each chunk. // 5. rm the bak file or mv bak file back. // check the new file size using two methods mostly for curiosity on 0 size file. // They can be removed because all chunks are to be verified after rename. if (IDBPolicy::size(rlcFileName.c_str()) != (int64_t)ptrs[k]) { ostringstream oss; oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize << ", filename:" << rlcFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_RENAME_FILE; } if (rc == NO_ERROR) { if (fIsHdfs) rc = swapTmpFile(rlcFileName, fileData->fFileName + ".tmp"); else rc = swapTmpFile(rlcFileName, fileData->fFileName); } if ((rc == NO_ERROR) && (rc = openFile(fileData, "r+b", fileData->fColWidth, true, __LINE__)) == NO_ERROR) // @bug 5572 HDFS tmp file { fileSize = fileData->fFilePtr->size(); if (fileSize == ptrs[k]) { rc = verifyChunksAfterRealloc(fileData); } else { ostringstream oss; oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize << ", filename:" << fileData->fFileName; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_RENAME_FILE; } if (rc == NO_ERROR) { fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData)); // cout << "realloc2: insert ptr = " << fileData->fFilePtr << endl; // notify the PrimProc of unlinking original data file fDropFdCache = true; } } if (!fIsHdfs) { string bakFileName(fileData->fFileName + ".orig"); if (rc == NO_ERROR) { // unlink the original file (remove is portable) if (fFs.remove(bakFileName.c_str()) != 0) { ostringstream oss; oss << "remove backup file " << bakFileName << " failed: " << strerror(errno); // not much we can do, log an info message for manual cleanup logMessage(oss.str(), 
logging::LOG_TYPE_INFO); } } else { // keep the bad file for debugging purpose if (fFs.rename(fileData->fFileName.c_str(), rlcFileName.c_str()) == 0) { ostringstream oss; oss << "data file after chunk shifting failed verification."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); } // roll back the bak file if (fFs.rename(bakFileName.c_str(), fileData->fFileName.c_str()) != 0) { ostringstream oss; oss << "rename " << bakFileName << " to " << fileData->fFileName << " failed: " << strerror(errno); // must manually move it back logMessage(oss.str(), logging::LOG_TYPE_ERROR); } } } } if (!fIsHdfs) { if (rc == NO_ERROR) { // remove the log file fFs.remove(aDMLLogFileName.c_str()); } else { struct timeval tv; gettimeofday(&tv, 0); struct tm ltm; localtime_r(reinterpret_cast(&tv.tv_sec), <m); char tmText[24]; // this snprintf call causes a compiler warning b/c buffer size is less // then maximum string size. #if defined(__GNUC__) && __GNUC__ >= 7 #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wformat-truncation=" snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1, ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec); #pragma GCC diagnostic pop #else snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1, ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec); #endif string dbgFileName(rlcFileName + tmText); ostringstream oss; oss << "Chunk shifting failed, file:" << origFileName; if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0) oss << ", rlc file is:" << dbgFileName; // write out the header for debugging in case the header in rlc file is bad. 
string rlcPtrFileName(dbgFileName + ".hdr"); IDBDataFile* rlcPtrFilePtr = IDBDataFile::open( IDBPolicy::getType(rlcPtrFileName.c_str(), IDBPolicy::WRITEENG), rlcPtrFileName.c_str(), "w+b", 0); if (rlcPtrFilePtr && (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) && (writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__) == NO_ERROR)) { oss << ", rlc file header in memory: " << rlcPtrFileName; } else { oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName; } logMessage(oss.str(), logging::LOG_TYPE_ERROR); closeFile(fileData); if (rlcFilePtr != NULL) { delete rlcFilePtr; rlcFilePtr = NULL; } if (rlcPtrFilePtr != NULL) { delete rlcPtrFilePtr; rlcPtrFilePtr = NULL; } } } return rc; } //------------------------------------------------------------------------------ // Verify chunks can be uncompressed after a chunk shift. //------------------------------------------------------------------------------ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData) { int rc = NO_ERROR; // read in the header if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT, __LINE__)) != NO_ERROR) { ostringstream oss; oss << "Failed to read control header from new " << fileData->fFileName << ", roll back"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } // make sure the header is valid if ((rc = compress::CompressInterface::verifyHdr(fileData->fFileHeader.fControlData)) != 0) { ostringstream oss; oss << "Invalid header in new " << fileData->fFileName << ", roll back"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData); int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT; // read in the pointer section in header if ((rc = readFile(fileData->fFilePtr, 
fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__)) != NO_ERROR) { ostringstream oss; oss << "Failed to read pointer header from new " << fileData->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } // get pointer list compress::CompChunkPtrList ptrs; if (compress::CompressInterface::getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0) { ostringstream oss; oss << "Failed to parse pointer list from new " << fileData->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_COMP_PARSE_HDRS; } // now verify each chunk ChunkData chunkData; int numOfChunks = ptrs.size(); // number of chunks in the file auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType); if (!fCompressor) { return ERR_COMP_WRONG_COMP_TYPE; } for (int i = 0; i < numOfChunks && rc == NO_ERROR; i++) { unsigned int chunkSize = ptrs[i].second; if ((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, ptrs[i].first, __LINE__))) { ostringstream oss; oss << "Failed to setFileOffset new " << fileData->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); continue; } if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fBufCompressed, chunkSize, __LINE__))) { ostringstream oss; oss << "Failed to read chunk from new " << fileData->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); continue; } // uncompress the read in buffer size_t dataLen = sizeof(chunkData.fBufUnCompressed); if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize, (unsigned char*)chunkData.fBufUnCompressed, dataLen) != 0) { ostringstream oss; oss << "Failed to uncompress chunk new " << fileData->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_UNCOMPRESS; continue; } } return rc; } //------------------------------------------------------------------------------ // Log an 
error message for the specified error code, and msg level. //------------------------------------------------------------------------------ void ChunkManager::logMessage(int code, int level, int lineNum, int fromLine) const { ostringstream oss; oss << ec.errorString(code) << " @line:" << lineNum; if (fromLine != -1) oss << " called from line:" << fromLine; logMessage(oss.str(), level); } //------------------------------------------------------------------------------ // Log the requested error message using the specified msg level. //------------------------------------------------------------------------------ void ChunkManager::logMessage(const string& msg, int level) const { logging::Message::Args args; args.add(msg); fSysLogger->logMessage((logging::LOG_TYPE)level, logging::M0080, args, // FIXME: store session id in class to pass on to LogginID... logging::LoggingID(SUBSYSTEM_ID_WE, 0, fTransId)); } //------------------------------------------------------------------------------ // Replace the cdf file with the updated tmp file. //------------------------------------------------------------------------------ int ChunkManager::swapTmpFile(const string& src, const string& dest) { // return value int rc = NO_ERROR; // if no change to the cdf, the tmp may not exist, no need to swap. if (!fFs.exists(src.c_str())) return rc; ssize_t srcFileSize = IDBPolicy::size(src.c_str()); if (srcFileSize <= 0) { ostringstream oss; oss << "swapTmpFile aborted. 
Source file size = " << srcFileSize; logMessage(oss.str(), logging::LOG_TYPE_CRITICAL); rc = ERR_COMP_RENAME_FILE; return rc; } errno = 0; // save the original file string orig(dest + ".orig"); fFs.remove(orig.c_str()); // remove left overs if (fFs.rename(dest.c_str(), orig.c_str()) != 0) { ostringstream oss; oss << "rename " << dest << " to " << orig << " failed: " << strerror(errno); logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_RENAME_FILE; } // rename the new file if (rc == NO_ERROR && fFs.rename(src.c_str(), dest.c_str()) != 0) { ostringstream oss; oss << "rename " << src << " to " << dest << " failed: " << strerror(errno); logMessage(oss.str(), logging::LOG_TYPE_ERROR); rc = ERR_COMP_RENAME_FILE; } if (rc == NO_ERROR && fFs.remove(orig.c_str()) != 0) rc = ERR_COMP_REMOVE_FILE; return rc; } //------------------------------------------------------------------------------ // Construct a DML log file name based on transaction ID, etc. //------------------------------------------------------------------------------ int ChunkManager::getDMLLogFileName(string& aDMLLogFileName, const TxnID& txnId) const { config::Config* config = config::Config::makeConfig(); string prefix = config->getConfig("SystemConfig", "DBRMRoot"); if (prefix.length() == 0) { ostringstream oss; oss << "trans " << txnId << ":Need a valid DBRMRoot entry in Calpont configuation file"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_DML_LOG_NAME; } uint64_t pos = prefix.find_last_of("/"); if (pos != string::npos) { aDMLLogFileName = prefix.substr(0, pos + 1); // Get the file path } else { ostringstream oss; oss << "trans " << txnId << ":Cannot find the dbrm directory (" << prefix.c_str() << ") for the DML log file"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_DML_LOG_NAME; } ostringstream oss; oss << txnId << "_" << fLocalModuleId; aDMLLogFileName += "DMLLog_" + oss.str(); return NO_ERROR; } 
//------------------------------------------------------------------------------ // clear the DML log file //------------------------------------------------------------------------------ int ChunkManager::startTransaction(const TxnID& txnId) const { // this step is for HDFS update/delete only. if (!fIsHdfs || fIsBulkLoad) return NO_ERROR; // Construct the DML log file name string aDMLLogFileName; if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR) return ERR_DML_LOG_NAME; // truncate the existing file boost::scoped_ptr aDMLLogFile(IDBDataFile::open( IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "wb", 0)); if (!aDMLLogFile) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened."; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_OPEN_DML_LOG; } return NO_ERROR; } //------------------------------------------------------------------------------ // Backup cdf file and replace the with the updated tmp file. //------------------------------------------------------------------------------ int ChunkManager::confirmTransaction(const TxnID& txnId) const { // return value int rc = NO_ERROR; // this step is for HDFS update/delete only. 
if (!fIsHdfs || fIsBulkLoad) return rc; string aDMLLogFileName; if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR) return ERR_DML_LOG_NAME; // Open log file boost::scoped_ptr aDMLLogFile; aDMLLogFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "r", 0)); if (!aDMLLogFile) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_OPEN_DML_LOG; } ssize_t logSize = fFs.size(aDMLLogFileName.c_str()); boost::scoped_array buf(new char[logSize]); if (aDMLLogFile->read(buf.get(), logSize) != logSize) return ERR_FILE_READ; std::istringstream strstream(string(buf.get(), logSize)); std::string backUpFileType; std::string filename; int64_t size; int64_t offset; ConfirmHdfsDbFile confirmHdfs; while (strstream >> backUpFileType >> filename >> size >> offset) { std::string confirmErrMsg; rc = confirmHdfs.confirmDbFileChange(backUpFileType, filename, confirmErrMsg); if (rc != NO_ERROR) { logMessage(confirmErrMsg, logging::LOG_TYPE_ERROR); break; } } return rc; } //------------------------------------------------------------------------------ // Finalize the chages // if success, remove the orig // otherwise, move the orig back to cdf //------------------------------------------------------------------------------ int ChunkManager::endTransaction(const TxnID& txnId, bool success) const { // return value int rc = NO_ERROR; // this step is for HDFS update/delete only. 
if (!fIsHdfs || fIsBulkLoad) return rc; string aDMLLogFileName; if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR) return ERR_DML_LOG_NAME; // Open log file boost::scoped_ptr aDMLLogFile; aDMLLogFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "r", 0)); if (!aDMLLogFile) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened"; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_OPEN_DML_LOG; } ssize_t logSize = fFs.size(aDMLLogFileName.c_str()); ssize_t logRead = 0; boost::scoped_array buf(new char[logSize]); if ((logRead = aDMLLogFile->read(buf.get(), logSize)) != logSize) { ostringstream oss; oss << "trans " << txnId << ":File " << aDMLLogFileName << " filed to read: " << logRead << "/" << logSize; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_FILE_READ; } std::istringstream strstream(string(buf.get(), logSize)); std::string backUpFileType; std::string filename; int64_t size; int64_t offset; ConfirmHdfsDbFile confirmHdfs; while (strstream >> backUpFileType >> filename >> size >> offset) { std::string finalizeErrMsg; rc = confirmHdfs.endDbFileChange(backUpFileType, filename, success, finalizeErrMsg); if (rc != NO_ERROR) { logMessage(finalizeErrMsg, logging::LOG_TYPE_ERROR); break; } } // final clean up or recover if (rc == NO_ERROR) rc = fFs.remove(aDMLLogFileName.c_str()); return rc; } int ChunkManager::checkFixLastDictChunk(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment) { int rc = 0; // Find the file info FileID fileID(fid, root, partition, segment); map::const_iterator mit = fFileMap.find(fileID); WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:" << segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ") << "found." 
<< endl;) // Get CompFileData pointer for existing Dictionary store file mit->second is CompFileData if (mit != fFileMap.end()) { int headerSize = compress::CompressInterface::getHdrSize(mit->second->fFileHeader.fControlData); int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT; // get pointer list compress::CompChunkPtrList ptrs; if (compress::CompressInterface::getPtrList(mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0) { ostringstream oss; oss << "Failed to parse pointer list from new " << mit->second->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return ERR_COMP_PARSE_HDRS; } // now verify last chunk ChunkData* chunkData; int numOfChunks = ptrs.size(); // number of chunks in the file unsigned int chunkSize = ptrs[numOfChunks - 1].second; if ((rc = setFileOffset(mit->second->fFilePtr, mit->second->fFileName, ptrs[numOfChunks - 1].first, __LINE__))) { ostringstream oss; oss << "Failed to setFileOffset new " << mit->second->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } if ((rc = readFile(mit->second->fFilePtr, mit->second->fFileName, fBufCompressed, chunkSize, __LINE__))) { ostringstream oss; oss << "Failed to read chunk from new " << mit->second->fFileName << "@" << __LINE__; logMessage(oss.str(), logging::LOG_TYPE_ERROR); return rc; } // uncompress the read in buffer chunkData = new ChunkData(numOfChunks - 1); size_t dataLen = sizeof(chunkData->fBufUnCompressed); auto fCompressor = compress::getCompressorByType(fCompressorPool, mit->second->fCompressionType); if (!fCompressor) { return ERR_COMP_WRONG_COMP_TYPE; } if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize, (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0) { mit->second->fChunkList.push_back(chunkData); fActiveChunks.push_back(make_pair(mit->second->fFileID, chunkData)); // replace this chunk with empty chunk uint64_t blocks = 512; if ((numOfChunks - 1) == 0) { char* hdr = 
mit->second->fFileHeader.fControlData; if (compress::CompressInterface::getBlockCount(hdr) < 512) blocks = 256; } dataLen = 8192 * blocks; // load the uncompressed buffer with empty values. char* buf = chunkData->fBufUnCompressed; chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE; initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE); chunkData->fLenUnCompressed = dataLen; chunkData->fWriteToFile = true; } } return rc; } } // namespace WriteEngine