/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: we_fileop.cpp 4737 2013-08-14 20:45:46Z bwilkinson $ #include "mcsconfig.h" #include #include #include #include #include #include #include #include #include #if defined(__FreeBSD__) #include #include #else #include #endif #include #include #include using namespace std; #include "we_fileop.h" #include "we_convertor.h" #include "we_log.h" #include "we_config.h" #include "we_stats.h" #include "we_simplesyslog.h" #include "idbcompress.h" using namespace compress; #include "messagelog.h" using namespace logging; #include "IDBDataFile.h" #include "IDBFileSystem.h" #include "IDBPolicy.h" using namespace idbdatafile; namespace WriteEngine { /*static*/ boost::mutex FileOp::m_createDbRootMutexes; /*static*/ boost::mutex FileOp::m_mkdirMutex; /*static*/ std::map FileOp::m_DbRootAddExtentMutexes; // in 1 call to fwrite(), during initialization // StopWatch timer; /** * Constructor */ FileOp::FileOp(bool doAlloc) : m_compressionType(0), m_transId((TxnID)INVALID_NUM), m_buffer(0) { if (doAlloc) { m_buffer = new char[DEFAULT_BUFSIZ]; memset(m_buffer, '\0', DEFAULT_BUFSIZ); } } /** * Default Destructor */ FileOp::~FileOp() { if (m_buffer) { delete[] m_buffer; } m_buffer = 0; } /*********************************************************** * DESCRIPTION: * Close a file * PARAMETERS: * pFile - file handle * RETURN: * 
none ***********************************************************/ void FileOp::closeFile(IDBDataFile* pFile) const { delete pFile; } /*********************************************************** * DESCRIPTION: * Create directory * Function uses mutex lock to prevent thread contention trying to create * 2 subdirectories in the same directory at the same time. * PARAMETERS: * dirName - directory name * mode - create mode * RETURN: * NO_ERROR if success, otherwise if fail ***********************************************************/ int FileOp::createDir(const char* dirName, mode_t mode) const { boost::mutex::scoped_lock lk(m_mkdirMutex); int rc = IDBPolicy::mkdir(dirName); if (rc != 0) { int errRc = errno; if (errRc == EEXIST) return NO_ERROR; // ignore "File exists" error if (getLogger()) { std::ostringstream oss; std::string errnoMsg; Convertor::mapErrnoToString(errRc, errnoMsg); oss << "Error creating directory " << dirName << "; err-" << errRc << "; " << errnoMsg; getLogger()->logMsg(oss.str(), ERR_DIR_CREATE, MSGLVL_ERROR); } return ERR_DIR_CREATE; } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Create the "first" segment file for a column with a fixed file size * Note: the file is created in binary mode * PARAMETERS: * fileName - file name with complete path * numOfBlock - the total number of blocks to be initialized (written out) * compressionType - Compression Type * emptyVal - empty value used to initialize column values * width - width of column in bytes * dbRoot - DBRoot of column file we are creating * RETURN: * NO_ERROR if success * ERR_FILE_EXIST if file exists * ERR_FILE_CREATE if can not create the file ***********************************************************/ int FileOp::createFile(const char* fileName, int numOfBlock, const uint8_t* emptyVal, int width, execplan::CalpontSystemCatalog::ColDataType colDataType, uint16_t dbRoot, BRM::LBID_t startLbid) { IDBDataFile* pFile = 
IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "w+b", IDBDataFile::USE_VBUF, width); int rc = 0; if (pFile != NULL) { // Initialize the contents of the extent. if (m_compressionType) { rc = initAbbrevCompColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, startLbid, colDataType); } else { rc = initColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, colDataType, true, // new file false, // don't expand; add new extent true); // add abbreviated extent } closeFile(pFile); } else return ERR_FILE_CREATE; return rc; } /*********************************************************** * DESCRIPTION: * Create the "first" segment file for a column with a fixed file size * Note: the file is created in binary mode * PARAMETERS: * fid - OID of the column file to be created * allocSize (out) - number of blocks allocated to the first extent * dbRoot - DBRoot where file is to be located * partition- Starting partition number for segment file path * compressionType - Compression type * colDataType - the column data type * emptyVal - designated "empty" value for this OID * width - width of column in bytes * RETURN: * NO_ERROR if success * ERR_FILE_EXIST if file exists * ERR_FILE_CREATE if can not create the file ***********************************************************/ int FileOp::createFile(FID fid, int& allocSize, uint16_t dbRoot, uint32_t partition, execplan::CalpontSystemCatalog::ColDataType colDataType, const uint8_t* emptyVal, int width) { // std::cout << "Creating file oid: " << fid << // "; compress: " << m_compressionType << std::endl; char fileName[FILE_NAME_SIZE]; int rc; uint16_t segment = 0; // should always be 0 when starting a new column RETURN_ON_ERROR((rc = oid2FileName(fid, fileName, true, dbRoot, partition, segment))); //@Bug 3196 if (exists(fileName)) return ERR_FILE_EXIST; // allocatColExtent() treats dbRoot and partition as in/out // arguments, so we need to pass in a non-const variable. 
uint16_t dbRootx = dbRoot; uint32_t partitionx = partition; // Since we are creating a new column OID, we know partition // and segment are 0, so we ignore their output values. // timer.start( "allocateColExtent" ); BRM::LBID_t startLbid; uint32_t startBlock; RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile((OID)fid, (uint32_t)width, dbRootx, partitionx, segment, colDataType, startLbid, allocSize, startBlock)); // We allocate a full extent from BRM, but only write an abbreviated 256K // rows to disk for 1st extent, to conserve disk usage for small tables. // One exception here is if we have rolled off partition 0, and we are // adding a column to an existing table, then we are adding a column // whose first partition is not 0. In this case, we know we are not // dealing with a small table, so we init a full extent for 1st extent. int totalSize = 0; if (partition == 0) totalSize = (INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * width; else totalSize = allocSize; // full extent if starting partition > 0 // Note we can't pass full file name to isDiskSpaceAvail() because the // file does not exist yet, but passing DBRoot directory should suffice. if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), totalSize)) { return ERR_FILE_DISK_SPACE; } // timer.stop( "allocateColExtent" ); return createFile(fileName, totalSize, emptyVal, width, colDataType, dbRoot, startLbid); } /*********************************************************** * DESCRIPTION: * Delete a file * PARAMETERS: * fileName - file name with complete path * RETURN: * NO_ERROR if success * ERR_FILE_NOT_EXIST if file does not exist * ERR_FILE_DELETE if can not delete a file ***********************************************************/ int FileOp::deleteFile(const char* fileName) const { if (!exists(fileName)) return ERR_FILE_NOT_EXIST; return (IDBPolicy::remove(fileName) == -1) ? 
ERR_FILE_DELETE : NO_ERROR; } /*********************************************************** * DESCRIPTION: * Deletes all the segment or dictionary store files associated with the * specified fid. * PARAMETERS: * fid - OID of the column being deleted. * RETURN: * NO_ERROR if success * ERR_DM_CONVERT_OID if error occurs converting OID to file name ***********************************************************/ int FileOp::deleteFile(FID fid) const { char tempFileName[FILE_NAME_SIZE]; char oidDirName[FILE_NAME_SIZE]; char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE]; RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0))); sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]); // std::cout << "Deleting files for OID " << fid << // "; dirpath: " << oidDirName << std::endl; // need check return code. RETURN_ON_ERROR(BRMWrapper::getInstance()->deleteOid(fid)); std::vector dbRootPathList; Config::getDBRootPathList(dbRootPathList); int rc; for (unsigned i = 0; i < dbRootPathList.size(); i++) { char rootOidDirName[FILE_NAME_SIZE]; rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName); if (rc == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0) { ostringstream oss; oss << "Unable to remove " << rootOidDirName; throw std::runtime_error(oss.str()); } } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Deletes all the segment or dictionary store files associated with the * specified fid. * PARAMETERS: * fid - OIDs of the column/dictionary being deleted. 
* RETURN: * NO_ERROR if success * ERR_DM_CONVERT_OID if error occurs converting OID to file name ***********************************************************/ int FileOp::deleteFiles(const std::vector& fids) const { char tempFileName[FILE_NAME_SIZE]; char oidDirName[FILE_NAME_SIZE]; char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE]; std::vector dbRootPathList; Config::getDBRootPathList(dbRootPathList); int rc; for (unsigned n = 0; n < fids.size(); n++) { RETURN_ON_ERROR((Convertor::oid2FileName(fids[n], tempFileName, dbDir, 0, 0))); sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]); // std::cout << "Deleting files for OID " << fid << // "; dirpath: " << oidDirName << std::endl; for (unsigned i = 0; i < dbRootPathList.size(); i++) { char rootOidDirName[FILE_NAME_SIZE]; rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName); if (rc == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0) { ostringstream oss; oss << "Unable to remove " << rootOidDirName; throw std::runtime_error(oss.str()); } } } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Deletes all the segment or dictionary store files associated with the * specified fid and partition. * PARAMETERS: * fids - OIDs of the column/dictionary being deleted. 
* partition - the partition number * RETURN: * NO_ERROR if success * ERR_DM_CONVERT_OID if error occurs converting OID to file name ***********************************************************/ int FileOp::deletePartitions(const std::vector& fids, const std::vector& partitions) const { char tempFileName[FILE_NAME_SIZE]; char oidDirName[FILE_NAME_SIZE]; char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE]; char rootOidDirName[FILE_NAME_SIZE]; char partitionDirName[FILE_NAME_SIZE]; int rcd, rcp; for (uint32_t i = 0; i < partitions.size(); i++) { RETURN_ON_ERROR((Convertor::oid2FileName(partitions[i].oid, tempFileName, dbDir, partitions[i].lp.pp, partitions[i].lp.seg))); sprintf(oidDirName, "%s/%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3], dbDir[4]); // config expects dbroot starting from 0 std::string rt(Config::getDBRootByNum(partitions[i].lp.dbroot)); rcd = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), tempFileName); rcp = snprintf(partitionDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), oidDirName); if (rcd == FILE_NAME_SIZE || rcp == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0) { ostringstream oss; oss << "Unable to remove " << rootOidDirName; throw std::runtime_error(oss.str()); } list dircontents; if (IDBPolicy::listDirectory(partitionDirName, dircontents) == 0) { // the directory exists, now check if empty if (dircontents.size() == 0) { // empty directory if (IDBPolicy::remove(partitionDirName) != 0) { ostringstream oss; oss << "Unable to remove " << rootOidDirName; throw std::runtime_error(oss.str()); } } } } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Deletes the specified db segment file. * PARAMETERS: * fid - column OID of file to be deleted. 
* dbRoot - DBRoot associated with segment file * partition - partition number of associated segment file * segment - segment number of associated segment file * RETURN: * NO_ERROR if success ***********************************************************/ int FileOp::deleteFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const { char fileName[FILE_NAME_SIZE]; RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment)); return (deleteFile(fileName)); } /*********************************************************** * DESCRIPTION: * Check whether a file exists or not * PARAMETERS: * fileName - file name with complete path * RETURN: * true if exists, false otherwise ***********************************************************/ bool FileOp::exists(const char* fileName) const { return IDBPolicy::exists(fileName); } /*********************************************************** * DESCRIPTION: * Check whether a file exists or not * PARAMETERS: * fid - OID of file to be checked * dbRoot - DBRoot associated with segment file * partition - partition number of associated segment file * segment - segment number of associated segment file * RETURN: * true if exists, false otherwise ***********************************************************/ bool FileOp::exists(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const { char fileName[FILE_NAME_SIZE]; if (getFileName(fid, fileName, dbRoot, partition, segment) != NO_ERROR) return false; return exists(fileName); } /*********************************************************** * DESCRIPTION: * Check whether an OID directory exists or not * PARAMETERS: * fid - column or dictionary store OID * RETURN: * true if exists, false otherwise ***********************************************************/ bool FileOp::existsOIDDir(FID fid) const { char fileName[FILE_NAME_SIZE]; if (oid2DirName(fid, fileName) != NO_ERROR) { return false; } return exists(fileName); } 
/*********************************************************** * DESCRIPTION: * Adds an extent to the specified column OID and DBRoot. * Function uses ExtentMap to add the extent and determine which * specific column segment file the extent is to be added to. If * the applicable column segment file does not exist, it is created. * If this is the very first file for the specified DBRoot, then the * partition and segment number must be specified, else the selected * partition and segment numbers are returned. This method tries to * optimize full extents creation skiping disk space * preallocation(if activated). * PARAMETERS: * oid - OID of the column to be extended * emptyVal - Empty value to be used for oid * width - Width of the column (in bytes) * hwm - The HWM (or fbo) of the column segment file where the new * extent begins. * startLbid - The starting LBID for the new extent * allocSize - Number of blocks allocated to the extent. * dbRoot - The DBRoot of the file with the new extent. * partition - The partition number of the file with the new extent. * segment - The segment number of the file with the new extent. * segFile - The name of the relevant column segment file. * pFile - IDBDataFile ptr to the file where the extent is added. * newFile - Indicates if the extent was added to a new or existing file * hdrs - Contents of the headers if file is compressed. 
* RETURN: * NO_ERROR if success * else the applicable error code is returned ***********************************************************/ int FileOp::extendFile(OID oid, const uint8_t* emptyVal, int width, execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, BRM::LBID_t startLbid, int allocSize, uint16_t dbRoot, uint32_t partition, uint16_t segment, std::string& segFile, IDBDataFile*& pFile, bool& newFile, char* hdrs) { int rc = NO_ERROR; pFile = 0; segFile.clear(); newFile = false; char fileName[FILE_NAME_SIZE]; // If starting hwm or fbo is 0 then this is the first extent of a new file, // else we are adding an extent to an existing segment file if (hwm > 0) // db segment file should exist { RETURN_ON_ERROR(oid2FileName(oid, fileName, false, dbRoot, partition, segment)); segFile = fileName; if (!exists(fileName)) { ostringstream oss; oss << "oid: " << oid << " with path " << segFile; logging::Message::Args args; args.add("File not found "); args.add(oss.str()); args.add(""); args.add(""); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001); return ERR_FILE_NOT_EXIST; } pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b"); // old file if (pFile == 0) { ostringstream oss; oss << "oid: " << oid << " with path " << segFile; logging::Message::Args args; args.add("Error opening file "); args.add(oss.str()); args.add(""); args.add(""); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001); return ERR_FILE_OPEN; } if (isDebug(DEBUG_1) && getLogger()) { std::ostringstream oss; oss << "Opening existing column file (extendFile)" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile; getLogger()->logMsg(oss.str(), MSGLVL_INFO2); } // @bug 5349: check that new extent's fbo is not past current EOF if (m_compressionType) { char hdrsIn[compress::CompressInterface::HDR_BUF_LEN * 2]; 
RETURN_ON_ERROR(readHeaders(pFile, hdrsIn)); std::unique_ptr compressor( compress::getCompressInterfaceByType(compress::CompressInterface::getCompressionType(hdrsIn))); unsigned int ptrCount = compress::CompressInterface::getPtrCount(hdrsIn); unsigned int chunkIndex = 0; unsigned int blockOffsetWithinChunk = 0; compressor->locateBlock((hwm - 1), chunkIndex, blockOffsetWithinChunk); // std::ostringstream oss1; // oss1 << "Extending compressed column file"<< // ": OID-" << oid << // "; LBID-" << startLbid << // "; fbo-" << hwm << // "; file-" << segFile << // "; chkidx-" << chunkIndex<< // "; numPtrs-"<< ptrCount; // getLogger()->logMsg( oss1.str(), MSGLVL_INFO2 ); if (chunkIndex >= ptrCount) { ostringstream oss; oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm << "; number of " "compressed chunks " << ptrCount << "; chunkIndex " << chunkIndex; logging::Message::Args args; args.add("compressed"); args.add(oss.str()); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103); // Expand the partial extent to full with emptyVal // Since fillCompColumnExtentEmptyChunks() messes with the // file on disk, we need to close it and reopen after or // the cache isn't updated. if ((pFile)) closeFile(pFile); pFile = NULL; string failedTask; // For return error message, if any. rc = FileOp::fillCompColumnExtentEmptyChunks(oid, width, emptyVal, dbRoot, partition, segment, colDataType, hwm, segFile, failedTask); if (rc != NO_ERROR) { if (getLogger()) { std::ostringstream oss; oss << "FileOp::extendFile: error padding partial compressed extent for " << "column OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; hwm-" << hwm << " " << failedTask.c_str(); getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL); } return rc; } pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b"); // modified file } // Get the latest file header for the caller. 
If a partial extent was filled out, // this will be different than when we first read the headers. if (hdrs) { RETURN_ON_ERROR(readHeaders(pFile, hdrs)); } } else { long long fileSize; RETURN_ON_ERROR(getFileSize(pFile, fileSize)); long long calculatedFileSize = ((long long)hwm) * BYTE_PER_BLOCK; // std::ostringstream oss2; // oss2 << "Extending uncompressed column file"<< // ": OID-" << oid << // "; LBID-" << startLbid << // "; fbo-" << hwm << // "; file-" << segFile << // "; filesize-"<logMsg( oss2.str(), MSGLVL_INFO2 ); if (calculatedFileSize > fileSize) { ostringstream oss; oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm << "; file size (bytes) " << fileSize; logging::Message::Args args; args.add("uncompressed"); args.add(oss.str()); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103); // Expand the partial extent to full with emptyVal // This generally won't ever happen, as uncompressed files // are created with full extents. 
rc = FileOp::expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, width, colDataType); if (rc != NO_ERROR) { if (getLogger()) { std::ostringstream oss; oss << "FileOp::extendFile: error padding partial uncompressed extent for " << "column OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; hwm-" << hwm; getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL); } return rc; } } } } else // db segment file should not exist { RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment)); segFile = fileName; // if obsolete file exists, "w+b" will truncate and write over pFile = openFile(fileName, "w+b"); // new file if (pFile == 0) return ERR_FILE_CREATE; { // We presume the path will contain / std::string filePath(fileName); if (chownDataPath(filePath)) return ERR_FILE_CHOWN; } newFile = true; if (isDebug(DEBUG_1) && getLogger()) { std::ostringstream oss; oss << "Opening new column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile; getLogger()->logMsg(oss.str(), MSGLVL_INFO2); } if ((m_compressionType) && (hdrs)) { compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType); compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0); } } if (!isDiskSpaceAvail(segFile, allocSize)) { return ERR_FILE_DISK_SPACE; } // We set to EOF just before we start adding the blocks for the new extent. // At one time, I considered changing this to seek to the HWM block, but // with compressed files, this is murky; do I find and seek to the chunk // containing the HWM block? So I left as-is for now, seeking to EOF. rc = setFileOffset(pFile, 0, SEEK_END); if (rc != NO_ERROR) return rc; // Initialize the contents of the extent. // MCOL-498 optimize full extent creation. 
rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType, newFile, // new or existing file false, // don't expand; new extent false, // add full (not abbreviated) extent true, // try to optimize extent creation startLbid); return rc; } /*********************************************************** * DESCRIPTION: * Add an extent to the exact segment file specified by * the designated OID, DBRoot, partition, and segment. * PARAMETERS: * oid - OID of the column to be extended * emptyVal - Empty value to be used for oid * width - Width of the column (in bytes) * allocSize - Number of blocks allocated to the extent. * dbRoot - The DBRoot of the file with the new extent. * partition - The partition number of the file with the new extent. * segment - The segment number of the file with the new extent. * segFile - The name of the relevant column segment file. * startLbid - The starting LBID for the new extent * newFile - Indicates if the extent was added to a new or existing file * hdrs - Contents of the headers if file is compressed. * RETURN: * none ***********************************************************/ int FileOp::addExtentExactFile(OID oid, const uint8_t* emptyVal, int width, int& allocSize, uint16_t dbRoot, uint32_t partition, uint16_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, std::string& segFile, BRM::LBID_t& startLbid, bool& newFile, char* hdrs) { int rc = NO_ERROR; IDBDataFile* pFile = 0; segFile.clear(); newFile = false; HWM hwm; // Allocate the new extent in the ExtentMap RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile( oid, width, dbRoot, partition, segment, colDataType, startLbid, allocSize, hwm)); // Determine the existence of the "next" segment file, and either open // or create the segment file accordingly. 
if (exists(oid, dbRoot, partition, segment)) { pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b"); // old file if (pFile == 0) { ostringstream oss; oss << "oid: " << oid << " with path " << segFile; logging::Message::Args args; args.add("Error opening file "); args.add(oss.str()); args.add(""); args.add(""); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001); return ERR_FILE_OPEN; } if (isDebug(DEBUG_1) && getLogger()) { std::ostringstream oss; oss << "Opening existing column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile; getLogger()->logMsg(oss.str(), MSGLVL_INFO2); } if ((m_compressionType) && (hdrs)) { rc = readHeaders(pFile, hdrs); if (rc != NO_ERROR) return rc; } } else { char fileName[FILE_NAME_SIZE]; RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment)); segFile = fileName; pFile = openFile(fileName, "w+b"); // new file if (pFile == 0) return ERR_FILE_CREATE; newFile = true; if (isDebug(DEBUG_1) && getLogger()) { std::ostringstream oss; oss << "Opening new column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile; getLogger()->logMsg(oss.str(), MSGLVL_INFO2); } if ((m_compressionType) && (hdrs)) { compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType); compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0); } } if (!isDiskSpaceAvail(segFile, allocSize)) { return ERR_FILE_DISK_SPACE; } // We set to EOF just before we start adding the blocks for the new extent. // At one time, I considered changing this to seek to the HWM block, but // with compressed files, this is murky; do I find and seek to the chunk // containing the HWM block? So I left as-is for now, seeking to EOF. 
rc = setFileOffset(pFile, 0, SEEK_END); if (rc != NO_ERROR) return rc; // Initialize the contents of the extent. // CS doesn't optimize file operations to have a valid // segment files with empty magics rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType, newFile, // new or existing file false, // don't expand; new extent false, // add full (not abbreviated) extent startLbid); closeFile(pFile); return rc; } /*********************************************************** * DESCRIPTION: * Write out (initialize) an extent in a column file. * A mutex is used for each DBRoot, to prevent contention between * threads, because if multiple threads are creating extents on * the same DBRoot at the same time, the extents can become * fragmented. It is best to only create one extent at a time * on each DBRoot. * This function can be used to initialize an entirely new extent, or * to finish initializing an extent that has already been started. * nBlocks controls how many 8192-byte blocks are to be written out. * If bOptExtension is set then method first checks config for * DBRootX.Prealloc. If it is disabled then it skips disk space * preallocation. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * dbRoot (in) - DBRoot of pFile * nBlocks (in) - number of blocks to be written for an extent * emptyVal(in) - empty value to be used for column data values * width (in) - width of the applicable column * bNewFile(in) - are we adding an extent to a new file, in which case * headers will be included "if" it is a compressed file. * bExpandExtent (in) - Expand existing extent, or initialize a new one * bAbbrevExtent(in) - if creating new extent, is it an abbreviated extent * bOptExtension(in) - skip full extent preallocation. * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. 
***********************************************************/ int FileOp::initColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, const uint8_t* emptyVal, int width, execplan::CalpontSystemCatalog::ColDataType colDataType, bool bNewFile, bool bExpandExtent, bool bAbbrevExtent, bool bOptExtension, int64_t lbid) { if ((bNewFile) && (m_compressionType)) { char hdrs[CompressInterface::HDR_BUF_LEN * 2]; compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType); compress::CompressInterface::setLBIDByIndex(hdrs, lbid, 0); if (bAbbrevExtent) compress::CompressInterface::setBlockCount(hdrs, nBlocks); RETURN_ON_ERROR(writeHeaders(pFile, hdrs)); } // @bug5769 Don't initialize extents or truncate db files on HDFS if (idbdatafile::IDBPolicy::useHdfs()) { //@Bug 3219. update the compression header after the extent is expanded. if ((!bNewFile) && (m_compressionType) && (bExpandExtent)) { updateColumnExtent(pFile, nBlocks, lbid); } // @bug 2378. Synchronize here to avoid write buffer pile up too much, // which could cause controllernode to timeout later when it needs to // save a snapshot. pFile->flush(); } else { // Create vector of mutexes used to serialize extent access per DBRoot initDbRootExtentMutexes(); // MCOL-498 Skip the huge preallocations if the option is set // for the dbroot. This check is skiped for abbreviated extent. // IMO it is better to check bool then to call a function. if (bOptExtension) { bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot)) ? bOptExtension : false; } // Reduce number of blocks allocated for abbreviated extents thus // CS writes less when creates a new table. This couldn't be zero // b/c Snappy compressed file format doesn't tolerate empty files. int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 3 : nBlocks; // Determine the number of blocks in each call to fwrite(), and the // number of fwrite() calls to make, based on this. 
In other words, // we put a cap on the "writeSize" so that we don't allocate and write // an entire extent at once for the 64M row extents. If we are // expanding an abbreviated 64M extent, we may not have an even // multiple of MAX_NBLOCKS to write; remWriteSize is the number of // blocks above and beyond loopCount*MAX_NBLOCKS. int writeSize = realNBlocks * BYTE_PER_BLOCK; // 1M and 8M row extent size int loopCount = 1; int remWriteSize = 0; if (realNBlocks > MAX_NBLOCKS) // 64M row extent size { writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK; loopCount = realNBlocks / MAX_NBLOCKS; remWriteSize = realNBlocks - (loopCount * MAX_NBLOCKS); } // Allocate a buffer, initialize it, and use it to create the extent idbassert(dbRoot > 0); #ifdef PROFILE if (bExpandExtent) Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT); else Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT); #endif boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]); #ifdef PROFILE if (bExpandExtent) Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT); else Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT); #endif // Skip space preallocation if configured so // fallback to sequential write otherwise. // Couldn't avoid preallocation for full extents, // e.g. ADD COLUMN DDL b/c CS has to fill the file // with empty magics. if (!bOptExtension || !m_compressionType) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT); #endif // Allocate buffer, store it in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of writeBuf. 
{ unsigned char* writeBuf = new unsigned char[writeSize]; boost::scoped_array writeBufPtr(writeBuf); setEmptyBuf(writeBuf, writeSize, emptyVal, width); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT); if (bExpandExtent) Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT); else Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT); #endif // std::ostringstream oss; // oss << "initColExtent: width-" << width << //"; loopCount-" << loopCount << //"; writeSize-" << writeSize; // std::cout << oss.str() << std::endl; if (remWriteSize > 0) { if (pFile->write(writeBuf, remWriteSize) != remWriteSize) { return ERR_FILE_WRITE; } } for (int j = 0; j < loopCount; j++) { if (pFile->write(writeBuf, writeSize) != writeSize) { return ERR_FILE_WRITE; } } } //@Bug 3219. update the compression header after the extent is expanded. if ((!bNewFile) && (m_compressionType) && (bExpandExtent)) { updateColumnExtent(pFile, nBlocks, lbid); } // @bug 2378. Synchronize here to avoid write buffer pile up too much, // which could cause controllernode to timeout later when it needs to // save a snapshot. pFile->flush(); #ifdef PROFILE if (bExpandExtent) Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT); else Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT); #endif } } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write (initialize) an abbreviated compressed extent in a column file. * nBlocks controls how many 8192-byte blocks are to be written out. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * dbRoot (in) - DBRoot of pFile * nBlocks (in) - number of blocks to be written for an extent * emptyVal(in) - empty value to be used for column data values * width (in) - width of the applicable column * RETURN: * returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs, * else returns NO_ERROR. 
***********************************************************/ int FileOp::initAbbrevCompColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, const uint8_t* emptyVal, int width, BRM::LBID_t startLBID, execplan::CalpontSystemCatalog::ColDataType colDataType) { // Reserve disk space for optimized abbreviated extent int rc = initColumnExtent(pFile, dbRoot, nBlocks, emptyVal, width, colDataType, true, // new file false, // don't expand; add new extent true, // add abbreviated extent true, // optimize the initial extent startLBID); if (rc != NO_ERROR) { return rc; } #ifdef PROFILE Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT); #endif char hdrs[CompressInterface::HDR_BUF_LEN * 2]; rc = writeInitialCompColumnChunk(pFile, nBlocks, INITIAL_EXTENT_ROWS_TO_DISK, emptyVal, width, startLBID, colDataType, hdrs); if (rc != NO_ERROR) { return rc; } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT); #endif return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write (initialize) the first extent in a compressed db file. * PARAMETERS: * pFile - IDBDataFile* of column segment file to be written to * nBlocksAllocated - number of blocks allocated to the extent; should be * enough blocks for a full extent, unless it's the abbreviated extent * nRows - number of rows to initialize, or write out to the file * emptyVal - empty value to be used for column data values * width - width of the applicable column (in bytes) * hdrs - (in/out) chunk pointer headers * RETURN: * returns NO_ERROR on success. 
***********************************************************/ int FileOp::writeInitialCompColumnChunk(IDBDataFile* pFile, int nBlocksAllocated, int nRows, const uint8_t* emptyVal, int width, BRM::LBID_t startLBID, execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs) { const size_t INPUT_BUFFER_SIZE = nRows * width; char* toBeCompressedInput = new char[INPUT_BUFFER_SIZE]; unsigned int userPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK; // Compress an initialized abbreviated extent // Initially m_compressionType == 0, but this function is used under // condtion where m_compressionType > 0. std::unique_ptr compressor( compress::getCompressInterfaceByType(m_compressionType, userPaddingBytes)); const size_t OUTPUT_BUFFER_SIZE = compressor->maxCompressedSize(INPUT_BUFFER_SIZE) + userPaddingBytes + compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE; unsigned char* compressedOutput = new unsigned char[OUTPUT_BUFFER_SIZE]; size_t outputLen = OUTPUT_BUFFER_SIZE; boost::scoped_array toBeCompressedInputPtr(toBeCompressedInput); boost::scoped_array compressedOutputPtr(compressedOutput); setEmptyBuf((unsigned char*)toBeCompressedInput, INPUT_BUFFER_SIZE, emptyVal, width); int rc = compressor->compressBlock(toBeCompressedInput, INPUT_BUFFER_SIZE, compressedOutput, outputLen); if (rc != 0) { return ERR_COMP_COMPRESS; } // Round up the compressed chunk size rc = compressor->padCompressedChunks(compressedOutput, outputLen, OUTPUT_BUFFER_SIZE); if (rc != 0) { return ERR_COMP_PAD_DATA; } // std::cout << "Uncompressed rowCount: " << nRows << // "; colWidth: " << width << // "; uncompByteCnt: " << INPUT_BUFFER_SIZE << // "; blkAllocCnt: " << nBlocksAllocated << // "; compressedByteCnt: " << outputLen << std::endl; compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType); compress::CompressInterface::setBlockCount(hdrs, nBlocksAllocated); compress::CompressInterface::setLBIDByIndex(hdrs, startLBID, 0); // Store 
compression pointers in the header std::vector ptrs; ptrs.push_back(CompressInterface::HDR_BUF_LEN * 2); ptrs.push_back(outputLen + (CompressInterface::HDR_BUF_LEN * 2)); compress::CompressInterface::storePtrs(ptrs, hdrs); RETURN_ON_ERROR(writeHeaders(pFile, hdrs)); // Write the compressed data size_t writtenLen = pFile->write(compressedOutput, outputLen); if (writtenLen != outputLen) return ERR_FILE_WRITE; return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Fill specified compressed extent with empty value chunks. * PARAMETERS: * oid - OID for relevant column * colWidth - width in bytes of this column * emptyVal - empty value to be used in filling empty chunks * dbRoot - DBRoot of extent to be filled * partition - partition of extent to be filled * segment - segment file number of extent to be filled * colDataType - Column data type * hwm - proposed new HWM of filled in extent * segFile - (out) name of updated segment file * failedTask - (out) if error occurs, this is the task that failed * RETURN: * returns NO_ERROR if success. 
***********************************************************/ int FileOp::fillCompColumnExtentEmptyChunks(OID oid, int colWidth, const uint8_t* emptyVal, uint16_t dbRoot, uint32_t partition, uint16_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm, std::string& segFile, std::string& failedTask) { int rc = NO_ERROR; segFile.clear(); failedTask.clear(); // Open the file and read the headers with the compression chunk pointers // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag IDBDataFile* pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b", DEFAULT_COLSIZ, true); if (!pFile) { failedTask = "Opening file"; ostringstream oss; oss << "oid: " << oid << " with path " << segFile; logging::Message::Args args; args.add("Error opening file "); args.add(oss.str()); args.add(""); args.add(""); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001); return ERR_FILE_OPEN; } char hdrs[CompressInterface::HDR_BUF_LEN * 2]; rc = readHeaders(pFile, hdrs); if (rc != NO_ERROR) { failedTask = "Reading headers"; closeFile(pFile); return rc; } int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK; std::unique_ptr compressor(compress::getCompressInterfaceByType( compress::CompressInterface::getCompressionType(hdrs), userPadBytes)); CompChunkPtrList chunkPtrs; int rcComp = compress::CompressInterface::getPtrList(hdrs, chunkPtrs); if (rcComp != 0) { failedTask = "Getting header ptrs"; closeFile(pFile); return ERR_COMP_PARSE_HDRS; } // Nothing to do if the proposed HWM is < the current block count uint64_t blkCount = compress::CompressInterface::getBlockCount(hdrs); if (blkCount > (hwm + 1)) { closeFile(pFile); return NO_ERROR; } const unsigned int ROWS_PER_EXTENT = BRMWrapper::getInstance()->getInstance()->getExtentRows(); const unsigned int ROWS_PER_CHUNK = CompressInterface::UNCOMPRESSED_INBUF_LEN / colWidth; const unsigned int CHUNKS_PER_EXTENT = ROWS_PER_EXTENT / ROWS_PER_CHUNK; // If this is an 
abbreviated extent, we first expand to a full extent // @bug 4340 - support moving the DBRoot with a single abbrev extent if ((chunkPtrs.size() == 1) && ((blkCount * BYTE_PER_BLOCK) == (uint64_t)(INITIAL_EXTENT_ROWS_TO_DISK * colWidth))) { if (getLogger()) { std::ostringstream oss; oss << "Converting abbreviated partial extent to full extent for" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; file-" << segFile << "; wid-" << colWidth << "; oldBlkCnt-" << blkCount << "; newBlkCnt-" << ((ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK); getLogger()->logMsg(oss.str(), MSGLVL_INFO2); } off64_t endHdrsOffset = pFile->tell(); rc = expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, colWidth, colDataType); if (rc != NO_ERROR) { failedTask = "Expanding abbreviated extent"; closeFile(pFile); return rc; } CompChunkPtr chunkOutPtr; rc = expandAbbrevColumnChunk(pFile, emptyVal, colWidth, chunkPtrs[0], chunkOutPtr, hdrs); if (rc != NO_ERROR) { failedTask = "Expanding abbreviated chunk"; closeFile(pFile); return rc; } chunkPtrs[0] = chunkOutPtr; // update chunkPtrs with new chunk size rc = setFileOffset(pFile, endHdrsOffset); if (rc != NO_ERROR) { failedTask = "Positioning file to end of headers"; closeFile(pFile); return rc; } // Update block count to reflect a full extent blkCount = (ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK; compress::CompressInterface::setBlockCount(hdrs, blkCount); } // Calculate the number of empty chunks we need to add to fill this extent unsigned numChunksToFill = 0; ldiv_t ldivResult = ldiv(chunkPtrs.size(), CHUNKS_PER_EXTENT); if (ldivResult.rem != 0) { numChunksToFill = CHUNKS_PER_EXTENT - ldivResult.rem; } #if 0 std::cout << "Number of allocated blocks: " << compressor.getBlockCount(hdrs) << std::endl; std::cout << "Pointer Header Size (in bytes): " << (compressor.getHdrSize(hdrs) - CompressInterface::HDR_BUF_LEN) << std::endl; std::cout << "Chunk Pointers (offset,length): " << std::endl; for 
(unsigned k = 0; k < chunkPtrs.size(); k++) { std::cout << " " << k << ". " << chunkPtrs[k].first << " , " << chunkPtrs[k].second << std::endl; } std::cout << std::endl; std::cout << "Number of chunks to fill in: " << numChunksToFill << std::endl << std::endl; #endif off64_t endOffset = 0; // Fill in or add necessary remaining empty chunks if (numChunksToFill > 0) { const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN; const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes + compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE; // Allocate buffer, and store in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of buffers { char* toBeCompressedBuf = new char[IN_BUF_LEN]; unsigned char* compressedBuf = new unsigned char[OUT_BUF_LEN]; boost::scoped_array toBeCompressedInputPtr(toBeCompressedBuf); boost::scoped_array compressedOutputPtr(compressedBuf); // Compress and then pad the compressed chunk setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth); size_t outputLen = OUT_BUF_LEN; rcComp = compressor->compressBlock(toBeCompressedBuf, IN_BUF_LEN, compressedBuf, outputLen); if (rcComp != 0) { failedTask = "Compressing chunk"; closeFile(pFile); return ERR_COMP_COMPRESS; } toBeCompressedInputPtr.reset(); // release memory rcComp = compressor->padCompressedChunks(compressedBuf, outputLen, OUT_BUF_LEN); if (rcComp != 0) { failedTask = "Padding compressed chunk"; closeFile(pFile); return ERR_COMP_PAD_DATA; } // Position file to write empty chunks; default to end of headers // in case there are no chunks listed in the header off64_t startOffset = pFile->tell(); if (chunkPtrs.size() > 0) { startOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second; rc = setFileOffset(pFile, startOffset); if (rc != NO_ERROR) { failedTask = "Positioning file to begin filling chunks"; closeFile(pFile); return rc; } } // Write chunks needed to fill out the current 
extent, add chunk ptr for (unsigned k = 0; k < numChunksToFill; k++) { rc = writeFile(pFile, (unsigned char*)compressedBuf, outputLen); if (rc != NO_ERROR) { failedTask = "Writing a chunk"; closeFile(pFile); return rc; } CompChunkPtr compChunk(startOffset, outputLen); chunkPtrs.push_back(compChunk); startOffset = pFile->tell(); } } // end of scope for boost scoped array pointers endOffset = pFile->tell(); // Update the compressed chunk pointers in the header std::vector ptrs; for (unsigned i = 0; i < chunkPtrs.size(); i++) { ptrs.push_back(chunkPtrs[i].first); } ptrs.push_back(chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second); compress::CompressInterface::storePtrs(ptrs, hdrs); rc = writeHeaders(pFile, hdrs); if (rc != NO_ERROR) { failedTask = "Writing headers"; closeFile(pFile); return rc; } } // end of "numChunksToFill > 0" else { // if no chunks to add, then set endOffset to truncate the db file // strictly based on the chunks that are already in the file if (chunkPtrs.size() > 0) { endOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second; } } // Truncate the file to release unused space for the extent we just filled if (endOffset > 0) { rc = truncateFile(pFile, endOffset); if (rc != NO_ERROR) { failedTask = "Truncating file"; closeFile(pFile); return rc; } } closeFile(pFile); return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Expand first chunk in pFile from an abbreviated chunk for an abbreviated * extent to a full compressed chunk for a full extent. * PARAMETERS: * pFile - file to be updated * colWidth - width in bytes of this column * emptyVal - empty value to be used in filling empty chunks * chunkInPtr - chunk pointer referencing first (abbrev) chunk * chunkOutPtr- (out) updated chunk ptr referencing first (full) chunk * RETURN: * returns NO_ERROR if success. 
***********************************************************/ int FileOp::expandAbbrevColumnChunk(IDBDataFile* pFile, const uint8_t* emptyVal, int colWidth, const CompChunkPtr& chunkInPtr, CompChunkPtr& chunkOutPtr, const char* hdrs) { int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK; auto realCompressionType = m_compressionType; if (hdrs) { realCompressionType = compress::CompressInterface::getCompressionType(hdrs); } std::unique_ptr compressor( compress::getCompressInterfaceByType(realCompressionType, userPadBytes)); const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN; const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes + compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE; char* toBeCompressedBuf = new char[IN_BUF_LEN]; boost::scoped_array toBeCompressedPtr(toBeCompressedBuf); setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth); RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET)); char* compressedInBuf = new char[chunkInPtr.second]; boost::scoped_array compressedInBufPtr(compressedInBuf); RETURN_ON_ERROR(readFile(pFile, (unsigned char*)compressedInBuf, chunkInPtr.second)); // Uncompress an "abbreviated" chunk into our 4MB buffer size_t outputLen = IN_BUF_LEN; int rc = compressor->uncompressBlock(compressedInBuf, chunkInPtr.second, (unsigned char*)toBeCompressedBuf, outputLen); if (rc != 0) { return ERR_COMP_UNCOMPRESS; } compressedInBufPtr.reset(); // release memory RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET)); unsigned char* compressedOutBuf = new unsigned char[OUT_BUF_LEN]; boost::scoped_array compressedOutBufPtr(compressedOutBuf); // Compress the data we just read, as a "full" 4MB chunk outputLen = OUT_BUF_LEN; rc = compressor->compressBlock(reinterpret_cast(toBeCompressedBuf), IN_BUF_LEN, compressedOutBuf, outputLen); if (rc != 0) { return ERR_COMP_COMPRESS; } // Round up the compressed chunk size rc = 
compressor->padCompressedChunks(compressedOutBuf, outputLen, OUT_BUF_LEN); if (rc != 0) { return ERR_COMP_PAD_DATA; } RETURN_ON_ERROR(writeFile(pFile, compressedOutBuf, outputLen)); chunkOutPtr.first = chunkInPtr.first; chunkOutPtr.second = outputLen; return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write headers to a compressed column file. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * hdr (in) - header pointers to be written * RETURN: * returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs, * else returns NO_ERROR. ***********************************************************/ int FileOp::writeHeaders(IDBDataFile* pFile, const char* hdr) const { RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET)); // Write the headers if (pFile->write(hdr, CompressInterface::HDR_BUF_LEN * 2) != CompressInterface::HDR_BUF_LEN * 2) { return ERR_FILE_WRITE; } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write headers to a compressed column or dictionary file. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * controlHdr (in) - control header to be written * pointerHdr (in) - pointer header to be written * ptrHdrSize (in) - size (in bytes) of pointer header * RETURN: * returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs, * else returns NO_ERROR. 
***********************************************************/ int FileOp::writeHeaders(IDBDataFile* pFile, const char* controlHdr, const char* pointerHdr, uint64_t ptrHdrSize) const { RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET)); // Write the control header if (pFile->write(controlHdr, CompressInterface::HDR_BUF_LEN) != CompressInterface::HDR_BUF_LEN) { return ERR_FILE_WRITE; } // Write the pointer header if (pFile->write(pointerHdr, ptrHdrSize) != (ssize_t)ptrHdrSize) { return ERR_FILE_WRITE; } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write out (initialize) an extent in a dictionary store file. * A mutex is used for each DBRoot, to prevent contention between * threads, because if multiple threads are creating extents on * the same DBRoot at the same time, the extents can become * fragmented. It is best to only create one extent at a time * on each DBRoot. * This function can be used to initialize an entirely new extent, or * to finish initializing an extent that has already been started. * nBlocks controls how many 8192-byte blocks are to be written out. * If bOptExtension is set then method first checks config for * DBRootX.Prealloc. If it is disabled then it skips disk space * preallocation. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * dbRoot (in) - DBRoot of pFile * nBlocks (in) - number of blocks to be written for an extent * blockHdrInit(in) - data used to initialize each block * blockHdrInitSize(in) - number of bytes in blockHdrInit * bExpandExtent (in) - Expand existing extent, or initialize a new one * bOptExtension(in) - skip full extent preallocation. * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. 
***********************************************************/ int FileOp::initDctnryExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, unsigned char* blockHdrInit, int blockHdrInitSize, bool bExpandExtent, bool bOptExtension, int64_t lbid) { // @bug5769 Don't initialize extents or truncate db files on HDFS if (idbdatafile::IDBPolicy::useHdfs()) { if (m_compressionType) updateDctnryExtent(pFile, nBlocks, lbid); // Synchronize to avoid write buffer pile up too much, which could cause // controllernode to timeout later when it needs to save a snapshot. pFile->flush(); } else { // Create vector of mutexes used to serialize extent access per DBRoot initDbRootExtentMutexes(); // MCOL-498 Skip the huge preallocations if the option is set // for the dbroot. This check is skiped for abbreviated extent. // IMO it is better to check bool then to call a function. // CS uses non-compressed dict files for its system catalog so // CS doesn't optimize non-compressed dict creation. if (bOptExtension) { bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot) && m_compressionType) ? bOptExtension : false; } // Reduce number of blocks allocated for abbreviated extents thus // CS writes less when creates a new table. This couldn't be zero // b/c Snappy compressed file format doesn't tolerate empty files. int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 1 : nBlocks; // Determine the number of blocks in each call to fwrite(), and the // number of fwrite() calls to make, based on this. In other words, // we put a cap on the "writeSize" so that we don't allocate and write // an entire extent at once for the 64M row extents. If we are // expanding an abbreviated 64M extent, we may not have an even // multiple of MAX_NBLOCKS to write; remWriteSize is the number of // blocks above and beyond loopCount*MAX_NBLOCKS. 
int writeSize = realNBlocks * BYTE_PER_BLOCK; // 1M and 8M row extent size int loopCount = 1; int remWriteSize = 0; if (realNBlocks > MAX_NBLOCKS) // 64M row extent size { writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK; loopCount = realNBlocks / MAX_NBLOCKS; remWriteSize = realNBlocks - (loopCount * MAX_NBLOCKS); } // Allocate a buffer, initialize it, and use it to create the extent idbassert(dbRoot > 0); #ifdef PROFILE if (bExpandExtent) Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT); else Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT); #endif boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]); #ifdef PROFILE if (bExpandExtent) Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT); else Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT); #endif // Skip space preallocation if configured so // fallback to sequential write otherwise. // Couldn't avoid preallocation for full extents, // e.g. ADD COLUMN DDL b/c CS has to fill the file // with empty magics. if (!bOptExtension) { // Allocate buffer, and store in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of writeBuf. { #ifdef PROFILE Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT); #endif unsigned char* writeBuf = new unsigned char[writeSize]; boost::scoped_array writeBufPtr(writeBuf); memset(writeBuf, 0, writeSize); for (int i = 0; i < realNBlocks; i++) { memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize); } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT); if (bExpandExtent) Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT); else Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT); #endif if (remWriteSize > 0) { if (pFile->write(writeBuf, remWriteSize) != remWriteSize) { return ERR_FILE_WRITE; } } for (int j = 0; j < loopCount; j++) { if (pFile->write(writeBuf, writeSize) != writeSize) { return ERR_FILE_WRITE; } } // CS doesn't account flush timings. 
#ifdef PROFILE if (bExpandExtent) Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT); else Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT); #endif } } // preallocation fallback end // MCOL-498 CS has to set a number of blocs in the chunk header if (m_compressionType) { updateDctnryExtent(pFile, nBlocks, lbid); } pFile->flush(); } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Create a vector containing the mutexes used to serialize * extent creation per DBRoot. Serializing extent creation * helps to prevent disk fragmentation. ***********************************************************/ /* static */ void FileOp::initDbRootExtentMutexes() { boost::mutex::scoped_lock lk(m_createDbRootMutexes); if (m_DbRootAddExtentMutexes.size() == 0) { std::vector rootIds; Config::getRootIdList(rootIds); for (size_t i = 0; i < rootIds.size(); i++) { m_DbRootAddExtentMutexes.emplace(std::piecewise_construct, std::forward_as_tuple(rootIds[i]), std::forward_as_tuple()); } } } /*********************************************************** * DESCRIPTION: * Write out (reinitialize) a partial extent in a column file. * A mutex is not used to prevent contention between threads, * because the extent should already be in place on disk; so * disk fragmentation is not an issue. * PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * startOffset(in)-file offset where we are to begin writing blocks * nBlocks (in) - number of blocks to be written to the extent * emptyVal(in) - empty value to be used for column data values * width (in) - width of the applicable column * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. 
***********************************************************/ int FileOp::reInitPartialColumnExtent(IDBDataFile* pFile, long long startOffset, int nBlocks, const uint8_t* emptyVal, int width) { int rc = setFileOffset(pFile, startOffset, SEEK_SET); if (rc != NO_ERROR) return rc; if (nBlocks == 0) return NO_ERROR; // Determine the number of blocks in each call to fwrite(), and the // number of fwrite() calls to make, based on this. In other words, // we put a cap on the "writeSize" so that we don't allocate and write // an entire extent at once for the 64M row extents. int writeSize = nBlocks * BYTE_PER_BLOCK; // 1M and 8M row extent size int loopCount = 0; int remainderSize = writeSize; if (nBlocks > MAX_NBLOCKS) // 64M row extent size { writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK; loopCount = nBlocks / MAX_NBLOCKS; remainderSize = nBlocks - (loopCount * MAX_NBLOCKS); } // Allocate a buffer, initialize it, and use it to initialize the extent // Store in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of writeBuf. { unsigned char* writeBuf = new unsigned char[writeSize]; boost::scoped_array writeBufPtr(writeBuf); setEmptyBuf(writeBuf, writeSize, emptyVal, width); for (int j = 0; j < loopCount; j++) { if (pFile->write(writeBuf, writeSize) != writeSize) { return ERR_FILE_WRITE; } } if (remainderSize > 0) { if (pFile->write(writeBuf, remainderSize) != remainderSize) { return ERR_FILE_WRITE; } } } // Synchronize here to avoid write buffer pile up too much, which could // cause controllernode to timeout later when it needs to save a snapshot. pFile->flush(); return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write out (reinitialize) a partial extent in a dictionary store file. * A mutex is not used to prevent contention between threads, * because the extent should already be in place on disk; so * disk fragmentation is not an issue. 
* PARAMETERS: * pFile (in) - IDBDataFile* of column segment file to be written to * startOffset(in)-file offset where we are to begin writing blocks * nBlocks (in) - number of blocks to be written to the extent * blockHdrInit(in) - data used to initialize each block * blockHdrInitSize(in) - number of bytes in blockHdrInit * RETURN: * returns ERR_FILE_WRITE if an error occurs, * else returns NO_ERROR. ***********************************************************/ int FileOp::reInitPartialDctnryExtent(IDBDataFile* pFile, long long startOffset, int nBlocks, unsigned char* blockHdrInit, int blockHdrInitSize) { int rc = setFileOffset(pFile, startOffset, SEEK_SET); if (rc != NO_ERROR) return rc; if (nBlocks == 0) return NO_ERROR; // Determine the number of blocks in each call to fwrite(), and the // number of fwrite() calls to make, based on this. In other words, // we put a cap on the "writeSize" so that we don't allocate and write // an entire extent at once for the 64M row extents. int writeSize = nBlocks * BYTE_PER_BLOCK; // 1M and 8M row extent size int loopCount = 0; int remainderSize = writeSize; if (nBlocks > MAX_NBLOCKS) // 64M row extent size { writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK; loopCount = nBlocks / MAX_NBLOCKS; remainderSize = nBlocks - (loopCount * MAX_NBLOCKS); nBlocks = MAX_NBLOCKS; } // Allocate a buffer, initialize it, and use it to initialize the extent // Store in scoped_array to insure it's deletion. // Create scope {...} to manage deletion of writeBuf. 
{ unsigned char* writeBuf = new unsigned char[writeSize]; boost::scoped_array writeBufPtr(writeBuf); memset(writeBuf, 0, writeSize); for (int i = 0; i < nBlocks; i++) { memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize); } for (int j = 0; j < loopCount; j++) { if (pFile->write(writeBuf, writeSize) != writeSize) { return ERR_FILE_WRITE; } } if (remainderSize > 0) { if (pFile->write(writeBuf, remainderSize) != remainderSize) { return ERR_FILE_WRITE; } } } // Synchronize here to avoid write buffer pile up too much, which could // cause controllernode to timeout later when it needs to save a snapshot. pFile->flush(); return NO_ERROR; } /*********************************************************** * DESCRIPTION: * PARAMETERS: * pFile - file handle * fileSize (out) - file size in bytes * RETURN: * error code ***********************************************************/ int FileOp::getFileSize(IDBDataFile* pFile, long long& fileSize) const { fileSize = 0; if (pFile == NULL) return ERR_FILE_NULL; fileSize = pFile->size(); if (fileSize < 0) { fileSize = 0; return ERR_FILE_STAT; } return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Get file size using file id * PARAMETERS: * fid - column OID * dbroot - DBRoot of applicable segment file * partition - partition of applicable segment file * segment - segment of applicable segment file * fileSize (out) - current file size for requested segment file * RETURN: * NO_ERROR if okay, else an error return code. 
***********************************************************/
int FileOp::getFileSize(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment,
                        long long& fileSize) const
{
  // Default the out-param so callers see 0 on every failure path.
  fileSize = 0;

  // Resolve the OID/dbroot/partition/segment to a segment file path.
  char fileName[FILE_NAME_SIZE];
  RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment));

  const auto sz = IDBPolicy::size(fileName);

  // A negative size means the stat failed; leave fileSize at 0.
  if (sz < 0)
    return ERR_FILE_STAT;

  fileSize = sz;
  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Check whether the given path names a directory.
 * PARAMETERS:
 *    dirName - directory name
 * RETURN:
 *    true if it is, false otherwise
 ***********************************************************/
bool FileOp::isDir(const char* dirName) const
{
  // Delegate directly to the IDB file-system policy layer.
  return IDBPolicy::isDir(dirName);
}

/***********************************************************
 * DESCRIPTION:
 *    Convert an oid to a filename
 * PARAMETERS:
 *    fid          - fid
 *    fullFileName - file name
 *    bCreateDir   - whether need to create a directory
 *    dbRoot       - DBRoot where file is to be located; 1->DBRoot1,
 *                   2->DBRoot2, etc.  If bCreateDir is false, meaning we
 *                   are not creating the file but only searching for an
 *                   existing file, then dbRoot can be 0, and oid2FileName
 *                   will search all the DBRoots for the applicable filename.
 * partition - Partition number to be used in filepath subdirectory
 * segment - Segment number to be used in filename
 * RETURN:
 *    NO_ERROR if success, other if fail
 ***********************************************************/
int FileOp::oid2FileName(FID fid, char* fullFileName, bool bCreateDir, uint16_t dbRoot,
                         uint32_t partition, uint16_t segment) const
{
#ifdef SHARED_NOTHING_DEMO_2
  // Demo build: OIDs >= 10000 live flat under the shared-nothing root.
  if (fid >= 10000)
  {
    char root[FILE_NAME_SIZE];
    Config::getSharedNothingRoot(root);
    sprintf(fullFileName, "%s/FILE%d", root, fid);
    return NO_ERROR;
  }
#endif

  // Need this stub to use ColumnOp::writeRow in the unit tests
#ifdef WITH_UNIT_TESTS
  if (fid == 42)
  {
    sprintf(fullFileName, "./versionbuffer");
    return NO_ERROR;
  }
#endif

  /* If is a version buffer file, the format is different. */
  if (fid < 1000)
  {
    /* Get the dbroot #
     * Get the root of that dbroot
     * Add "/versionbuffer.cdf"
     */
    BRM::DBRM dbrm;
    int _dbroot = dbrm.getDBRootOfVBOID(fid);

    if (_dbroot < 0)
      return ERR_INVALID_VBOID;

    snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf",
             Config::getDBRootByNum(_dbroot).c_str());
    return NO_ERROR;
  }

  // Get hashed part of the filename. This is the tail-end of the filename path,
  // excluding the DBRoot.
  char tempFileName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, segment)));

  // see if file exists in specified DBRoot; return if found
  if (fullFileName == nullptr)
  {
    return ERR_INTERNAL;
  }

  if (dbRoot > 0)
  {
    sprintf(fullFileName, "%s/%s", Config::getDBRootByNum(dbRoot).c_str(), tempFileName);
    // NOTE(review): the following span appears truncated in this copy of the
    // file (a commented-out trace plus what was presumably a
    // std::vector<std::string> declaration) -- compare against upstream
    // we_fileop.cpp before relying on this branch.
    // std::cout << "oid2FileName() OID: " << fid <<
    // " searching for file: " << fullFileName <
    dbRootPathList;
    Config::getDBRootPathList(dbRootPathList);

    for (unsigned i = 0; i < dbRootPathList.size(); i++)
    {
      sprintf(fullFileName, "%s/%s", dbRootPathList[i].c_str(), tempFileName);

      // found it, nothing more to do, return
      // if (access(fullFileName, R_OK) == 0) return NO_ERROR; //@Bug 5397
      if (IDBPolicy::exists(fullFileName))
        return NO_ERROR;
    }

    // file wasn't found, user didn't specify DBRoot so we can't create
    return ERR_FILE_NOT_EXIST;
  }

  // Build each directory level of the target path, creating (and chown'ing)
  // any level that does not yet exist.
  std::stringstream aDirName;

  for (size_t i = 0; i < MaxDirLevels; i++)
  {
    if (i == 0)
    {
      // First level is anchored at the DBRoot path.
      aDirName << Config::getDBRootByNum(dbRoot).c_str() << "/" << dbDir[i];
    }
    else
    {
      aDirName << "/" << dbDir[i];
    }

    if (!isDir(aDirName.str().c_str()))
      RETURN_ON_ERROR(createDir(aDirName.str().c_str()));

    {
      std::ostringstream ossChown;  // NOTE(review): unused local -- candidate for removal

      if (chownDataPath(aDirName.str()))
        return ERR_FILE_CHOWN;
    }
  }

  return NO_ERROR;
}

