From cf678170caa0de70b6b686c43a2986d1bfab5d43 Mon Sep 17 00:00:00 2001 From: Ben Thompson Date: Mon, 8 Apr 2019 16:34:48 -0500 Subject: [PATCH] fixes for write and finish append. add append unit_test cases. --- src/IOCoordinator.cpp | 142 +++++++++++++++++++++--------------------- src/unit_tests.cpp | 48 +++++++++++++- 2 files changed, 116 insertions(+), 74 deletions(-) diff --git a/src/IOCoordinator.cpp b/src/IOCoordinator.cpp index 3e290897c..794e7d766 100755 --- a/src/IOCoordinator.cpp +++ b/src/IOCoordinator.cpp @@ -247,7 +247,7 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset uint64_t count = 0; uint64_t writeLength = 0; uint64_t dataRemaining = length; - uint64_t journalOffset = 0; + uint64_t objectOffset = 0; vector objects; vector newObjectKeys; Synchronizer *synchronizer = Synchronizer::get(); // need to init sync here to break circular dependency... @@ -269,86 +269,77 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset { // first object in the list so start at offset and // write to end of oject or all the data - journalOffset = offset - i->offset; - writeLength = min((objectSize - journalOffset),dataRemaining); + objectOffset = offset - i->offset; + writeLength = min((objectSize - objectOffset),dataRemaining); } else { // starting at beginning of next object write the rest of data // or until object length is reached writeLength = min(objectSize,dataRemaining); - journalOffset = 0; + objectOffset = 0; } cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); - err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength); + err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength); if (err <= 0) { - if ((count + journalOffset) > i->length) - metadata.updateEntryLength(i->offset, (count + journalOffset)); + if ((count + objectOffset) > i->length) + metadata.updateEntryLength(i->offset, (count + objectOffset)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::write(): object failed to complete write, %u of %u bytes written.",count,length); return count; } - if ((writeLength + journalOffset) > i->length) - metadata.updateEntryLength(i->offset, (writeLength + journalOffset)); + if ((writeLength + objectOffset) > i->length) + metadata.updateEntryLength(i->offset, (writeLength + objectOffset)); cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); synchronizer->newJournalEntry(i->key); - count += err; - dataRemaining -= err; + count += writeLength; + dataRemaining -= writeLength; } } // there is no overlapping data, or data goes beyond end of last object - while (dataRemaining > 0 && err >= 0) + while (dataRemaining > 0) { - //add a new metaDataObject - if (count == 0) + cache->makeSpace(dataRemaining); + metadataObject newObject = metadata.addMetadataObject(filename,0); + if (count == 0 && (uint64_t) offset > newObject.offset) { //this is starting beyond last object in metadata //figure out if the offset is in this object - if ((uint64_t) offset < objectSize) - { - journalOffset = offset; - writeLength = min((objectSize - journalOffset),dataRemaining); - } - else - { - //we need to create an object that is only padding - } + objectOffset = offset - newObject.offset; + writeLength = min((objectSize - objectOffset),dataRemaining); } else { // count != 0 we've already started writing and are going to new object // start at beginning of the new object writeLength = min(objectSize,dataRemaining); - journalOffset = 0; + objectOffset = 0; } - cache->makeSpace(journalOffset+writeLength); - // add a new metadata object, this will get a new objectKey - metadataObject newObject = metadata.addMetadataObject(filename,writeLength); - // write the new object - err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength); + if ((writeLength + objectOffset) > newObject.length) + metadata.updateEntryLength(newObject.offset, (writeLength + objectOffset)); + // send to replicator + err = replicator->newObject(newObject.key.c_str(),data,objectOffset,writeLength); if (err <= 0) { // update metadataObject length to reflect what awas actually written - if ((count + journalOffset) > newObject.length) - metadata.updateEntryLength(newObject.offset, (count + journalOffset)); + if ((count + objectOffset) > newObject.length) + metadata.updateEntryLength(newObject.offset, (count + objectOffset)); metadata.writeMetadata(filename); logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); return count; //log error and abort } - if ((writeLength + journalOffset) > newObject.length) - metadata.updateEntryLength(newObject.offset, (writeLength + journalOffset)); - cache->newObject(newObject.key,(writeLength + journalOffset)); + cache->newObject(newObject.key,(writeLength + objectOffset)); newObjectKeys.push_back(newObject.key); - count += err; - dataRemaining -= err; + count += writeLength; + dataRemaining -= writeLength; } synchronizer->newObjects(newObjectKeys); @@ -368,15 +359,16 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng size_t count = 0; uint64_t writeLength = 0; uint64_t dataRemaining = length; - uint64_t journalOffset = 0; vector objects; - - //writeLock(filename); + vector newObjectKeys; + Synchronizer *synchronizer = Synchronizer::get(); // need to init sync here to break circular dependency... + + ScopedWriteLock lock(this, filename); MetadataFile metadata = MetadataFile(filename); uint64_t offset = metadata.getLength(); - + //read metadata determine if this fits in the last object objects = metadata.metadataRead(offset,length); @@ -384,65 +376,71 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng { std::vector::const_iterator i = objects.begin(); + if ((objectSize - i->length) > 0) // if this is zero then we can't put anything else in this object + { // figure out how much data to write to this object - if (offset > i->offset) - { - journalOffset = offset - i->offset; - writeLength = min((objectSize - journalOffset),dataRemaining); - } - err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength); + writeLength = min((objectSize - i->length),dataRemaining); + + cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + + err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength); if (err <= 0) { - if ((count + journalOffset) > i->length) - metadata.updateEntryLength(i->offset, (count + journalOffset)); + metadata.updateEntryLength(i->offset, (count + i->length)); metadata.writeMetadata(filename); - logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); + logger->log(LOG_ERR,"IOCoordinator::append(): journal failed to complete write, %u of %u bytes written.",count,length); return count; } - if ((writeLength + journalOffset) > i->length) - metadata.updateEntryLength(i->offset, (writeLength + journalOffset)); + metadata.updateEntryLength(i->offset, (writeLength + i->length)); - count += err; - dataRemaining -= err; - //cache->makeSpace(journal_data_size) - //Synchronizer::newJournalData(journal_file); + cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE); + + synchronizer->newJournalEntry(i->key); + + count += writeLength; + dataRemaining -= writeLength; + } } - else + else if (objects.size() > 1) { //Something went wrong this shouldn't overlap objects + logger->log(LOG_ERR,"IOCoordinator::append(): overlapping objects found on append.",count,length); + return -1; } - // there is no overlapping data, or data goes beyond end of last object - while (dataRemaining > 0 && err >= 0) + // append is starting or adding to a new object + while (dataRemaining > 0) { //add a new metaDataObject writeLength = min(objectSize,dataRemaining); - if (count == 0) - { - //this is append and it starting beyond last object in metadata - //figure out if the offset is in this object - } - //cache->makeSpace(size) + + cache->makeSpace(writeLength); // add a new metadata object, this will get a new objectKey NOTE: probably needs offset too metadataObject newObject = metadata.addMetadataObject(filename,writeLength); + // write the new object - err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength); + err = replicator->newObject(newObject.key.c_str(),data,0,writeLength); if (err <= 0) { // update metadataObject length to reflect what awas actually written - if ((count + journalOffset) > newObject.length) - metadata.updateEntryLength(newObject.offset, (count + journalOffset)); + metadata.updateEntryLength(newObject.offset, (count)); metadata.writeMetadata(filename); - logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length); + logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length); return count; //log error and abort } - // sync - //Synchronizer::newObject(newname) - count += err; - dataRemaining -= err; + cache->newObject(newObject.key,writeLength); + newObjectKeys.push_back(newObject.key); + + count += writeLength; + dataRemaining -= writeLength; } + synchronizer->newObjects(newObjectKeys); + metadata.writeMetadata(filename); + + lock.unlock(); + return count; } diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 33731dde8..7454623a5 100755 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -287,6 +287,43 @@ void metadataJournalTest(std::size_t size, off_t offset) assert(resp->returnCode == (int) size); } +void metadataJournalTest_append(std::size_t size) +{ + // make an empty file to write to + const char *filename = "metadataJournalTest"; + uint8_t buf[(sizeof(write_cmd)+std::strlen(filename)+size)]; + uint64_t *data; + + sm_msg_header *hdr = (sm_msg_header *) buf; + append_cmd *cmd = (append_cmd *) &hdr[1]; + cmd->opcode = APPEND; + cmd->count = size; + cmd->flen = std::strlen(filename); + memcpy(&cmd->filename, filename, cmd->flen); + data = (uint64_t *) &cmd->filename[cmd->flen]; + int count = 0; + for (uint64_t i = 0; i < (size/sizeof(uint64_t)); i++) + { + data[i] = i; + count++; + } + hdr->type = SM_MSG_START; + hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count; + AppendTask a(clientSock, hdr->payloadLen); + int err = ::write(sessionSock, cmd, hdr->payloadLen); + + a.run(); + + // verify response + err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); + sm_response *resp = (sm_response *) buf; + assert(err == sizeof(*resp)); + assert(resp->header.type == SM_MSG_START); + assert(resp->header.payloadLen == 4); + assert(resp->header.flags == 0); + assert(resp->returnCode == (int) size); +} + void metadataJournalTestCleanup(std::size_t size) { Config* config = Config::get(); @@ -1424,8 +1461,15 @@ int main() //Case 4 write starts object1 ends object3 metadataJournalTest((10*sizeKB),(7*sizeKB)); //Case 5 write starts in new object at offset >0 - //TODO add zero padding to writes in this scenario - //metadataJournalTest((8*sizeKB),4*sizeKB); + + //append test + // first one appends file to end of 8K object + metadataJournalTest_append((7*sizeKB)); + // this apppends that starts on new object + metadataJournalTest_append((7*sizeKB)); + // this starts in one object and crosses into new object + metadataJournalTest_append((7*sizeKB)); + //writetask(); //appendtask();