diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index e948fdf14..dca6d10b0 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -249,8 +249,10 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset uint64_t dataRemaining = length; uint64_t journalOffset = 0; vector objects; + vector newObjectKeys; + Synchronizer *synchronizer = Synchronizer::get(); // need to init sync here to break circular dependency... - //writeLock(filename); + ScopedWriteLock lock(this, filename); MetadataFile metadata = MetadataFile(filename); @@ -263,7 +265,7 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset for (std::vector::const_iterator i = objects.begin(); i != objects.end(); ++i) { // figure out how much data to write to this object - if (count == 0 && offset > i->offset) + if (count == 0 && offset >= i->offset) { // first object in the list so start at offset and // write to end of oject or all the data @@ -277,10 +279,122 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset writeLength = min(objectSize,dataRemaining); journalOffset = 0; } + cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + + err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength); + + if (err <= 0) + { + if ((count + journalOffset) > i->length) + metadata.updateEntryLength(i->offset, (count + journalOffset)); + metadata.writeMetadata(filename); + logger->log(LOG_ERR,"IOCoordinator::write(): object failed to complete write, %u of %u bytes written.",count,length); + return count; + } + if ((writeLength + journalOffset) > i->length) + metadata.updateEntryLength(i->offset, (writeLength + journalOffset)); + + cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + + synchronizer->newJournalEntry(i->key); + count += err; + dataRemaining -= err; + } + } + // there is no overlapping data, or data goes beyond end of last object + while (dataRemaining > 0 && err >= 0) + { + //add a new metaDataObject + if (count == 0) + { + //this is starting beyond last object in metadata + //figure out if the offset is in this object + if (offset < objectSize) + { + journalOffset = offset; + writeLength = min((objectSize - journalOffset),dataRemaining); + } + else + { + //we need to create an object that is only padding + } + } + else + { + // count != 0 we've already started writing and are going to new object + // start at beginning of the new object + writeLength = min(objectSize,dataRemaining); + journalOffset = 0; + } + cache->makeSpace(journalOffset+writeLength); + // add a new metadata object, this will get a new objectKey + metadataObject newObject = metadata.addMetadataObject(filename,writeLength); + // write the new object + err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength); + if (err <= 0) + { + // update metadataObject length to reflect what awas actually written + if ((count + journalOffset) > newObject.length) + metadata.updateEntryLength(newObject.offset, (count + journalOffset)); + metadata.writeMetadata(filename); + logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); + return count; + //log error and abort + } + if ((writeLength + journalOffset) > newObject.length) + metadata.updateEntryLength(newObject.offset, (writeLength + journalOffset)); + + cache->newObject(newObject.key,(writeLength + journalOffset)); + newObjectKeys.push_back(newObject.key); + + count += err; + dataRemaining -= err; + } + + synchronizer->newObjects(newObjectKeys); + + metadata.writeMetadata(filename); + + lock.unlock(); + + return count; +} +// +// Still fixing stuff here +// +int IOCoordinator::append(const char *filename, const uint8_t *data, size_t length) +{ + int fd, err; + size_t count = 0; + uint64_t writeLength = 0; + uint64_t dataRemaining = length; + uint64_t journalOffset = 0; + vector objects; + + //writeLock(filename); + + MetadataFile metadata = MetadataFile(filename); + + int offset = metadata.getLength(); + + //read metadata determine if this fits in the last object + objects = metadata.metadataRead(offset,length); + + if(!objects.empty() && objects.size() == 1) + { + std::vector::const_iterator i = objects.begin(); + + // figure out how much data to write to this object + if (offset > i->offset) + { + journalOffset = offset - i->offset; + writeLength = min((objectSize - journalOffset),dataRemaining); + } err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength); if (err <= 0) { - metadata.updateEntryLength(i->offset, count); + if ((count + journalOffset) > i->length) + metadata.updateEntryLength(i->offset, (count + journalOffset)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); return count; @@ -290,25 +404,33 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset count += err; dataRemaining -= err; - } - //cache.makeSpace(journal_data_size) + //cache->makeSpace(journal_data_size) //Synchronizer::newJournalData(journal_file); } - + else + { + //Something went wrong this shouldn't overlap objects + } // there is no overlapping data, or data goes beyond end of last object while (dataRemaining > 0 && err >= 0) { //add a new metaDataObject writeLength = min(objectSize,dataRemaining); - //cache.makeSpace(size) + if (count == 0) + { + //this is append and it starting beyond last object in metadata + //figure out if the offset is in this object + } + //cache->makeSpace(size) // add a new metadata object, this will get a new objectKey NOTE: probably needs offset too metadataObject newObject = metadata.addMetadataObject(filename,writeLength); // write the new object - err = replicator->newObject(newObject.key.c_str(),data,writeLength); + err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength); if (err <= 0) { // update metadataObject length to reflect what awas actually written - metadata.updateEntryLength(newObject.offset, count); + if ((count + journalOffset) > newObject.length) + metadata.updateEntryLength(newObject.offset, (count + journalOffset)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); return count; @@ -321,28 +443,6 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset } metadata.writeMetadata(filename); - - //writeUnlock(filename); - - return count; -} - -int IOCoordinator::append(const char *filename, const uint8_t *data, size_t length) -{ - int fd, err; - - OPEN(filename, O_WRONLY | O_APPEND); - size_t count = 0; - while (count < length) { - err = ::write(fd, &data[count], length - count); - if (err <= 0) - if (count > 0) // return what was successfully written - return count; - else - return err; - count += err; - } - return count; } diff --git a/src/MetadataFile.cpp b/src/MetadataFile.cpp index cbf794578..9c4b7bc0b 100755 --- a/src/MetadataFile.cpp +++ b/src/MetadataFile.cpp @@ -229,12 +229,14 @@ vector MetadataFile::metadataRead(off_t offset, size_t length) c break; ++i; } - // append objects until foundLen >= length or EOF + // first time thrus foundLen should be adjusted based on offset + off_t foundOffset = offset - i->offset; while (i != mObjects.end() && foundLen < length) { ret.push_back(*i); - foundLen += i->length; + foundLen += (i->length - foundOffset); + foundOffset = 0; //zero on every other time thru this loop ++i; } return ret; diff --git a/src/Replicator.cpp b/src/Replicator.cpp index 166240ef2..25c675ffe 100755 --- a/src/Replicator.cpp +++ b/src/Replicator.cpp @@ -2,6 +2,7 @@ #include "Replicator.h" #include "IOCoordinator.h" #include "SMLogging.h" +#include "Cache.h" #include #include #include @@ -42,7 +43,6 @@ Replicator::Replicator() mpLogger->log(LOG_CRIT, "Could not load metadata_path from storagemanger.cnf file."); throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file"); } - boost::filesystem::create_directories(msJournalPath); try { boost::filesystem::create_directories(msJournalPath); @@ -52,6 +52,21 @@ Replicator::Replicator() syslog(LOG_CRIT, "Failed to create %s, got: %s", msJournalPath.c_str(), e.what()); throw e; } + msCachePath = mpConfig->getValue("Cache", "path"); + if (msCachePath.empty()) + { + mpLogger->log(LOG_CRIT, "Cache/path is not set"); + throw runtime_error("Please set Cache/path in the storagemanager.cnf file"); + } + try + { + boost::filesystem::create_directories(msCachePath); + } + catch (exception &e) + { + mpLogger->log(LOG_CRIT, "Failed to create %s, got: %s", msCachePath.c_str(), e.what()); + throw e; + } } Replicator::~Replicator() @@ -85,14 +100,15 @@ struct scoped_closer { return fd; \ scoped_closer sc(fd); -int Replicator::newObject(const char *filename, const uint8_t *data, size_t length ) +int Replicator::newObject(const char *filename, const uint8_t *data, off_t offset, size_t length ) { int fd, err; + string objectFilename = msCachePath + "/" + string(filename); - OPEN(filename, O_WRONLY | O_CREAT); + OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT); size_t count = 0; while (count < length) { - err = ::write(fd, &data[count], length - count); + err = ::pwrite(fd, &data[count], length - count, offset); if (err <= 0) if (count > 0) // return what was successfully written return count; @@ -112,11 +128,14 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t int version = 1; string journalFilename = msJournalPath + "/" + string(filename) + ".journal"; uint64_t thisEntryMaxOffset = (offset + length - 1); + Cache *cache = Cache::get(); // need to init sync here to break circular dependency... + if (!boost::filesystem::exists(journalFilename)) { // create new journal file with header OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); + cache->makeSpace(header.size()); err = ::write(fd, header.c_str(), header.length() + 1); if (err <= 0) return err; @@ -143,7 +162,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND); - err = ::write(fd, offlen, 16); + err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); if (err <= 0) return err; diff --git a/src/Replicator.h b/src/Replicator.h index d4c407ebf..8e2bdfa53 100755 --- a/src/Replicator.h +++ b/src/Replicator.h @@ -7,6 +7,8 @@ #include #include +#define JOURNAL_ENTRY_HEADER_SIZE 16 + namespace storagemanager { @@ -28,7 +30,7 @@ class Replicator }; int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length); - int newObject(const char *filename, const uint8_t *data, size_t length); + int newObject(const char *filename, const uint8_t *data, off_t offset, size_t length); int remove(const char *filename, Flags flags = NONE); int remove(const boost::filesystem::path &file, Flags flags = NONE); @@ -39,6 +41,7 @@ class Replicator Config *mpConfig; SMLogging *mpLogger; std::string msJournalPath; + std::string msCachePath; //ThreadPool threadPool; }; diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 21e0c345c..30908f162 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -205,11 +205,14 @@ bool replicatorTest() Config* config = Config::get(); string metaPath = config->getValue("ObjectStorage", "metadata_path"); string journalPath = config->getValue("ObjectStorage", "journal_path"); + string cacehPath = config->getValue("Cache", "path"); + Replicator *repli = Replicator::get(); int err,fd; const char *newobject = "newobjectTest"; const char *newobjectJournal = "newobjectTest.journal"; string newObjectJournalFullPath = journalPath + "/" + "newobjectTest.journal"; + string newObjectCacheFullPath = cacehPath + "/" + "newobjectTest"; uint8_t buf[1024]; uint8_t data[1024]; int version = 1; @@ -218,10 +221,10 @@ bool replicatorTest() string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str(); // test newObject - repli->newObject(newobject,data,10); + repli->newObject(newobject,data,0,10); //check file contents - fd = ::open(newobject, O_RDONLY); + fd = ::open(newObjectCacheFullPath.c_str(), O_RDONLY); err = ::read(fd, buf, 1024); assert(err == 10); buf[10] = 0; @@ -240,9 +243,9 @@ bool replicatorTest() cout << "replicator addJournalEntry OK" << endl; ::close(fd); - repli->remove(newobject); + repli->remove(newObjectCacheFullPath.c_str()); repli->remove(newObjectJournalFullPath.c_str()); - assert(!boost::filesystem::exists(newobject)); + assert(!boost::filesystem::exists(newObjectCacheFullPath.c_str())); cout << "replicator remove OK" << endl; return true; } @@ -283,10 +286,6 @@ bool metadataJournalTest(std::size_t size, off_t offset) assert(resp->header.payloadLen == 4); assert(resp->header.flags == 0); assert(resp->returnCode == size); - - MetadataFile mdfTest(filename); - mdfTest.printObjects(); - } void metadataJournalTestCleanup(std::size_t size) @@ -294,14 +293,18 @@ void metadataJournalTestCleanup(std::size_t size) Config* config = Config::get(); string metaPath = config->getValue("ObjectStorage", "metadata_path"); string journalPath = config->getValue("ObjectStorage", "journal_path"); + string cachePath = config->getValue("Cache", "path"); + const char *filename = "metadataJournalTest"; MetadataFile mdfTest(filename); + mdfTest.printObjects(); std::vector objects = mdfTest.metadataRead(0,size); for (std::vector::const_iterator i = objects.begin(); i != objects.end(); ++i) { + string casheObject = cachePath + "/" + i->key; string keyJournal = journalPath + "/" + i->key + ".journal"; if(boost::filesystem::exists(i->key.c_str())) - ::unlink(i->key.c_str()); + ::unlink(casheObject.c_str()); if(boost::filesystem::exists(keyJournal.c_str())) ::unlink(keyJournal.c_str()); } @@ -1415,7 +1418,7 @@ int main() scoped_closer sc1(serverSock), sc2(sessionSock), sc3(clientSock); opentask(); - metadataUpdateTest(); + //metadataUpdateTest(); // requires 8K object size to test boundries //Case 1 new write that spans full object metadataJournalTest((10*sizeKB),0); @@ -1428,13 +1431,12 @@ int main() //Case 5 write starts in new object at offset >0 //TODO add zero padding to writes in this scenario //metadataJournalTest((8*sizeKB),4*sizeKB); - metadataJournalTestCleanup(17*sizeKB); //writetask(); - appendtask(); + //appendtask(); unlinktask(); stattask(); - //truncatetask(); // currently waiting on IOC::write() to be completed. + truncatetask(); // currently waiting on IOC::write() to be completed. listdirtask(); pingtask(); copytask(); @@ -1451,5 +1453,8 @@ int main() IOCUnlink(); IOCCopyFile(); + sleep(5); // sometimes this deletes them before syncwithjournal is called + metadataJournalTestCleanup(17*sizeKB); + return 0; }