// Build the path for the given fid without touching the filesystem.
// If the DBRoot is offline (empty path), a "(dbroot N offline)" placeholder
// is substituted so the caller still gets a diagnostic-friendly string.
void FileOp::getFileNameForPrimProc(FID fid, char* fullFileName, uint16_t dbRoot, uint32_t partition,
                                    uint16_t segment) const
{
  string dbRootPath = Config::getDBRootByNum(dbRoot);

  if (dbRootPath.empty())
  {
    ostringstream oss;
    oss << "(dbroot " << dbRoot << " offline)";
    dbRootPath = oss.str();
  }

  // different filenames for the version buffer files
  if (fid < 1000)
    snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf", dbRootPath.c_str());
  else
    // Path encodes the 4 bytes of the OID plus the partition as *.dir levels;
    // the segment number selects the FILE*.cdf leaf.
    snprintf(fullFileName, FILE_NAME_SIZE,
             "%s/%03u.dir/%03u.dir/%03u.dir/%03u.dir/%03u.dir/FILE%03d.cdf", dbRootPath.c_str(),
             fid >> 24, (fid & 0x00ff0000) >> 16, (fid & 0x0000ff00) >> 8, fid & 0x000000ff,
             partition, segment);
}

/***********************************************************
 * DESCRIPTION:
 *    Search for directory path associated with specified OID.
 *    If the OID is a version buffer file, it returns the whole
 *    filename.
 * PARAMETERS:
 *    fid - (in) OID to search for
 *    pFile - (out) OID directory path (including DBRoot) that is found
 * RETURN:
 *    NO_ERROR if OID dir path found, else returns ERR_FILE_NOT_EXIST
 ***********************************************************/
