#include "Replicator.h" #include "IOCoordinator.h" #include "SMLogging.h" #include "Utilities.h" #include #include #include #include #include #include #include #define BOOST_SPIRIT_THREADSAFE #include #include #include #include using namespace std; namespace { storagemanager::Replicator *rep = NULL; boost::mutex m; } namespace storagemanager { Replicator::Replicator() { mpConfig = Config::get(); mpLogger = SMLogging::get(); try { msJournalPath = mpConfig->getValue("ObjectStorage", "journal_path"); if (msJournalPath.empty()) { mpLogger->log(LOG_CRIT, "ObjectStorage/journal_path is not set"); throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file"); } } catch (...) { mpLogger->log(LOG_CRIT, "Could not load metadata_path from storagemanger.cnf file."); throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file"); } try { boost::filesystem::create_directories(msJournalPath); } catch (exception &e) { syslog(LOG_CRIT, "Failed to create %s, got: %s", msJournalPath.c_str(), e.what()); throw e; } msCachePath = mpConfig->getValue("Cache", "path"); if (msCachePath.empty()) { mpLogger->log(LOG_CRIT, "Cache/path is not set"); throw runtime_error("Please set Cache/path in the storagemanager.cnf file"); } try { boost::filesystem::create_directories(msCachePath); } catch (exception &e) { mpLogger->log(LOG_CRIT, "Failed to create %s, got: %s", msCachePath.c_str(), e.what()); throw e; } } Replicator::~Replicator() { } Replicator * Replicator::get() { if (rep) return rep; boost::mutex::scoped_lock s(m); if (rep) return rep; rep = new Replicator(); return rep; } #define OPEN(name, mode) \ fd = ::open(name, mode, 0600); \ if (fd < 0) \ return fd; \ ScopedCloser sc(fd); int Replicator::newObject(const char *filename, const uint8_t *data, off_t offset, size_t length ) { int fd, err; string objectFilename = msCachePath + "/" + string(filename); OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT); size_t count = 0; while (count < length) { err = ::pwrite(fd, &data[count], length - count, offset); if (err <= 0) { if (count > 0) // return what was successfully written return count; else return err; } count += err; } return count; } int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length) { int fd, err; uint64_t offlen[] = {(uint64_t) offset,length}; size_t count = 0; int version = 1; string journalFilename = msJournalPath + "/" + string(filename) + ".journal"; uint64_t thisEntryMaxOffset = (offset + length - 1); bool exists = boost::filesystem::exists(journalFilename); OPEN(journalFilename.c_str(), (exists ? O_RDWR : O_WRONLY | O_CREAT)) if (!exists) { // create new journal file with header //OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); err = ::write(fd, header.c_str(), header.length() + 1); assert((uint) err == header.length() + 1); if (err <= 0) return err; } else { // read the existing header and check if max_offset needs to be updated //OPEN(journalFilename.c_str(), O_RDWR); boost::shared_array headertxt = seekToEndOfHeader1(fd); stringstream ss; ss << headertxt.get(); boost::property_tree::ptree header; boost::property_tree::json_parser::read_json(ss, header); assert(header.get("version") == 1); uint64_t currentMaxOffset = header.get("max_offset"); if (thisEntryMaxOffset > currentMaxOffset) { string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); err = ::pwrite(fd, header.c_str(), header.length() + 1,0); assert((uint) err == header.length() + 1); if (err <= 0) return err; } } // XXXPAT: avoid closing and right away opening the file again when it's easy not to. // Just reposition the file pointer. //OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND); ::lseek(fd, 0, SEEK_END); err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); assert(err == JOURNAL_ENTRY_HEADER_SIZE); if (err <= 0) return err; while (count < length) { err = ::write(fd, &data[count], length - count); if (err < 0) { /* XXXPAT: We can't return anything but success here, unless we also update the entry's header */ if (count > 0) // return what was successfully written return count; else return err; } count += err; } return count; } int Replicator::remove(const boost::filesystem::path &filename, Flags flags) { int ret = 0; if (flags & NO_LOCAL) return 0; // not implemented yet try { //#ifndef NDEBUG // assert(boost::filesystem::remove_all(filename) > 0); //#else boost::filesystem::remove_all(filename); //#endif } catch(boost::filesystem::filesystem_error &e) { #ifndef NDEBUG cout << "Replicator::remove(): caught an execption: " << e.what() << endl; assert(0); #endif errno = e.code().value(); ret = -1; } return ret; } int Replicator::remove(const char *filename, Flags flags) { if (flags & NO_LOCAL) return 0; // not implemented yet boost::filesystem::path p(filename); return remove(p); } int Replicator::updateMetadata(const char *filename, MetadataFile &meta) { return meta.writeMetadata(filename); } }