/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */

/*
 * $Id: we_rbmetawriter.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 */

#include "we_rbmetawriter.h"

// NOTE(review): the header names inside <> were lost when this file was
// extracted; the list below is reconstructed from the identifiers this file
// uses (errno, strerror, getpid, ostringstream, unique_ptr, map, vector).
// Verify against the repository copy of this file.
#include <cerrno>
#include <cstring>
#include <map>
#include <memory>
#include <sstream>
#include <unistd.h>

#include "we_config.h"
#include "we_convertor.h"
#include "we_define.h"
#include "we_log.h"
#include "we_bulkrollbackmgr.h"
#include "idbcompress.h"
using namespace compress;
using namespace execplan;

#include "IDBDataFile.h"
#include "IDBFileSystem.h"
#include "IDBPolicy.h"
using namespace idbdatafile;

namespace
{
// Suffix for the subdirectory that holds backed-up HWM chunk files.
const char* DATA_DIR_SUFFIX = "_data";
// Suffix used for a meta file until it is complete and renamed into place.
const char* TMP_FILE_SUFFIX = ".tmp";
const char* VERSION3_REC = "# VERSION: 3";
const int VERSION3_REC_LEN = 12;
const char* VERSION4_REC = "# VERSION: 4";
const int VERSION4_REC_LEN = 12;
const char* COLUMN1_REC = "COLUM1";  // HWM extent for a DBRoot
const int COLUMN1_REC_LEN = 6;
const char* COLUMN2_REC = "COLUM2";  // Placeholder for empty DBRoot
const int COLUMN2_REC_LEN = 6;
const char* DSTORE1_REC = "DSTOR1";  // HWM extent for a DBRoot
const int DSTORE1_REC_LEN = 6;
const char* DSTORE2_REC = "DSTOR2";  // Placeholder for empty DBRoot
const int DSTORE2_REC_LEN = 6;

//--------------------------------------------------------------------------
// Local Function that prints contents of an RBChunkInfo object
//--------------------------------------------------------------------------
std::ostream& operator<<(std::ostream& os, const WriteEngine::RBChunkInfo& chk)
{
  os << "OID-" << chk.fOid << "; DBRoot-" << chk.fDbRoot << "; Part-" << chk.fPartition << "; Seg-"
     << chk.fSegment << "; HWM-" << chk.fHwm;
  return os;
}
}  // namespace