int FileOp::oid2DirName(FID fid, char* oidDirName) const
{
  char tempFileName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];

  /* If is a version buffer file, the format is different. */
  if (fid < 1000)
  {
    /* Get the dbroot #
     * Get the root of that dbroot
     */
    BRM::DBRM dbrm;
    int _dbroot = dbrm.getDBRootOfVBOID(fid);

    if (_dbroot < 0)
      return ERR_INVALID_VBOID;

    snprintf(oidDirName, FILE_NAME_SIZE, "%s", Config::getDBRootByNum(_dbroot).c_str());
    return NO_ERROR;
  }

  if (oidDirName == nullptr)
  {
    return ERR_INTERNAL;
  }

  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0)));

  // Now try to find the directory in each of the DBRoots.
  // NOTE(review): template arguments appear stripped from this declaration in
  // this copy (likely std::vector<std::string>) -- confirm against upstream.
  std::vector dbRootPathList;
  Config::getDBRootPathList(dbRootPathList);

  for (unsigned i = 0; i < dbRootPathList.size(); i++)
  {
    sprintf(oidDirName, "%s/%s/%s/%s/%s", dbRootPathList[i].c_str(), dbDir[0], dbDir[1], dbDir[2],
            dbDir[3]);

    // found it, nothing more to do, return
    //@Bug 5397. use the new way to check
    if (IDBPolicy::exists(oidDirName))
      return NO_ERROR;
  }

  return ERR_FILE_NOT_EXIST;
}

