From 5443f8662c9ef08720dd8d88c46db6282282f4db Mon Sep 17 00:00:00 2001
From: Patrick LeBlanc
Date: Fri, 12 Apr 2019 15:08:01 -0500
Subject: [PATCH] Checkpointing some paranoid assertions & some fixes I noticed.

---
 src/Cache.cpp         |  1 +
 src/IOCoordinator.cpp | 61 +++++++++++++++++++++++++------------------
 src/IOCoordinator.h   |  1 +
 src/Replicator.cpp    | 36 +++++++++++++------------
 src/Synchronizer.cpp  |  6 +++++
 src/ThreadPool.cpp    |  2 +-
 src/Utilities.cpp     |  5 +++-
 src/unit_tests.cpp    | 18 +++++++++++--
 8 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/src/Cache.cpp b/src/Cache.cpp
index d7c3d85eb..c6044974a 100644
--- a/src/Cache.cpp
+++ b/src/Cache.cpp
@@ -368,6 +368,7 @@ void Cache::_makeSpace(size_t size)
         assert(currentCacheSize >= (size_t) statbuf.st_size);
         currentCacheSize -= statbuf.st_size;
         thisMuch -= statbuf.st_size;
+        logger->log(LOG_WARNING, "Cache: flushing! Try to avoid this, it may deadlock!");
         Synchronizer::get()->flushObject(*it);
         replicator->remove(cachedFile.string().c_str(), Replicator::LOCAL_ONLY);
         LRU_t::iterator toRemove = it++;
diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp
index 48dff76d1..4f803dc1c 100755
--- a/src/IOCoordinator.cpp
+++ b/src/IOCoordinator.cpp
@@ -81,16 +81,9 @@ IOCoordinator * IOCoordinator::get()
 
 void IOCoordinator::willRead(const char *, off_t, size_t)
 {
-    // no cache yet
     // not sure we will implement this.
 }
 
-#define OPEN(name, mode) \
-    fd = ::open(filename, mode, 0660); \
-    if (fd < 0) \
-        return fd; \
-    ScopedCloser sc(fd);
-
 int IOCoordinator::loadObject(int fd, uint8_t *data, off_t offset, size_t length) const
 {
     size_t count = 0;
@@ -242,6 +235,12 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_
 }
 
 int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset, size_t length)
+{
+    ScopedWriteLock lock(this, filename);
+    return _write(filename, data, offset, length);
+}
+
+int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset, size_t length)
 {
     int err = 0;
     uint64_t count = 0;
@@ -252,9 +251,12 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
     vector newObjectKeys;
     Synchronizer *synchronizer = Synchronizer::get();   // need to init sync here to break circular dependency...
 
-    ScopedWriteLock lock(this, filename);
-
-    MetadataFile metadata = MetadataFile(filename);
+    MetadataFile metadata = MetadataFile(filename, MetadataFile::no_create_t());
+    if (!metadata.exists())
+    {
+        errno = EBADF;
+        return -1;
+    }
 
     //read metadata determine how many objects overlap
     objects = metadata.metadataRead(offset,length);
@@ -280,11 +282,14 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
            objectOffset = 0;
        }
        cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-
+
        err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength);
-
+       assert((uint) err == writeLength);
+
        if (err <= 0)
        {
+           // XXXPAT: Count hasn't been updated yet, so I'm not sure what we're trying to do here.
+           // There's another block below that looks similar.  Also similar blocks in append().
            if ((count + objectOffset) > i->length)
                metadata.updateEntryLength(i->offset, (count + objectOffset));
            metadata.writeMetadata(filename);
@@ -304,7 +309,6 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
    // there is no overlapping data, or data goes beyond end of last object
    while (dataRemaining > 0)
    {
-       cache->makeSpace(dataRemaining);
        metadataObject newObject = metadata.addMetadataObject(filename,0);
        if (count == 0 && (uint64_t) offset > newObject.offset)
        {
@@ -320,10 +324,12 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
            writeLength = min(objectSize,dataRemaining);
            objectOffset = 0;
        }
+       cache->makeSpace(writeLength);
        if ((writeLength + objectOffset) > newObject.length)
            metadata.updateEntryLength(newObject.offset, (writeLength + objectOffset));
        // send to replicator
-       err = replicator->newObject(newObject.key.c_str(),data,objectOffset,writeLength);
+       err = replicator->newObject(newObject.key.c_str(),&data[count],objectOffset,writeLength);
+       assert((uint) err == writeLength);
        if (err <= 0)
        {
            // update metadataObject length to reflect what awas actually written
@@ -345,14 +351,10 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
 
    synchronizer->newObjects(newObjectKeys);
    metadata.writeMetadata(filename);
-
-   lock.unlock();
    return count;
 }
 
-//
-// Still fixing stuff here
-//
+
 int IOCoordinator::append(const char *filename, const uint8_t *data, size_t length)
 {
    int err;
@@ -365,8 +367,12 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
 
    ScopedWriteLock lock(this, filename);
 
-   MetadataFile metadata = MetadataFile(filename);
-
+   MetadataFile metadata = MetadataFile(filename, MetadataFile::no_create_t());
+   if (!metadata.exists())
+   {
+       errno = EBADF;
+       return -1;
+   }
    uint64_t offset = metadata.getLength();
 
    //read metadata determine if this fits in the last object
@@ -376,6 +382,8 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
    {
        std::vector::const_iterator i = objects.begin();
 
+       // XXXPAT: Need to handle the case where objectSize has been reduced since i was created
+       // ie, i->length may be > objectSize here, so objectSize - i->length may be a huge positive #
        if ((objectSize - i->length) > 0)   // if this is zero then we can't put anything else in this object
        {
            // figure out how much data to write to this object
@@ -384,6 +392,7 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
 
            cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
            err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength);
+           assert((uint) err == writeLength);
            if (err <= 0)
            {
                metadata.updateEntryLength(i->offset, (count + i->length));
@@ -405,6 +414,7 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
    {
        //Something went wrong this shouldn't overlap objects
        logger->log(LOG_ERR,"IOCoordinator::append(): overlapping objects found on append.",count,length);
+       assert(0);
        return -1;
    }
    // append is starting or adding to a new object
@@ -418,7 +428,8 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
        metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
 
        // write the new object
-       err = replicator->newObject(newObject.key.c_str(),data,0,writeLength);
+       err = replicator->newObject(newObject.key.c_str(),&data[count],0,writeLength);
+       assert((uint) err == writeLength);
        if (err <= 0)
        {
            // update metadataObject length to reflect what awas actually written
@@ -519,7 +530,7 @@ int IOCoordinator::truncate(const char *path, size_t newSize)
    {
        lock.unlock();
        uint8_t zero = 0;
-       err = write(path, &zero, newSize - 1, 1);
+       err = _write(path, &zero, newSize - 1, 1);
        if (err < 0)
            return -1;
        return 0;
@@ -1043,7 +1054,7 @@ void IOCoordinator::readLock(const string &filename)
    boost::unique_lock s(lockMutex);
 
    //cout << "read-locking " << filename << endl;
-   assert(filename[0] == '/');
+   //assert(filename[0] == '/');
    auto ins = locks.insert(pair(filename, NULL));
    if (ins.second)
        ins.first->second = new RWLock();
@@ -1068,7 +1079,7 @@ void IOCoordinator::writeLock(const string &filename)
    boost::unique_lock s(lockMutex);
 
    //cout << "write-locking " << filename << endl;
-   assert(filename[0] == '/');
+   //assert(filename[0] == '/');
    auto ins = locks.insert(pair(filename, NULL));
    if (ins.second)
        ins.first->second = new RWLock();
diff --git a/src/IOCoordinator.h b/src/IOCoordinator.h
index 6b3bcdcd3..b83ee915d 100644
--- a/src/IOCoordinator.h
+++ b/src/IOCoordinator.h
@@ -86,6 +86,7 @@ class IOCoordinator : public boost::noncopyable
        void remove(const boost::filesystem::path &path);
        void deleteMetaFile(const boost::filesystem::path &file);
 
+       int _write(const char *filename, const uint8_t *data, off_t offset, size_t length);
        int loadObjectAndJournal(const char *objFilename, const char *journalFilename,
            uint8_t *data, off_t offset, size_t length) const;
 
diff --git a/src/Replicator.cpp b/src/Replicator.cpp
index e7b19b8bb..98c74ee18 100755
--- a/src/Replicator.cpp
+++ b/src/Replicator.cpp
@@ -3,6 +3,7 @@
 #include "IOCoordinator.h"
 #include "SMLogging.h"
 #include "Cache.h"
+#include "Utilities.h"
 #include 
 #include 
 #include 
@@ -84,21 +85,11 @@ Replicator * Replicator::get()
     return rep;
 }
 
-struct scoped_closer {
-    scoped_closer(int f) : fd(f) { }
-    ~scoped_closer() {
-        int s_errno = errno;
-        ::close(fd);
-        errno = s_errno;
-    }
-    int fd;
-};
-
 #define OPEN(name, mode) \
     fd = ::open(name, mode, 0600); \
     if (fd < 0) \
         return fd; \
-    scoped_closer sc(fd);
+    ScopedCloser sc(fd);
 
 int Replicator::newObject(const char *filename, const uint8_t *data, off_t offset, size_t length )
 {
@@ -132,20 +123,24 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
     uint64_t thisEntryMaxOffset = (offset + length - 1);
     Cache *cache = Cache::get();   // need to init sync here to break circular dependency...
 
-    if (!boost::filesystem::exists(journalFilename))
+    bool exists = boost::filesystem::exists(journalFilename);
+    OPEN(journalFilename.c_str(), (exists ? O_RDWR : O_WRONLY | O_CREAT))
+
+    if (!exists)
     {
         // create new journal file with header
-        OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
+        //OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
         string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
         cache->makeSpace(header.size());
         err = ::write(fd, header.c_str(), header.length() + 1);
+        assert((uint) err == header.length() + 1);
         if (err <= 0)
             return err;
     }
     else
     {
         // read the existing header and check if max_offset needs to be updated
-        OPEN(journalFilename.c_str(), O_RDWR);
+        //OPEN(journalFilename.c_str(), O_RDWR);
         boost::shared_array headertxt = seekToEndOfHeader1(fd);
         stringstream ss;
         ss << headertxt.get();
@@ -157,21 +152,28 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
         {
             string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
             err = ::pwrite(fd, header.c_str(), header.length() + 1,0);
+            assert((uint) err == header.length() + 1);
             if (err <= 0)
                 return err;
         }
     }
 
-    OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
-
+    // XXXPAT: avoid closing and right away opening the file again when it's easy not to.
+    // Just reposition the file pointer.
+    //OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
+    ::lseek(fd, 0, SEEK_END);
+    err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
+    assert(err == JOURNAL_ENTRY_HEADER_SIZE);
     if (err <= 0)
         return err;
 
     while (count < length)
     {
         err = ::write(fd, &data[count], length - count);
-        if (err <= 0)
+        if (err < 0)
         {
+            /* XXXPAT: We can't return anything but success here, unless we also update the entry's
+               header */
             if (count > 0)   // return what was successfully written
                 return count;
             else
diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp
index d3cf7028b..e2eba059b 100644
--- a/src/Synchronizer.cpp
+++ b/src/Synchronizer.cpp
@@ -257,6 +257,12 @@ void Synchronizer::process(list::iterator name)
            pending->opFlags, e.what());
        success = false;
        sleep(1);
+       continue;
+       /* TODO: Need to think about this requeue logic again.  The potential problem is that
+          there may be threads waiting for this job to finish.  If the insert doesn't happen because
+          there is already a job in pendingOps for the same file, then the threads waiting on this
+          job never get woken, right??  Or, can that never happen for some reason?
+       */
        s.lock();
        auto inserted = pendingOps.insert(pair >(key, pending));
        if (!inserted.second)
diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp
index 3bf87ca12..dda524e73 100644
--- a/src/ThreadPool.cpp
+++ b/src/ThreadPool.cpp
@@ -107,7 +107,7 @@ void ThreadPool::_processingLoop()
            threadsWaiting++;
            bool timedout = !jobAvailable.timed_wait<>(s, idleThreadTimeout);
            threadsWaiting--;
-           if (timedout)
+           if (timedout && jobs.empty())
                return;
        }
        if (jobs.empty())
diff --git a/src/Utilities.cpp b/src/Utilities.cpp
index b6a6d66de..f93e28a83 100644
--- a/src/Utilities.cpp
+++ b/src/Utilities.cpp
@@ -54,7 +54,7 @@ void ScopedWriteLock::unlock()
     }
 }
 
-ScopedCloser::ScopedCloser(int f) : fd(f) { }
+ScopedCloser::ScopedCloser(int f) : fd(f) { assert(f != -1); }
 ScopedCloser::~ScopedCloser() {
     int s_errno = errno;
     ::close(fd);
@@ -64,6 +64,7 @@ ScopedCloser::~ScopedCloser() {
 SharedCloser::SharedCloser(int f)
 {
     block = new CtrlBlock();
+    assert(f != -1);
     block->fd = f;
     block->refCount = 1;
 }
@@ -78,8 +79,10 @@ SharedCloser::~SharedCloser()
     block->refCount--;
     if (block->refCount == 0)
     {
+        int s_errno = errno;
        ::close(block->fd);
        delete block;
+        errno = s_errno;
     }
 }
 
diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp
index 7454623a5..8910f62ae 100755
--- a/src/unit_tests.cpp
+++ b/src/unit_tests.cpp
@@ -591,10 +591,23 @@ bool IOCTruncate()
    bf::path cachedObjectPath = cachePath/testObjKey;
    makeTestMetadata(metadataFile.string().c_str());
    makeTestObject(objectPath.string().c_str());
+   int err;
    uint8_t buf[1<<14];
+   int *buf32 = (int *) buf;
 
-   // Extending a file doesn't quite work yet, punting on that part of the test for now
+   /* Need to enable this later.
+   // Extend the test file to 10000 bytes
+   err = ioc->truncate(testFile, 10000);
+   assert(!err);
+   err = ioc->read(testFile, buf, 0, 10000);
+   assert(err == 10000);
+   // verify the data is what it should be
+   for (int i = 0; i < 2048; i++)
+       assert(buf32[i] == i);
+   for (int i = 2048; i < 2500; i++)
+       assert(buf32[i] == 0);
+   */
 
    err = ioc->truncate(testFile, 4000);
    assert(!err);
@@ -635,7 +648,6 @@ bool IOCTruncate()
    memset(buf, 0, 16384);
    err = ioc->read(testFile, buf, 0, 16384);
    assert(err == 16384);
-   int *buf32 = (int *) buf;
    for (int i = 0; i < 16384/4; i++)
        assert(buf32[i] == (i % 2048));
    assert(bf::exists(cachedSecondObject));
@@ -1451,6 +1463,8 @@ int main()
    opentask();
    //metadataUpdateTest();
 
+   // create the metadatafile to use
+   MetadataFile tmpfile("metadataJournalTest");
    // requires 8K object size to test boundries
    //Case 1 new write that spans full object
    metadataJournalTest((10*sizeKB),0);
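
A note on the addJournalEntry() change above: the journal file is now opened once (O_RDWR if it already exists, O_WRONLY | O_CREAT if not) and the code seeks to the end with lseek() instead of closing and reopening the same file with O_APPEND, and the data-write loop now treats only a negative return from ::write() as an error. The following is a minimal standalone sketch of that pattern under those assumptions, not StorageManager code; the FdCloser struct, appendEntry() function, file-existence check, and entry layout are illustrative stand-ins (ScopedCloser in Utilities.cpp plays the FdCloser role in the real code).

// Minimal sketch (assumptions, not StorageManager code): open a journal once,
// then append an entry header + payload on the same descriptor.
#include <fcntl.h>
#include <unistd.h>
#include <stdint.h>
#include <cassert>
#include <cerrno>

struct FdCloser                     // hypothetical stand-in for ScopedCloser
{
    explicit FdCloser(int f) : fd(f) { assert(f != -1); }
    ~FdCloser()
    {
        int saved = errno;          // ::close() can clobber errno; restore the caller's value
        ::close(fd);
        errno = saved;
    }
    int fd;
};

// Append one entry (offset/length header, then the data) to a journal file.
// The header layout here is illustrative only.
ssize_t appendEntry(const char *journalName, const uint8_t *data, off_t offset, size_t length)
{
    bool exists = (::access(journalName, F_OK) == 0);
    int fd = ::open(journalName, exists ? O_RDWR : (O_WRONLY | O_CREAT), 0600);
    if (fd < 0)
        return fd;
    FdCloser closer(fd);            // one descriptor for the whole operation

    ::lseek(fd, 0, SEEK_END);       // reposition instead of reopening with O_APPEND

    uint64_t offlen[2] = { (uint64_t) offset, (uint64_t) length };
    ssize_t err = ::write(fd, offlen, sizeof(offlen));
    if (err != (ssize_t) sizeof(offlen))
        return (err < 0) ? err : -1;

    size_t count = 0;
    while (count < length)
    {
        err = ::write(fd, &data[count], length - count);
        if (err < 0)                // 0 is a short write, not an error; only -1 aborts
            return (count > 0) ? (ssize_t) count : -1;
        count += err;
    }
    return (ssize_t) count;
}

The err < 0 test mirrors the loop fix in the patch: a zero return from ::write() is just a short write, so only -1 ends the loop early, and the errno-preserving destructor mirrors the SharedCloser change in Utilities.cpp.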