namespace WriteEngine
{
//------------------------------------------------------------------------------
// Compare function used for set of RBChunkInfo objects.
// Orders by OID, then by segment number; partition and HWM deliberately do not
// participate, so (OID, segment) acts as the lookup key for the set.
//------------------------------------------------------------------------------
bool RBChunkInfoCompare::operator()(const RBChunkInfo& lhs, const RBChunkInfo& rhs) const
{
  if (lhs.fOid < rhs.fOid)
  {
    return true;
  }

  if ((lhs.fOid == rhs.fOid) && (lhs.fSegment < rhs.fSegment))
  {
    return true;
  }

  return false;
}

//------------------------------------------------------------------------------
// RBMetaWriter constructor
//------------------------------------------------------------------------------
RBMetaWriter::RBMetaWriter(const std::string& appDesc, Log* logger)
 : fMetaDataFile(NULL), fAppDesc(appDesc), fLog(logger), fCreatedSubDir(false)
{
}

//------------------------------------------------------------------------------
// Initialize this meta data file object using the specified table OID and name.
// We assume the application code calling this function, was able to acquire a
// table lock, meaning if there should happen to be any leftover metadata files
// from a previous job, they can be deleted.
//------------------------------------------------------------------------------
void RBMetaWriter::init(OID tableOID, const std::string& tableName)
{
  fTableOID = tableOID;
  fTableName = tableName;

  // NOTE(review): element type reconstructed; Config::getRootIdList() is
  // assumed to fill a vector of DBRoot ids (uint16_t) -- confirm in we_config.h
  std::vector<uint16_t> dbRoots;
  Config::getRootIdList(dbRoots);

  std::string metaFileName;
  std::ostringstream oss;
  oss << "/" << fTableOID;

  // Delete any files that collide with the file names we are going to need.
  // Construct the filenames; we will use a temporary file name until we are
  // finished creating, at which time we will rename the temp files.
  for (unsigned m = 0; m < dbRoots.size(); m++)
  {
    std::string bulkRollbackPath(Config::getDBRootByNum(dbRoots[m]));
    bulkRollbackPath += '/';
    bulkRollbackPath += DBROOT_BULK_ROLLBACK_SUBDIR;
    metaFileName = bulkRollbackPath;
    metaFileName += oss.str();

    std::string tmpMetaFileName = metaFileName;
    tmpMetaFileName += TMP_FILE_SUFFIX;

    // Delete any files that collide with the filenames we intend to use
    IDBPolicy::remove(metaFileName.c_str());
    IDBPolicy::remove(tmpMetaFileName.c_str());

    // Clear out any data subdirectory
    deleteSubDir(metaFileName);
  }
}

//------------------------------------------------------------------------------
// Saves snapshot of extentmap into a bulk rollback meta data file, for
// use in a bulk rollback.  Function was closely modeled after function
// of similar name in bulk/we_tableinfo.cpp.  API was modified to help
// facilitate its use by DML.
//
// columns - Column vector with information about column in table.
//           Includes information about the initial HWM extent, so that
//           the corresponding HWM chunk can be backed up.
// dctnryStoreOids - Dictionary store OIDs that correspond to columns.
// dbRootHWMInfoVecCol - vector of last local HWM info for each DBRoot
//           (assigned to current PM) for each column in tblOid.
//------------------------------------------------------------------------------ void RBMetaWriter::saveBulkRollbackMetaData(const std::vector& columns, const std::vector& dctnryStoreOids, const std::vector& dbRootHWMInfoVecCol) { int rc = NO_ERROR; bool bOpenedFile = false; try { std::vector dbRoots; Config::getRootIdList(dbRoots); // Loop through DBRoot HWMs for this PM for (unsigned m = 0; m < dbRoots.size(); m++) { std::string metaFileName = openMetaFile(dbRoots[m]); bOpenedFile = true; fCreatedSubDir = false; // Loop through the columns in the specified table for (size_t i = 0; i < columns.size(); i++) { const BRM::EmDbRootHWMInfo_v& dbRootHWMInfo = dbRootHWMInfoVecCol[i]; // Select dbRootHWMInfo that matches DBRoot for this iteration unsigned k = 0; for (; k < dbRootHWMInfo.size(); k++) { if (dbRoots[m] == dbRootHWMInfo[k].dbRoot) break; } if (k >= dbRootHWMInfo.size()) // logic error; should not happen { std::ostringstream oss; oss << "Error creating meta file; DBRoot" << dbRoots[m] << " listed in Calpont config file, but not in extentmap" " for OID " << columns[i].dataFile.oid; throw WeException(oss.str(), ERR_INVALID_PARAM); } uint16_t dbRoot = dbRootHWMInfo[k].dbRoot; uint32_t partition = 0; uint16_t segment = 0; HWM localHWM = 0; bool bExtentWithData = false; // For empty DBRoot (totalBlocks == 0), // leave partition, segment, and HWM set to 0 if ((dbRootHWMInfo[k].totalBlocks > 0) || (dbRootHWMInfo[k].status == BRM::EXTENTOUTOFSERVICE)) { partition = dbRootHWMInfo[k].partitionNum; segment = dbRootHWMInfo[k].segmentNum; localHWM = dbRootHWMInfo[k].localHWM; bExtentWithData = true; } // Save column meta-data info to support bulk rollback writeColumnMetaData(metaFileName, bExtentWithData, columns[i].dataFile.oid, dbRoot, partition, segment, localHWM, columns[i].colDataType, ColDataTypeStr[columns[i].colDataType], columns[i].colWidth, columns[i].compressionType); // Save dctnry store meta-data info to support bulk rollback if (dctnryStoreOids[i] > 0) { 
std::vector segList; std::string segFileListErrMsg; if (bExtentWithData) { std::string dirName; FileOp fileOp(false); rc = fileOp.getDirName(dctnryStoreOids[i], dbRoot, partition, dirName); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Bulk rollback error constructing path " "for dictionary " << dctnryStoreOids[i] << "; dbRoot-" << dbRoot << "; partition-" << partition << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } rc = BulkRollbackMgr::getSegFileList(dirName, false, segList, segFileListErrMsg); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Bulk rollback error for dictionary " << dctnryStoreOids[i] << "; directory-" << dirName << "; " << segFileListErrMsg << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } } // end of "if (bExtentWithData)" if (segList.size() == 0) { writeDictionaryStoreMetaNoDataMarker(columns[i].dataFile.oid, dctnryStoreOids[i], dbRoot, partition, 0, // segment columns[i].compressionType); } else { // Loop thru dictionary store seg files for this DBRoot for (unsigned int kk = 0; kk < segList.size(); kk++) { unsigned int segDictionary = segList[kk]; // check HWM for dictionary store file HWM dictHWMStore; int extState; rc = BRMWrapper::getInstance()->getLocalHWM(dctnryStoreOids[i], partition, segDictionary, dictHWMStore, extState); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error getting rollback HWM for " "dictionary file " << dctnryStoreOids[i] << "; partition-" << partition << "; segment-" << segDictionary << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } writeDictionaryStoreMetaData(columns[i].dataFile.oid, dctnryStoreOids[i], dbRoot, partition, segDictionary, dictHWMStore, columns[i].compressionType); } // loop thru dictionary store seg files in this DBRoot } // dictionary OID has 1 or more seg files in partition } // if dictionary column // For a compressed column, backup the starting HWM chunk if the // starting HWM 
block is not on an empty DBRoot (or outOfSrvc) if ((columns[i].compressionType) && (columns[i].dataFile.fDbRoot == dbRootHWMInfo[k].dbRoot) && (dbRootHWMInfo[k].totalBlocks > 0) && (dbRootHWMInfo[k].status != BRM::EXTENTOUTOFSERVICE)) { backupColumnHWMChunk(columns[i].dataFile.oid, columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment, columns[i].dataFile.hwm); } } // End of loop through columns // time to dump the string stream to file std::string data(fMetaDataStream.str()); // this is to cover partical writes // no need for retry if low layer takes care partial writes. const char* p = data.c_str(); // buffer contents size_t s = data.size(); // buffer size size_t w = 0; // total bytes written so far ssize_t n = 0; // bytes written in one write for (int i = 0; i < 10 && w < s; i++) { n = fMetaDataFile->write(p + w, s - w); if (n < 0) break; w += n; } if (w != s) { int errRc = errno; std::ostringstream oss; oss << "Error writing bulk rollback meta-data file " << metaFileName << "; written/expect:" << w << "/" << s << "; err-" << errRc << "; " << strerror(errRc); throw WeException(oss.str(), ERR_FILE_WRITE); } fMetaDataStream.str(""); closeMetaFile(); bOpenedFile = false; } // End of loop through DBRoot HWMs for this PM renameMetaFile(); // rename meta files from temporary filenames } catch (WeException& ex) // catch exception to close file, then rethrow { if (bOpenedFile) closeMetaFile(); // If any error occurred, then go back and try to delete all meta files. // We catch and drop any exception, and return the original exception, // since we are already in error-mode at this point. try { deleteFile(); } catch (...) { } throw WeException(ex.what(), ex.errorCode()); } } //------------------------------------------------------------------------------ // Open a meta data file to save info about the specified table OID. 
//------------------------------------------------------------------------------ std::string RBMetaWriter::openMetaFile(uint16_t dbRoot) { std::string bulkRollbackPath(Config::getDBRootByNum(dbRoot)); bulkRollbackPath += '/'; bulkRollbackPath += DBROOT_BULK_ROLLBACK_SUBDIR; if (!IDBPolicy::exists(bulkRollbackPath.c_str())) { if (IDBPolicy::mkdir(bulkRollbackPath.c_str()) != 0) { std::ostringstream oss; oss << "Error creating bulk rollback directory " << bulkRollbackPath << ";" << std::endl; throw WeException(oss.str(), ERR_DIR_CREATE); } } // Open the file std::ostringstream oss; oss << "/" << fTableOID; std::string metaFileName(bulkRollbackPath); metaFileName += oss.str(); fMetaFileNames.insert(make_pair(dbRoot, metaFileName)); std::string tmpMetaFileName(metaFileName); tmpMetaFileName += TMP_FILE_SUFFIX; fMetaDataFile = IDBDataFile::open(IDBPolicy::getType(tmpMetaFileName.c_str(), IDBPolicy::WRITEENG), tmpMetaFileName.c_str(), "wb", 0); if (!fMetaDataFile) { int errRc = errno; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << "Error opening bulk rollback file " << tmpMetaFileName << "; " << eMsg; throw WeException(oss.str(), ERR_FILE_OPEN); } { std::ostringstream ossChown; idbdatafile::IDBFileSystem& fs = IDBPolicy::getFs(tmpMetaFileName.c_str()); if (chownPath(ossChown, tmpMetaFileName, fs) || chownPath(ossChown, bulkRollbackPath, fs)) { throw WeException(ossChown.str(), ERR_FILE_CHOWN); } } fMetaDataStream << "# VERSION: 4" << std::endl << "# APPLICATION: " << fAppDesc << std::endl << "# PID: " << ::getpid() << std::endl << "# TABLE: " << fTableName << std::endl << "# COLUM1: coloid," "dbroot,part,seg,lastLocalHWM,type,typename,width,comp" << std::endl << "# COLUM2: coloid," "dbroot,part,seg,type,typename,width,comp" << std::endl << "# DSTOR1: coloid,dctoid," "dbroot,part,seg,localHWM,comp" << std::endl << "# DSTOR2: coloid,dctoid," "dbroot,part,seg,comp" << std::endl; // Clear out any data subdirectory // This is 
redundant because init() also calls deleteSubDir(), but it can't // hurt to call twice. We "really" want to make sure we start with a clean // slate (no leftover backup chunk files from a previous import job). deleteSubDir(metaFileName); return metaFileName; } //------------------------------------------------------------------------------ // Close the currently open "temporary named" meta data file used during // construction. We will rename all the meta data files (for the various // dbroots) to their eventual file names later, in renameMetaFile(). //------------------------------------------------------------------------------ void RBMetaWriter::closeMetaFile() { delete fMetaDataFile; fMetaDataFile = NULL; } //------------------------------------------------------------------------------ // Rename temporary metafile names to their permanent name, taking file names // from fMetaFileNames. In the normal case there will be one file name per // DBRoot for the local PM we are running on. //------------------------------------------------------------------------------ void RBMetaWriter::renameMetaFile() { for (std::map::const_iterator iter = fMetaFileNames.begin(); iter != fMetaFileNames.end(); ++iter) { const std::string& metaFileName = iter->second; if (!metaFileName.empty()) { std::string tmpMetaFileName = metaFileName; tmpMetaFileName += TMP_FILE_SUFFIX; if (IDBPolicy::rename(tmpMetaFileName.c_str(), metaFileName.c_str())) { int errRc = errno; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << "Error renaming meta data file-" << tmpMetaFileName << "; will be deleted; " << eMsg; throw WeException(oss.str(), ERR_METADATABKUP_FILE_RENAME); } } } } //------------------------------------------------------------------------------ // Delete the meta data files for the specified table OID. We loop through all // the DBRoots for the local PM, deleting the applicable meta data files. 
// If the call to deleteSubDir() should throw an exception, we might not want // to consider that a fatal error, but we go ahead and let the exception // go up the call-stack so that the caller can log the corresponding message. // The application code can then decide whether they want to consider this // condition as fatal or not. //------------------------------------------------------------------------------ void RBMetaWriter::deleteFile() { for (std::map::const_iterator iter = fMetaFileNames.begin(); iter != fMetaFileNames.end(); ++iter) { const std::string& metaFileName = iter->second; if (!metaFileName.empty()) { std::string tmpMetaFileName = metaFileName; tmpMetaFileName += TMP_FILE_SUFFIX; IDBPolicy::remove(metaFileName.c_str()); IDBPolicy::remove(tmpMetaFileName.c_str()); deleteSubDir(metaFileName); // can throw an exception } } fMetaFileNames.clear(); } //------------------------------------------------------------------------------ // New version of writeColumnMetaData for Shared-Nothing //------------------------------------------------------------------------------ void RBMetaWriter::writeColumnMetaData(const std::string& metaFileName, bool withHWM, OID columnOID, uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM lastLocalHwm, CalpontSystemCatalog::ColDataType colType, const std::string& colTypeName, int colWidth, int compressionType) { if (withHWM) { fMetaDataStream << "COLUM1: " << columnOID << ' ' << dbRoot << ' ' << partition << ' ' << segment << ' ' << lastLocalHwm << ' ' << colType << ' ' << colTypeName << ' ' << colWidth; } else { fMetaDataStream << "COLUM2: " << columnOID << ' ' << dbRoot << ' ' << partition << ' ' << segment << ' ' << colType << ' ' << colTypeName << ' ' << colWidth; } if (compressionType) fMetaDataStream << ' ' << compressionType << ' '; fMetaDataStream << std::endl; // If column is compressed, then create directory for storing HWM chunks if (compressionType) { if (!fCreatedSubDir) { // @bug 5572 - Don't need db 
backup files for HDFS; // use hdfs buffer file if (!IDBPolicy::useHdfs()) createSubDir(metaFileName); } } } //------------------------------------------------------------------------------ // New version of writeDictionaryStoreMetaData for Shared-Nothing. //------------------------------------------------------------------------------ void RBMetaWriter::writeDictionaryStoreMetaData(OID columnOID, OID dictionaryStoreOID, uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM localHwm, int compressionType) { fMetaDataStream << "DSTOR1: " << columnOID << ' ' << dictionaryStoreOID << ' ' << dbRoot << ' ' << partition << ' ' << segment << ' ' << localHwm; if (compressionType) fMetaDataStream << ' ' << compressionType << ' '; fMetaDataStream << std::endl; // Save dictionary meta data for later use in backing up the HWM chunks if (compressionType) { RBChunkInfo chunkInfo(dictionaryStoreOID, dbRoot, partition, segment, localHwm); fRBChunkDctnrySet.insert(chunkInfo); if ((fLog) && (fLog->isDebug(DEBUG_1))) printDctnryChunkList(chunkInfo, "after adding "); } } //------------------------------------------------------------------------------ // New version of writeDictionaryStoreMetaNoDataMarker for Shared-Nothing. //------------------------------------------------------------------------------ void RBMetaWriter::writeDictionaryStoreMetaNoDataMarker(OID columnOID, OID dictionaryStoreOID, uint16_t dbRoot, uint32_t partition, uint16_t segment, int compressionType) { fMetaDataStream << "DSTOR2: " << columnOID << ' ' << dictionaryStoreOID << ' ' << dbRoot << ' ' << partition << ' ' << segment; if (compressionType) fMetaDataStream << ' ' << compressionType << ' '; fMetaDataStream << std::endl; } //------------------------------------------------------------------------------ // Create the subdirectory we will use to backup data needed for rollback. 
//------------------------------------------------------------------------------ void RBMetaWriter::createSubDir(const std::string& metaFileName) { std::string bulkRollbackSubPath(metaFileName); bulkRollbackSubPath += DATA_DIR_SUFFIX; if (IDBPolicy::mkdir(bulkRollbackSubPath.c_str()) != 0) { std::ostringstream oss; oss << "Error creating bulk rollback data subdirectory " << bulkRollbackSubPath << ";"; throw WeException(oss.str(), ERR_DIR_CREATE); } fCreatedSubDir = true; } //------------------------------------------------------------------------------ // Delete the subdirectory used to backup data needed for rollback. //------------------------------------------------------------------------------ void RBMetaWriter::deleteSubDir(const std::string& metaFileName) { std::string bulkRollbackSubPath(metaFileName); bulkRollbackSubPath += DATA_DIR_SUFFIX; if (IDBPolicy::remove(bulkRollbackSubPath.c_str()) != 0) { std::ostringstream oss; oss << "Error deleting bulk rollback data subdirectory " << bulkRollbackSubPath << ";"; throw WeException(oss.str(), ERR_FILE_DELETE); } } //------------------------------------------------------------------------------ // Backup the contents of the HWM chunk for the specified column OID extent, // so that the chunk is available for bulk rollback. // This operation is only performed for compressed columns. //------------------------------------------------------------------------------ void RBMetaWriter::backupColumnHWMChunk(OID columnOID, uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM startingHWM) { // @bug 5572 - Don't need db backup file for HDFS; we use hdfs buffer file if (!IDBPolicy::useHdfs()) { backupHWMChunk(true, columnOID, dbRoot, partition, segment, startingHWM); } } //------------------------------------------------------------------------------ // Backup the contents of the HWM chunk for the specified dictionary store OID // extent, so that the chunk is available for bulk rollback. 
// This operation is only performed for compressed columns. Once the chunk is // saved, we remove that OID, partition, and segment from the internal list // (fRBChunkDctnrySet) that is maintained. // Return value indicates whether the specified file needs to be backed up or // not. // // This function MUST be maintained to be thread-safe so that multiple threads // can concurrently call this function, with each thread managing a different // dictionary column. //------------------------------------------------------------------------------ // @bug 5572 - HDFS usage: add return flag to indicate backup status bool RBMetaWriter::backupDctnryHWMChunk(OID dctnryOID, uint16_t dbRoot, uint32_t partition, uint16_t segment) { bool bBackupApplies = false; if (fRBChunkDctnrySet.size() > 0) { RBChunkInfo chunkInfo(dctnryOID, 0, partition, segment, 0); RBChunkInfo chunkInfoFound(0, 0, 0, 0, 0); bool bFound = false; { // Use scoped lock to perform "find" boost::mutex::scoped_lock lock(fRBChunkDctnryMutex); if ((fLog) && (fLog->isDebug(DEBUG_1))) printDctnryChunkList(chunkInfo, "when searching "); RBChunkSet::iterator iter = fRBChunkDctnrySet.find(chunkInfo); if (iter != fRBChunkDctnrySet.end()) { bFound = true; chunkInfoFound = *iter; } } if (bFound) { if (chunkInfoFound.fPartition == partition) { // @bug 5572 - Don't need db backup file for HDFS; // we use hdfs buffer file. Set backup flag // so application knows to use tmp buffer file. bBackupApplies = true; if (!IDBPolicy::useHdfs()) { backupHWMChunk(false, dctnryOID, dbRoot, partition, segment, chunkInfoFound.fHwm); } } else { // How could this happen? Ended up asking for different // partition than expected for the first instance of this // OID and segment file. Perhaps initial blockskipping // or something caused us to advance to another segment file // without ever changing the expected extent. 
At any rate // we still fall through and delete our entry because we // apparently did not end up changing the chunk referenced // by this RBChunkInfo object. } { // Use scoped lock to perform "erase" boost::mutex::scoped_lock lock(fRBChunkDctnryMutex); fRBChunkDctnrySet.erase(chunkInfoFound); if ((fLog) && (fLog->isDebug(DEBUG_1))) printDctnryChunkList(chunkInfoFound, "after deleting "); } } } return bBackupApplies; } //------------------------------------------------------------------------------ // Backup entire contents of HWM file for the specified columnOID,dbRoot,etc, // so that the file is available for bulk rollback. This function is used for // HDFS files only. This operation is only performed for compressed columns. // // This function MUST be kept thread-safe in support of backupDctnryHWMChunk(). // See that function description for more details. This is the reason // backupHWMChunk() has to have a local FileOp object. We can't share/reuse // a FileOp data member variable unless we want to employ a mutex. //------------------------------------------------------------------------------ // @bug 5572 - Stopped using backupHWMFile(). 
// Don't need db backup file for HDFS; we use hdfs buffer file void RBMetaWriter::backupHWMFile(bool bColumnFile, // is this a column (vs dictionary) file OID columnOID, // OID of column or dictionary store uint16_t dbRoot, // DB Root for db segment file uint32_t partition, // partition for db segment file uint16_t segment, // segment for db segment file HWM startingHWM) // starting HWM for db segment file { std::string fileType("column"); if (!bColumnFile) fileType = "dictionary"; FileOp fileOp; // @bug 4960: to keep thread-safe, we use local FileOp // Construct file name for db file to be backed up char dbFileName[FILE_NAME_SIZE]; int rc = fileOp.getFileName(columnOID, dbFileName, dbRoot, partition, segment); if (rc != NO_ERROR) { std::ostringstream oss; oss << "Error creating backup " << fileType << " file for OID " << columnOID << "; Can't construct file name for DBRoot" << dbRoot << "; partition-" << partition << "; segment-" << segment; throw WeException(oss.str(), rc); } // Construct file name for backup copy of db file std::ostringstream ossFile; ossFile << "/" << columnOID << ".p" << partition << ".s" << segment; std::string backupFileName; rc = getSubDirPath(dbRoot, backupFileName); if (rc != NO_ERROR) { std::ostringstream oss; oss << "Error creating backup " << fileType << " file for OID " << columnOID << "; Can't find matching meta file for DBRoot" << dbRoot; throw WeException(oss.str(), rc); } backupFileName += ossFile.str(); std::string backupFileNameTmp = backupFileName; backupFileNameTmp += TMP_FILE_SUFFIX; // if ( (fLog) && (fLog->isDebug(DEBUG_1)) ) if (fLog) { std::ostringstream oss; oss << "Backing up HWM file for " << fileType << " file for OID " << columnOID << "; file-" << backupFileNameTmp << "; HWM-" << startingHWM; fLog->logMsg(oss.str(), MSGLVL_INFO2); } // Copy the db file to a temporary name IDBFileSystem& fs = IDBPolicy::getFs(backupFileNameTmp.c_str()); if (!fs.exists(dbFileName)) { std::ostringstream oss; oss << "Error creating 
backup " << fileType << " file for OID " << columnOID << "; dbfile does not exist for DBRoot" << dbRoot << "; partition-" << partition << "; segment-" << segment; throw WeException(oss.str(), ERR_FILE_NOT_EXIST); } rc = fs.copyFile(dbFileName, backupFileNameTmp.c_str()); if (rc != 0) { std::ostringstream oss; oss << "Error copying backup for " << fileType << " OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; rc-" << rc; fs.remove(backupFileNameTmp.c_str()); throw WeException(oss.str(), ERR_METADATABKUP_COMP_WRITE_BULK_BKUP); } // Rename temporary named backup file to final name rc = fs.rename(backupFileNameTmp.c_str(), backupFileName.c_str()); if (rc != 0) { std::ostringstream oss; oss << "Error renaming temp backup for " << fileType << " OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; rc-" << rc; fs.remove(backupFileNameTmp.c_str()); fs.remove(backupFileName.c_str()); throw WeException(oss.str(), ERR_METADATABKUP_COMP_RENAME); } } //------------------------------------------------------------------------------ // Backup the contents of the HWM chunk for the specified columnOID,dbRoot,etc, // so that the chunk is available for bulk rollback. This function is used for // non-hdfs files. This operation is only performed for compressed columns. // // This function MUST be kept thread-safe in support of backupDctnryHWMChunk(). // See that function description for more details. This is the reason // backupHWMChunk() has to have a local FileOp object. We can't share/reuse // a FileOp data member variable unless we want to employ a mutex. 
//------------------------------------------------------------------------------ void RBMetaWriter::backupHWMChunk(bool bColumnFile, // is this a column (vs dictionary) file OID columnOID, // OID of column or dictionary store uint16_t dbRoot, // DB Root for db segment file uint32_t partition, // partition for db segment file uint16_t segment, // segment for db segment file HWM startingHWM) // starting HWM for db segment file { std::string fileType("column"); if (!bColumnFile) fileType = "dictionary"; // Open the applicable database column segment file std::string segFile; FileOp fileOp; // @bug 4960: to keep thread-safe, we use local FileOp IDBDataFile* dbFile = fileOp.openFile(columnOID, dbRoot, partition, segment, segFile, "rb"); if (!dbFile) { std::ostringstream oss; oss << "Backup error opening " << fileType << " file for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment; throw WeException(oss.str(), ERR_FILE_OPEN); } // Get the size of the file, so we know where to truncate back to. 
long long fileSizeBytes; int rc = fileOp.getFileSize(dbFile, fileSizeBytes); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Backup error getting file size for " << fileType << " OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } // Read Control header char controlHdr[CompressInterface::HDR_BUF_LEN]; rc = fileOp.readFile(dbFile, (unsigned char*)controlHdr, CompressInterface::HDR_BUF_LEN); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Backup error reading " << fileType << " file control hdr for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } int rc1 = compress::CompressInterface::verifyHdr(controlHdr); if (rc1 != 0) { rc = ERR_METADATABKUP_COMP_VERIFY_HDRS; WErrorCodes ec; std::ostringstream oss; oss << "Backup error verifying " << fileType << " file control hdr for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc) << "; rc: " << rc1; fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } auto compressionType = compress::CompressInterface::getCompressionType(controlHdr); std::unique_ptr compressor( compress::getCompressInterfaceByType(compressionType)); if (!compressor) { WErrorCodes ec; std::ostringstream oss; oss << "Ivalid compression type " << compressionType; fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } // Read Pointer header data uint64_t hdrSize = compress::CompressInterface::getHdrSize(controlHdr); uint64_t ptrHdrSize = hdrSize - CompressInterface::HDR_BUF_LEN; char* pointerHdr = new char[ptrHdrSize]; rc = fileOp.readFile(dbFile, (unsigned char*)pointerHdr, ptrHdrSize); if (rc != NO_ERROR) { WErrorCodes ec; 
std::ostringstream oss; oss << "Backup error reading " << fileType << " file pointer hdr for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); delete[] pointerHdr; fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } CompChunkPtrList chunkPtrs; rc = compress::CompressInterface::getPtrList(pointerHdr, ptrHdrSize, chunkPtrs); delete[] pointerHdr; if (rc != 0) { std::ostringstream oss; oss << "Backup error getting " << fileType << " file hdr for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment; fileOp.closeFile(dbFile); throw WeException(oss.str(), ERR_METADATABKUP_COMP_PARSE_HDRS); } // Locate HWM chunk unsigned int chunkIndex = 0; unsigned int blockOffsetWithinChunk = 0; unsigned char* buffer = 0; uint64_t chunkSize = 0; compressor->locateBlock(startingHWM, chunkIndex, blockOffsetWithinChunk); if (chunkIndex < chunkPtrs.size()) { chunkSize = chunkPtrs[chunkIndex].second; // Read the HWM chunk rc = fileOp.setFileOffset(dbFile, chunkPtrs[chunkIndex].first, SEEK_SET); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Backup error seeking in " << fileType << " file for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } buffer = new unsigned char[chunkPtrs[chunkIndex].second]; rc = fileOp.readFile(dbFile, buffer, chunkPtrs[chunkIndex].second); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Backup error reading in " << fileType << " file for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << ec.errorString(rc); delete[] buffer; fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } } else if (startingHWM == 0) { // Okay to proceed. Empty file with no chunks. Save 0 length chunk. 
} else { rc = ERR_METADATABKUP_COMP_CHUNK_NOT_FOUND; WErrorCodes ec; std::ostringstream oss; oss << "Backup error for " << fileType << " file for OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; hwm-" << startingHWM << "; chunkIdx-" << chunkIndex << "; numPtrs-" << chunkPtrs.size() << "; not in hdrPtrs" << "; " << ec.errorString(rc); fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } // Backup the HWM chunk std::string errMsg; rc = writeHWMChunk(bColumnFile, columnOID, dbRoot, partition, segment, buffer, chunkSize, fileSizeBytes, startingHWM, errMsg); if (rc != NO_ERROR) { std::ostringstream oss; oss << "Backup error writing backup for " << fileType << " OID-" << columnOID << "; DBRoot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; " << errMsg; delete[] buffer; fileOp.closeFile(dbFile); throw WeException(oss.str(), rc); } // Close the applicable database column segment file and free memory delete[] buffer; fileOp.closeFile(dbFile); } //------------------------------------------------------------------------------ // Writes out the specified HWM chunk to disk, in case we need it for bulk // rollback. If an error occurs, errMsg will contain the error message. // This function is careful not to create a corrupt file (should the system // crash in the middle of writing the file for example). It's imperative // that during a failure of any kind, that we not "accidentally" create and // leave around a corrupt or incomplete HWM backup file that could cause a // bulk rollback to fail, and eventually corrupt a data base file. // So this function first creates the HWM backup file to a temp file, and // after it is successfully created, it is it renamed to the final destination. // If anything goes wrong, we try to delete any files we were creating. // // This function MUST be kept thread-safe in support of backupDctnryHWMChunk(). 
// See that function description for more details. //------------------------------------------------------------------------------ int RBMetaWriter::writeHWMChunk(bool bColumnFile, // is this a column (vs dictionary) file OID columnOID, // OID of column or dictionary store uint16_t dbRoot, // dbroot for db segment file uint32_t partition, // partition for db segment file uint16_t segment, // segment for db segment file const unsigned char* compressedOutBuf, // compressed chunk to be written uint64_t chunkSize, // number of bytes in compressedOutBuf uint64_t fileSize, // size of file in bytes HWM chunkHWM, // HWM in the chunk being written std::string& errMsg) const // error msg if error occurs { std::ostringstream ossFile; ossFile << "/" << columnOID << ".p" << partition << ".s" << segment; std::string fileName; std::string dirPath; int rc = getSubDirPath(dbRoot, fileName); if (rc != NO_ERROR) { std::ostringstream oss; oss << "Error creating backup file for OID " << columnOID << "; Can't find matching meta file for DBRoot" << dbRoot; errMsg = oss.str(); return ERR_METADATABKUP_COMP_OPEN_BULK_BKUP; } dirPath = fileName; fileName += ossFile.str(); std::string fileNameTmp = fileName; fileNameTmp += TMP_FILE_SUFFIX; // if ( (fLog) && (fLog->isDebug(DEBUG_1)) ) if (fLog) { std::string fileType("column"); if (!bColumnFile) fileType = "dictionary"; std::ostringstream oss; oss << "Backing up HWM chunk for " << fileType << " OID-" << columnOID << "; file-" << fileNameTmp << "; HWM-" << chunkHWM << "; bytes-" << chunkSize << "; fileSize-" << fileSize; fLog->logMsg(oss.str(), MSGLVL_INFO2); } IDBDataFile* backupFile = IDBDataFile::open(IDBPolicy::getType(fileNameTmp.c_str(), IDBPolicy::WRITEENG), fileNameTmp.c_str(), "w+b", 0); if (!backupFile) { int errRc = errno; WErrorCodes ec; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << ec.errorString(ERR_METADATABKUP_COMP_OPEN_BULK_BKUP) << "; " << eMsg; errMsg = oss.str(); return 
ERR_METADATABKUP_COMP_OPEN_BULK_BKUP; } IDBFileSystem& fs = IDBPolicy::getFs(fileNameTmp.c_str()); // Format of backup compressed chunk file: // 8 byte unsigned int carrying chunk size // 8 byte unsigned int carrying original file size // N bytes containing compressed chunk uint64_t sizeHdr[2]; sizeHdr[0] = chunkSize; sizeHdr[1] = fileSize; size_t itemsWritten = backupFile->write(sizeHdr, sizeof(uint64_t) * 2) / (sizeof(uint64_t) * 2); if (itemsWritten != 1) { int errRc = errno; WErrorCodes ec; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << ec.errorString(ERR_METADATABKUP_COMP_WRITE_BULK_BKUP) << "; " << eMsg; errMsg = oss.str(); delete backupFile; fs.remove(fileNameTmp.c_str()); return ERR_METADATABKUP_COMP_WRITE_BULK_BKUP; } if (chunkSize > 0) { itemsWritten = backupFile->write(compressedOutBuf, chunkSize) / chunkSize; if (itemsWritten != 1) { int errRc = errno; WErrorCodes ec; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << ec.errorString(ERR_METADATABKUP_COMP_WRITE_BULK_BKUP) << "; " << eMsg; errMsg = oss.str(); delete backupFile; fs.remove(fileNameTmp.c_str()); return ERR_METADATABKUP_COMP_WRITE_BULK_BKUP; } } backupFile->flush(); // IDBDataFile flush() does a sync where appropriate delete backupFile; // Rename HWM backup file to final name. 
if (fs.rename(fileNameTmp.c_str(), fileName.c_str())) { int errRc = errno; WErrorCodes ec; std::ostringstream oss; std::string eMsg; Convertor::mapErrnoToString(errRc, eMsg); oss << ec.errorString(ERR_METADATABKUP_COMP_RENAME) << "; " << eMsg; errMsg = oss.str(); fs.remove(fileNameTmp.c_str()); fs.remove(fileName.c_str()); return ERR_METADATABKUP_COMP_RENAME; } { std::ostringstream ossChown; idbdatafile::IDBFileSystem& fs = IDBPolicy::getFs(fileName.c_str()); if (chownPath(ossChown, fileName, fs) || chownPath(ossChown, dirPath, fs)) { throw WeException(ossChown.str(), ERR_FILE_CHOWN); } } return NO_ERROR; } //------------------------------------------------------------------------------ // Returns the directory path to be used for storing any backup data files. // // This function MUST be kept thread-safe in support of backupDctnryHWMChunk(). // See that function description for more details. //------------------------------------------------------------------------------ int RBMetaWriter::getSubDirPath(uint16_t dbRoot, std::string& bulkRollbackSubPath) const { std::map::const_iterator iter = fMetaFileNames.find(dbRoot); if (iter == fMetaFileNames.end()) { return ERR_INVALID_PARAM; } bulkRollbackSubPath = iter->second; bulkRollbackSubPath += DATA_DIR_SUFFIX; return NO_ERROR; } //------------------------------------------------------------------------------ // Prints list of compressed dictionary HWM chunks that we are tracking, // in order to backup to disk as needed, before we start adding rows to a // previously existing chunk. 
//------------------------------------------------------------------------------ void RBMetaWriter::printDctnryChunkList(const RBChunkInfo& rbChk, const char* assocAction) { if (fLog) { std::ostringstream oss; oss << "Dumping metaDictHWMChunks " << assocAction << rbChk << ":"; if (fRBChunkDctnrySet.size() > 0) { RBChunkSet::iterator iter = fRBChunkDctnrySet.begin(); int k = 1; while (iter != fRBChunkDctnrySet.end()) { oss << std::endl; oss << '\t' << k << ". " << *iter; ++k; ++iter; } } else { oss << std::endl; oss << '\t' << "Empty list"; } fLog->logMsg(oss.str(), MSGLVL_INFO2); } } //------------------------------------------------------------------------------ // Verify that specified string represents Version 3 file format //------------------------------------------------------------------------------ /* static */ bool RBMetaWriter::verifyVersion3(const char* versionRec) { if (strncmp(versionRec, VERSION3_REC, VERSION3_REC_LEN) == 0) return true; else return false; } //------------------------------------------------------------------------------ // Verify that specified string represents Version 4 file format //------------------------------------------------------------------------------ /* static */ bool RBMetaWriter::verifyVersion4(const char* versionRec) { if (strncmp(versionRec, VERSION4_REC, VERSION4_REC_LEN) == 0) return true; else return false; } //------------------------------------------------------------------------------ // Verify that specified record type is a Column1 record //------------------------------------------------------------------------------ /* static */ bool RBMetaWriter::verifyColumn1Rec(const char* recType) { if (strncmp(recType, COLUMN1_REC, COLUMN1_REC_LEN) == 0) return true; else return false; } //------------------------------------------------------------------------------ // Verify that specified record type is a Column2 record //------------------------------------------------------------------------------ /* static */ 
bool RBMetaWriter::verifyColumn2Rec(const char* recType) { if (strncmp(recType, COLUMN2_REC, COLUMN2_REC_LEN) == 0) return true; else return false; } //------------------------------------------------------------------------------ // Verify that specified record type is a DStore1 record //------------------------------------------------------------------------------ /* static */ bool RBMetaWriter::verifyDStore1Rec(const char* recType) { if (strncmp(recType, DSTORE1_REC, DSTORE1_REC_LEN) == 0) return true; else return false; } //------------------------------------------------------------------------------ // Verify that specified record type is a DStore2 record //------------------------------------------------------------------------------ /* static */ bool RBMetaWriter::verifyDStore2Rec(const char* recType) { if (strncmp(recType, DSTORE2_REC, DSTORE2_REC_LEN) == 0) return true; else return false; } } // namespace WriteEngine