/***********************************************************
 * DESCRIPTION:
 *    Construct directory path for the specified fid (OID), DBRoot, and
 *    partition number. Directory path need not exist, nor is it created.
 * PARAMETERS:
 *    fid - (in) OID of interest
 *    dbRoot - (in) DBRoot of interest
 *    partition - (in) partition of interest
 *    dirName - (out) constructed directory path
 * RETURN:
 *    NO_ERROR if path is successfully constructed.
***********************************************************/ int FileOp::getDirName(FID fid, uint16_t dbRoot, uint32_t partition, std::string& dirName) const { char tempFileName[FILE_NAME_SIZE]; char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE]; RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, 0))); std::string rootPath = Config::getDBRootByNum(dbRoot); std::ostringstream oss; oss << rootPath << '/' << dbDir[0] << '/' << dbDir[1] << '/' << dbDir[2] << '/' << dbDir[3] << '/' << dbDir[4]; dirName = oss.str(); return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Open a file * PARAMETERS: * fileName - file name with complete path * pFile - file handle * RETURN: * true if exists, false otherwise ***********************************************************/ // @bug 5572 - HDFS usage: add *.tmp file backup flag IDBDataFile* FileOp::openFile(const char* fileName, const char* mode, const int ioColSize, bool useTmpSuffix) const { IDBDataFile* pFile; errno = 0; unsigned opts; if (ioColSize > 0) opts = IDBDataFile::USE_VBUF; else opts = IDBDataFile::USE_NOVBUF; if ((useTmpSuffix) && idbdatafile::IDBPolicy::useHdfs()) opts |= IDBDataFile::USE_TMPFILE; pFile = IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, mode, opts, ioColSize); if (pFile == NULL) { int errRc = errno; std::ostringstream oss; std::string errnoMsg; Convertor::mapErrnoToString(errRc, errnoMsg); oss << "FileOp::openFile(): fopen(" << fileName << ", " << mode << "): errno = " << errRc << ": " << errnoMsg; logging::Message::Args args; args.add(oss.str()); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0006); SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0006); } return pFile; } // @bug 5572 - HDFS usage: add *.tmp file backup flag IDBDataFile* FileOp::openFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment, std::string& segFile, const char* 
mode, int ioColSize, bool useTmpSuffix) const { char fileName[FILE_NAME_SIZE]; int rc; // fid2FileName( fileName, fid ); RETURN_ON_WE_ERROR((rc = getFileName(fid, fileName, dbRoot, partition, segment)), NULL); // disable buffering for versionbuffer file if (fid < 1000) ioColSize = 0; IDBDataFile* pF = openFile(fileName, mode, ioColSize, useTmpSuffix); segFile = fileName; return pF; } /*********************************************************** * DESCRIPTION: * Read a portion of file to a buffer * PARAMETERS: * pFile - file handle * readBuf - read buffer * readSize - the size to read * RETURN: * NO_ERROR if success * ERR_FILE_NULL if file handle is NULL * ERR_FILE_READ if something wrong in reading the file ***********************************************************/ int FileOp::readFile(IDBDataFile* pFile, unsigned char* readBuf, int readSize) const { if (pFile != NULL) { int bc = pFile->read(readBuf, readSize); if (bc != readSize) { // MCOL-498 EOF if a next block is empty if (bc == 0) { return ERR_FILE_EOF; } return ERR_FILE_READ; } } else return ERR_FILE_NULL; return NO_ERROR; } /*********************************************************** * Reads contents of headers from "pFile" and stores into "hdrs". ***********************************************************/ int FileOp::readHeaders(IDBDataFile* pFile, char* hdrs) const { RETURN_ON_ERROR(setFileOffset(pFile, 0)); RETURN_ON_ERROR( readFile(pFile, reinterpret_cast(hdrs), (CompressInterface::HDR_BUF_LEN * 2))); int rc = compress::CompressInterface::verifyHdr(hdrs); if (rc != 0) { return ERR_COMP_VERIFY_HDRS; } return NO_ERROR; } /*********************************************************** * Reads contents of headers from "pFile" and stores into "hdr1" and "hdr2". 
***********************************************************/ int FileOp::readHeaders(IDBDataFile* pFile, char* hdr1, char* hdr2) const { unsigned char* hdrPtr = reinterpret_cast(hdr1); RETURN_ON_ERROR(setFileOffset(pFile, 0)); RETURN_ON_ERROR(readFile(pFile, hdrPtr, CompressInterface::HDR_BUF_LEN)); int ptrSecSize = compress::CompressInterface::getHdrSize(hdrPtr) - CompressInterface::HDR_BUF_LEN; return readFile(pFile, reinterpret_cast(hdr2), ptrSecSize); } /*********************************************************** * DESCRIPTION: No change Old signature * Read a portion of file to a buffer * PARAMETERS: * pFile - file handle * offset - file offset * origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END * RETURN: * NO_ERROR if success * ERR_FILE_NULL if file handle is NULL * ERR_FILE_SEEK if something wrong in setting the position ***********************************************************/ int FileOp::setFileOffset(IDBDataFile* pFile, long long offset, int origin) const { int rc; long long fboOffset = offset; // workaround solution to pass leakcheck error if (pFile == NULL) return ERR_FILE_NULL; if (offset < 0) return ERR_FILE_FBO_NEG; rc = pFile->seek(fboOffset, origin); if (rc) return ERR_FILE_SEEK; return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Read a portion of file to a buffer * PARAMETERS: * pFile - file handle * offset - file offset * origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END * RETURN: * NO_ERROR if success * ERR_FILE_NULL if file handle is NULL * ERR_FILE_SEEK if something wrong in setting the position ***********************************************************/ int FileOp::setFileOffsetBlock(IDBDataFile* pFile, uint64_t lbid, int origin) const { long long fboOffset = 0; int fbo = 0; // only when fboFlag is false, we get in here uint16_t dbRoot; uint32_t partition; uint16_t segment; RETURN_ON_ERROR(BRMWrapper::getInstance()->getFboOffset(lbid, dbRoot, partition, segment, fbo)); fboOffset = ((long 
long)fbo) * (long)BYTE_PER_BLOCK; return setFileOffset(pFile, fboOffset, origin); } /*********************************************************** * DESCRIPTION: * Truncate file to the specified size. * PARAMETERS: * pFile - file handle * fileSize - size of file in bytes. * RETURN: * NO_ERROR if success * ERR_FILE_NULL if file handle is NULL * ERR_FILE_SEEK if something wrong in setting the position ***********************************************************/ int FileOp::truncateFile(IDBDataFile* pFile, long long fileSize) const { if (pFile == NULL) return ERR_FILE_NULL; if (pFile->truncate(fileSize) != 0) return ERR_FILE_TRUNCATE; return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Write a buffer to a file at at current location * PARAMETERS: * pFile - file handle * writeBuf - write buffer * writeSize - the write size * RETURN: * NO_ERROR if success * ERR_FILE_NULL if file handle is NULL * ERR_FILE_WRITE if something wrong in writing to the file ***********************************************************/ int FileOp::writeFile(IDBDataFile* pFile, const unsigned char* writeBuf, int writeSize) const { if (pFile != NULL) { if (pFile->write(writeBuf, writeSize) != writeSize) return ERR_FILE_WRITE; } else return ERR_FILE_NULL; return NO_ERROR; } /*********************************************************** * DESCRIPTION: * Determine whether the applicable filesystem has room to add the * specified number of blocks (where the blocks contain BYTE_PER_BLOCK * bytes). * PARAMETERS: * fileName - file whose file system is to be checked. Does not have to * be a complete file name. Dir path is sufficient. 
* nBlock - number of 8192-byte blocks to be added * RETURN: * true if there is room for the blocks or it can not be determined; * false if file system usage would exceed allowable threshold ***********************************************************/ bool FileOp::isDiskSpaceAvail(const std::string& fileName, int nBlocks) const { bool bSpaceAvail = true; unsigned maxDiskUsage = Config::getMaxFileSystemDiskUsage(); if (maxDiskUsage < 100) // 100% means to disable the check { struct statfs fStats; int rc = statfs(fileName.c_str(), &fStats); if (rc == 0) { double totalBlocks = fStats.f_blocks; double blksToAlloc = (double)(nBlocks * BYTE_PER_BLOCK) / fStats.f_bsize; double freeBlocks = fStats.f_bavail - blksToAlloc; if ((((totalBlocks - freeBlocks) / totalBlocks) * 100.0) > maxDiskUsage) bSpaceAvail = false; // std::cout << "isDiskSpaceAvail" << //": totalBlocks: " << totalBlocks << //"; blkSize: " << fStats.f_bsize << //"; nBlocks: " << nBlocks << //"; freeBlks: " << freeBlocks << //"; pctUsed: " << (((totalBlocks-freeBlocks)/totalBlocks)*100.0) << //"; bAvail: " << bSpaceAvail << std::endl; } } return bSpaceAvail; } //------------------------------------------------------------------------------ // Virtual default functions follow; placeholders for derived class if they want // to override (see ColumnOpCompress1 and DctnryCompress1 in /wrapper). //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Expand current abbreviated extent to a full extent for column segment file // associated with pFile. Function leaves fileposition at end of file after // extent is expanded. 
//------------------------------------------------------------------------------ int FileOp::expandAbbrevColumnExtent( IDBDataFile* pFile, // FILE ptr to file where abbrev extent is to be expanded uint16_t dbRoot, // The DBRoot of the file with the abbreviated extent const uint8_t* emptyVal, // Empty value to be used in expanding the extent int width, // Width of the column (in bytes) execplan::CalpontSystemCatalog::ColDataType colDataType) // Column data type. { // Based on extent size, see how many blocks to add to fill the extent int blksToAdd = (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) * width; // Make sure there is enough disk space to expand the extent. RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_END)); // TODO-will have to address this DiskSpaceAvail check at some point if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), blksToAdd)) { return ERR_FILE_DISK_SPACE; } // Add blocks to turn the abbreviated extent into a full extent. int rc = FileOp::initColumnExtent(pFile, dbRoot, blksToAdd, emptyVal, width, colDataType, false, // existing file true, // expand existing extent false, // n/a since not adding new extent true); // optimize segment file extension return rc; } void FileOp::setTransId(const TxnID& transId) { m_transId = transId; } void FileOp::setBulkFlag(bool isBulkLoad) { m_isBulk = isBulkLoad; } int FileOp::flushFile(int rc, std::map& oids) { return NO_ERROR; } int FileOp::updateColumnExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid) { return NO_ERROR; } int FileOp::updateDctnryExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid) { return NO_ERROR; } void FileOp::setFixFlag(bool isFix) { m_isFix = isFix; } // Small note. 
We call chownFileDir in couple places to chown of the // target file and call in oid2Filename() chowns directories created bool FileOp::chownDataPath(const std::string& fileName) const { std::ostringstream error; idbdatafile::IDBFileSystem& fs = IDBPolicy::getFs(fileName); if (chownPath(error, fileName, fs)) { logging::Message::Args args; logging::Message message(1); args.add(error.str()); message.format(args); logging::LoggingID lid(SUBSYSTEM_ID_WE_BULK); logging::MessageLog ml(lid); ml.logErrorMessage(message); return true; } return false; } } // namespace WriteEngine