/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "we_confirmhdfsdbfile.h" #include #include #include #include #include #include #include #include "we_define.h" #include "we_config.h" #include "we_fileop.h" #include "we_rbmetawriter.h" #include "IDBPolicy.h" #include "IDBDataFile.h" namespace { const int BUF_SIZE = 1024; // size of buffer used to read meta data records } namespace WriteEngine { //------------------------------------------------------------------------------ // Constructor // This class should typically only be used on an HDFS system, so we could // hardcode this to pass HDFS to getFsi(); but it comes in handy for testing, // to be able to execute this class on a non-HDFS stack as well. So I rely // on useHdfs() to tell me which FileSystem reference to get. //------------------------------------------------------------------------------ ConfirmHdfsDbFile::ConfirmHdfsDbFile() : fFs(idbdatafile::IDBPolicy::useHdfs() ? idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::HDFS) : idbdatafile::IDBPolicy::useCloud() ? idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::CLOUD) : idbdatafile::IDBFileSystem::getFs(idbdatafile::IDBDataFile::BUFFERED)) { } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ ConfirmHdfsDbFile::~ConfirmHdfsDbFile() { } //------------------------------------------------------------------------------ // Backup a cdf file and replace it with the updated tmp file. //------------------------------------------------------------------------------ int ConfirmHdfsDbFile::confirmDbFileChange(const std::string& backUpFileType, const std::string& filename, std::string& errMsg) const { // return value int rc = NO_ERROR; // This rlc file should be renamed if success, just skip it if (backUpFileType.compare("rlc") == 0) { return rc; } if (backUpFileType.compare("tmp") != 0) { std::ostringstream oss; oss << backUpFileType << " is a bad type to confirm DbFile change: " << filename; errMsg = oss.str(); rc = ERR_HDFS_BACKUP; return rc; } // add safety checks, just in case std::string tmp(filename + ".tmp"); if (!fFs.exists(tmp.c_str())) // file already swapped return rc; if (fFs.size(tmp.c_str()) <= 0) { std::ostringstream oss; oss << "tmp file " << tmp << " has bad size" << fFs.size(tmp.c_str()); errMsg = oss.str(); rc = ERR_COMP_RENAME_FILE; return rc; } // remove the old orig if exists std::string orig(filename + ".orig"); errno = 0; if ((fFs.exists(orig.c_str())) && (fFs.remove(orig.c_str())) != 0) { int errNum = errno; std::ostringstream oss; oss << "remove old " << orig << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_REMOVE_FILE; return rc; } // backup the original errno = 0; if (fFs.rename(filename.c_str(), orig.c_str()) != 0) { int errNum = errno; std::ostringstream oss; oss << "rename " << filename << " to " << orig << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_RENAME_FILE; return rc; } // rename the new file errno = 0; if (fFs.rename(tmp.c_str(), filename.c_str()) != 0) { int errNum = errno; std::ostringstream oss; oss << "rename " << tmp << " to " << filename << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_RENAME_FILE; return rc; } return rc; } //------------------------------------------------------------------------------ // Finalize the changes to a db file. // If success flag is true, then remove the orig // otherwise, move the orig back to cdf //------------------------------------------------------------------------------ int ConfirmHdfsDbFile::endDbFileChange(const std::string& backUpFileType, const std::string& filename, bool success, std::string& errMsg) const { // return value int rc = NO_ERROR; // This rlc file should be renamed if success, it is useless if failed. if (backUpFileType.compare("rlc") == 0) { std::string rlc(filename + ".rlc"); if (fFs.exists(rlc.c_str())) fFs.remove(rlc.c_str()); // TBD-okay to ignore failed removal? return rc; } if (backUpFileType.compare("tmp") != 0) { std::ostringstream oss; oss << backUpFileType << " is a bad type to finalize DbFile change: " << filename; errMsg = oss.str(); rc = ERR_HDFS_BACKUP; return rc; } std::string orig(filename + ".orig"); if (success) { // remove the orig file errno = 0; if ((fFs.exists(orig.c_str())) && (fFs.remove(orig.c_str())) != 0) { int errNum = errno; std::ostringstream oss; oss << "remove " << orig << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_REMOVE_FILE; return rc; } } else { // restore the orig file if (fFs.exists(orig.c_str())) { errno = 0; // Try to remove file only if it exists if ((fFs.exists(filename.c_str())) && (fFs.remove(filename.c_str()) != 0)) { int errNum = errno; std::ostringstream oss; oss << "failed restore; remove " << filename << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_REMOVE_FILE; return rc; } errno = 0; if (fFs.rename(orig.c_str(), filename.c_str()) != 0) { int errNum = errno; std::ostringstream oss; oss << "failed restore; rename " << orig << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_RENAME_FILE; return rc; } } // remove the tmp file std::string tmp(filename + ".tmp"); errno = 0; if ((fFs.exists(tmp.c_str())) && (fFs.remove(tmp.c_str())) != 0) { int errNum = errno; std::ostringstream oss; oss << "failed restore; remove " << tmp << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_REMOVE_FILE; return rc; } // remove the chunk shifting helper std::string rlc(filename + ".rlc"); errno = 0; if ((fFs.exists(rlc.c_str())) && (fFs.remove(rlc.c_str())) != 0) { int errNum = errno; std::ostringstream oss; oss << "failed restore; remove " << rlc << " failed: " << strerror(errNum); errMsg = oss.str(); rc = ERR_COMP_REMOVE_FILE; return rc; } } return rc; } //------------------------------------------------------------------------------ // Confirm the changes to the hwm DB files listed in the bulk rollback meta // data file corresponding to the specified table OID. //------------------------------------------------------------------------------ int ConfirmHdfsDbFile::confirmDbFileListFromMetaFile(OID tableOID, std::string& errMsg) { int rc = NO_ERROR; try { std::vector dbRoots; Config::getRootIdList(dbRoots); for (unsigned m = 0; m < dbRoots.size(); m++) { std::istringstream metaDataStream; openMetaDataFile(tableOID, dbRoots[m], metaDataStream); confirmDbFiles(metaDataStream); } } catch (WeException& ex) { std::ostringstream oss; oss << "Error confirming changes to table " << tableOID << "; " << ex.what(); errMsg = oss.str(); rc = ex.errorCode(); } catch (std::exception& ex) { std::ostringstream oss; oss << "Error confirming changes to table " << tableOID << "; " << ex.what(); errMsg = oss.str(); rc = ERR_UNKNOWN; } return rc; } //------------------------------------------------------------------------------ // Confirm the changes to the hwm DB files listed in the bulk rollback meta // data file stream stored in metaDataStream. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::confirmDbFiles(std::istringstream& metaDataStream) const { char inBuf[BUF_SIZE]; // Loop through the records in the meta-data file while (metaDataStream.getline(inBuf, BUF_SIZE)) { // Restore Files for current DBRoot if (RBMetaWriter::verifyColumn1Rec(inBuf)) { confirmColumnDbFile(inBuf); } else if (RBMetaWriter::verifyDStore1Rec(inBuf)) { confirmDctnryStoreDbFile(inBuf); } } } //------------------------------------------------------------------------------ // Confirm the changes to the hwm column DB file described in the bulk // rollback meta data file record stored in inBuf. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::confirmColumnDbFile(const char* inBuf) const { char recType[100]; OID columnOID; uint32_t dbRootHwm; uint32_t partNumHwm; uint32_t segNumHwm; HWM lastLocalHwm; int colTypeInt; char colTypeName[100]; uint32_t colWidth; int compressionType = 0; // optional parameter // Read meta-data record int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d", recType, &columnOID, &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm, &colTypeInt, colTypeName, &colWidth, &compressionType); if (numFields < 9) // compressionType is optional { std::ostringstream oss; oss << "Invalid COLUM1 record in meta-data file " << fMetaFileName << "; record-<" << inBuf << ">"; throw WeException(oss.str(), ERR_INVALID_PARAM); } // Construct the DB file name char dbFileName[FILE_NAME_SIZE]; FileOp dbFile(false); int rc = dbFile.getFileName(columnOID, dbFileName, dbRootHwm, partNumHwm, segNumHwm); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error constructing column filename to confirm changes" << "; columnOID-" << columnOID << "; dbRoot-" << dbRootHwm << "; partNum-" << partNumHwm << "; segNum-" << segNumHwm << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } // Confirm the changes to the DB file name std::string errMsg; rc = confirmDbFileChange(std::string("tmp"), dbFileName, errMsg); if (rc != NO_ERROR) { throw WeException(errMsg, rc); } } //------------------------------------------------------------------------------ // Confirm the changes to the hwm dctnry store DB file described in the bulk // rollback meta data file record stored in inBuf. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::confirmDctnryStoreDbFile(const char* inBuf) const { char recType[100]; OID dColumnOID; OID dStoreOID; uint32_t dbRootHwm; uint32_t partNumHwm; uint32_t segNumHwm; HWM localHwm; int compressionType = 0; // optional parameter // Read meta-data record int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d", recType, &dColumnOID, &dStoreOID, &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType); if (numFields < 7) // compressionType optional { std::ostringstream oss; oss << "Invalid DSTOR1 record in meta-data file " << fMetaFileName << "; record-<" << inBuf << ">"; throw WeException(oss.str(), ERR_INVALID_PARAM); } // Construct the DB file name char dbFileName[FILE_NAME_SIZE]; FileOp dbFile(false); int rc = dbFile.getFileName(dStoreOID, dbFileName, dbRootHwm, partNumHwm, segNumHwm); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error constructing dictionary store filename to confirm changes" << "; columnOID-" << dStoreOID << "; dbRoot-" << dbRootHwm << "; partNum-" << partNumHwm << "; segNum-" << segNumHwm << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } // Confirm the changes to the DB file name std::string errMsg; rc = confirmDbFileChange(std::string("tmp"), dbFileName, errMsg); if (rc != NO_ERROR) { throw WeException(errMsg, rc); } } //------------------------------------------------------------------------------ // End the changes to the hwm DB files listed in the bulk rollback meta // data file corresponding to the specified table OID. Delete temp files. //------------------------------------------------------------------------------ int ConfirmHdfsDbFile::endDbFileListFromMetaFile(OID tableOID, bool success, std::string& errMsg) { int rc = NO_ERROR; errMsg.clear(); std::vector dbRoots; Config::getRootIdList(dbRoots); for (unsigned m = 0; m < dbRoots.size(); m++) { std::istringstream metaDataStream; try { std::istringstream metaDataStream; openMetaDataFile(tableOID, dbRoots[m], metaDataStream); endDbFiles(metaDataStream, success); } // We catch any errors, but not deleting a temp file is not fatal, // so we capture the error msg and keep going if a problem occurs. // We return a concatenated list of error msgs if multiple errors // take place. catch (WeException& ex) { if (errMsg.size() == 0) { std::ostringstream oss; oss << "Error deleting temp files for table " << tableOID << "; " << ex.what(); errMsg = oss.str(); rc = ex.errorCode(); } else { errMsg += "; "; errMsg += ex.what(); } } catch (std::exception& ex) { if (errMsg.size() == 0) { std::ostringstream oss; oss << "Error deleting temp files for table " << tableOID << "; " << ex.what(); errMsg = oss.str(); rc = ERR_UNKNOWN; } else { errMsg += "; "; errMsg += ex.what(); } } } return rc; } //------------------------------------------------------------------------------ // End the changes to the hwm DB files listed in the bulk rollback meta // data file stream stored in metaDataStream. Delete temp files. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::endDbFiles(std::istringstream& metaDataStream, bool success) const { char inBuf[BUF_SIZE]; std::string errMsg; int rc = NO_ERROR; // Loop through the records in the meta-data file while (metaDataStream.getline(inBuf, BUF_SIZE)) { try { // Delete Temp Files for current DBRoot if (RBMetaWriter::verifyColumn1Rec(inBuf)) { endColumnDbFile(inBuf, success); } else if (RBMetaWriter::verifyDStore1Rec(inBuf)) { endDctnryStoreDbFile(inBuf, success); } } // We catch any errors, but not deleting a temp file is not fatal, // so we capture the error msg and keep going if a problem occurs. // We return a concatenated list of error msgs if multiple errors // take place. catch (WeException& ex) { if (errMsg.size() == 0) { rc = ex.errorCode(); } else { errMsg += "; "; } errMsg += ex.what(); } catch (std::exception& ex) { if (errMsg.size() == 0) { rc = ERR_UNKNOWN; } else { errMsg += "; "; } errMsg += ex.what(); } } // Throw exception with cumulative list of any error msgs if (errMsg.size() > 0) { throw WeException(errMsg, rc); } } //------------------------------------------------------------------------------ // End the changes to the hwm column DB file described in the bulk // rollback meta data file record stored in inBuf. Delete the temp file. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::endColumnDbFile(const char* inBuf, bool success) const { char recType[100]; OID columnOID; uint32_t dbRootHwm; uint32_t partNumHwm; uint32_t segNumHwm; HWM lastLocalHwm; int colTypeInt; char colTypeName[100]; uint32_t colWidth; int compressionType = 0; // optional parameter // Read meta-data record int numFields = sscanf(inBuf, "%s %u %u %u %u %u %d %s %u %d", recType, &columnOID, &dbRootHwm, &partNumHwm, &segNumHwm, &lastLocalHwm, &colTypeInt, colTypeName, &colWidth, &compressionType); if (numFields < 9) // compressionType is optional { std::ostringstream oss; oss << "Invalid COLUM1 record in meta-data file " << fMetaFileName << "; record-<" << inBuf << ">"; throw WeException(oss.str(), ERR_INVALID_PARAM); } // Construct the DB file name char dbFileName[FILE_NAME_SIZE]; FileOp dbFile(false); int rc = dbFile.getFileName(columnOID, dbFileName, dbRootHwm, partNumHwm, segNumHwm); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error constructing column filename to end changes" << "; columnOID-" << columnOID << "; dbRoot-" << dbRootHwm << "; partNum-" << partNumHwm << "; segNum-" << segNumHwm << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } // Confirm the changes to the DB file name std::string errMsg; rc = endDbFileChange(std::string("tmp"), dbFileName, success, errMsg); if (rc != NO_ERROR) { throw WeException(errMsg, rc); } } //------------------------------------------------------------------------------ // End the changes to the hwm dctnry store DB file described in the bulk // rollback meta data file record stored in inBuf. Delete the temp file. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::endDctnryStoreDbFile(const char* inBuf, bool success) const { char recType[100]; OID dColumnOID; OID dStoreOID; uint32_t dbRootHwm; uint32_t partNumHwm; uint32_t segNumHwm; HWM localHwm; int compressionType = 0; // optional parameter // Read meta-data record int numFields = sscanf(inBuf, "%s %u %u %u %u %u %u %d", recType, &dColumnOID, &dStoreOID, &dbRootHwm, &partNumHwm, &segNumHwm, &localHwm, &compressionType); if (numFields < 7) // compressionType optional { std::ostringstream oss; oss << "Invalid DSTOR1 record in meta-data file " << fMetaFileName << "; record-<" << inBuf << ">"; throw WeException(oss.str(), ERR_INVALID_PARAM); } // Construct the DB file name char dbFileName[FILE_NAME_SIZE]; FileOp dbFile(false); int rc = dbFile.getFileName(dStoreOID, dbFileName, dbRootHwm, partNumHwm, segNumHwm); if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error constructing dictionary store filename to end changes" << "; columnOID-" << dStoreOID << "; dbRoot-" << dbRootHwm << "; partNum-" << partNumHwm << "; segNum-" << segNumHwm << "; " << ec.errorString(rc); throw WeException(oss.str(), rc); } // Confirm the changes to the DB file name std::string errMsg; rc = endDbFileChange(std::string("tmp"), dbFileName, success, errMsg); if (rc != NO_ERROR) { throw WeException(errMsg, rc); } } //------------------------------------------------------------------------------ // Open and read the bulk rollback metadata file for the specified table OID // and DBRoot. The contents of the metadata file is returned in the meta- // DataStream argument. //------------------------------------------------------------------------------ void ConfirmHdfsDbFile::openMetaDataFile(OID tableOID, uint16_t dbRoot, std::istringstream& metaDataStream) { std::string bulkRollbackPath(Config::getDBRootByNum(dbRoot)); // Construct file name and check for it's existence std::ostringstream ossFileName; ossFileName << '/' << DBROOT_BULK_ROLLBACK_SUBDIR << '/' << tableOID; fMetaFileName = bulkRollbackPath; fMetaFileName += ossFileName.str(); // Return if the meta-data file does not exist. if (!fFs.exists(fMetaFileName.c_str())) { std::ostringstream oss; oss << "Bulk rollback meta-data file " << fMetaFileName << " does not exist."; throw WeException(oss.str(), ERR_FILE_NOT_EXIST); } // Open the file boost::scoped_ptr metaFile; errno = 0; metaFile.reset(idbdatafile::IDBDataFile::open( idbdatafile::IDBPolicy::getType(fMetaFileName.c_str(), idbdatafile::IDBPolicy::WRITEENG), fMetaFileName.c_str(), "rb", 0)); if (!metaFile) { int errRc = errno; std::ostringstream oss; oss << "Error opening bulk rollback meta-data file " << fMetaFileName << "; err-" << errRc << "; " << strerror(errRc); throw WeException(oss.str(), ERR_FILE_OPEN); } // First record in the file must be a Version record. char inBuf[BUF_SIZE]; ssize_t metaFileSize = fFs.size(fMetaFileName.c_str()); boost::scoped_array buf(new char[metaFileSize]); // retry 10 times for partial reads, just in case ssize_t readSofar = 0; // bytes read so far ssize_t bytes = 0; // bytes read by one pread char* p = buf.get(); for (int i = 0; i < 10 && readSofar < metaFileSize; i++) { errno = 0; bytes = metaFile->pread(p + readSofar, readSofar, metaFileSize - readSofar); if (bytes < 0) break; readSofar += bytes; } if (readSofar != metaFileSize) { int errRc = errno; std::ostringstream oss; oss << "Error reading bulk rollback meta-data file " << fMetaFileName << "; read/expect:" << readSofar << "/" << metaFileSize << "; err-" << errRc << "; " << strerror(errRc); throw WeException(oss.str(), ERR_FILE_READ); } // put the data in a string stream metaDataStream.str(std::string(p, metaFileSize)); buf.reset(); // read data metaDataStream.getline(inBuf, BUF_SIZE); if (!RBMetaWriter::verifyVersion4(inBuf)) { std::ostringstream oss; oss << "Invalid version record in meta-data file " << fMetaFileName << "; record-<" << inBuf << ">"; throw WeException(oss.str(), ERR_INVALID_PARAM); } } } // namespace WriteEngine