You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-15 12:09:09 +03:00
Added more complete IOC::write logic fixed metadata bug started IOC::append.
This commit is contained in:
@@ -249,8 +249,10 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
|
||||
uint64_t dataRemaining = length;
|
||||
uint64_t journalOffset = 0;
|
||||
vector<metadataObject> objects;
|
||||
vector<string> newObjectKeys;
|
||||
Synchronizer *synchronizer = Synchronizer::get(); // need to init sync here to break circular dependency...
|
||||
|
||||
//writeLock(filename);
|
||||
ScopedWriteLock lock(this, filename);
|
||||
|
||||
MetadataFile metadata = MetadataFile(filename);
|
||||
|
||||
@@ -263,7 +265,7 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
|
||||
for (std::vector<metadataObject>::const_iterator i = objects.begin(); i != objects.end(); ++i)
|
||||
{
|
||||
// figure out how much data to write to this object
|
||||
if (count == 0 && offset > i->offset)
|
||||
if (count == 0 && offset >= i->offset)
|
||||
{
|
||||
// first object in the list so start at offset and
|
||||
// write to end of oject or all the data
|
||||
@@ -277,10 +279,122 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
|
||||
writeLength = min(objectSize,dataRemaining);
|
||||
journalOffset = 0;
|
||||
}
|
||||
cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
|
||||
|
||||
err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength);
|
||||
|
||||
if (err <= 0)
|
||||
{
|
||||
if ((count + journalOffset) > i->length)
|
||||
metadata.updateEntryLength(i->offset, (count + journalOffset));
|
||||
metadata.writeMetadata(filename);
|
||||
logger->log(LOG_ERR,"IOCoordinator::write(): object failed to complete write, %u of %u bytes written.",count,length);
|
||||
return count;
|
||||
}
|
||||
if ((writeLength + journalOffset) > i->length)
|
||||
metadata.updateEntryLength(i->offset, (writeLength + journalOffset));
|
||||
|
||||
cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
|
||||
|
||||
synchronizer->newJournalEntry(i->key);
|
||||
count += err;
|
||||
dataRemaining -= err;
|
||||
}
|
||||
}
|
||||
// there is no overlapping data, or data goes beyond end of last object
|
||||
while (dataRemaining > 0 && err >= 0)
|
||||
{
|
||||
//add a new metaDataObject
|
||||
if (count == 0)
|
||||
{
|
||||
//this is starting beyond last object in metadata
|
||||
//figure out if the offset is in this object
|
||||
if (offset < objectSize)
|
||||
{
|
||||
journalOffset = offset;
|
||||
writeLength = min((objectSize - journalOffset),dataRemaining);
|
||||
}
|
||||
else
|
||||
{
|
||||
//we need to create an object that is only padding
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// count != 0 we've already started writing and are going to new object
|
||||
// start at beginning of the new object
|
||||
writeLength = min(objectSize,dataRemaining);
|
||||
journalOffset = 0;
|
||||
}
|
||||
cache->makeSpace(journalOffset+writeLength);
|
||||
// add a new metadata object, this will get a new objectKey
|
||||
metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
|
||||
// write the new object
|
||||
err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength);
|
||||
if (err <= 0)
|
||||
{
|
||||
// update metadataObject length to reflect what awas actually written
|
||||
if ((count + journalOffset) > newObject.length)
|
||||
metadata.updateEntryLength(newObject.offset, (count + journalOffset));
|
||||
metadata.writeMetadata(filename);
|
||||
logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
|
||||
return count;
|
||||
//log error and abort
|
||||
}
|
||||
if ((writeLength + journalOffset) > newObject.length)
|
||||
metadata.updateEntryLength(newObject.offset, (writeLength + journalOffset));
|
||||
|
||||
cache->newObject(newObject.key,(writeLength + journalOffset));
|
||||
newObjectKeys.push_back(newObject.key);
|
||||
|
||||
count += err;
|
||||
dataRemaining -= err;
|
||||
}
|
||||
|
||||
synchronizer->newObjects(newObjectKeys);
|
||||
|
||||
metadata.writeMetadata(filename);
|
||||
|
||||
lock.unlock();
|
||||
|
||||
return count;
|
||||
}
|
||||
//
|
||||
// Still fixing stuff here
|
||||
//
|
||||
int IOCoordinator::append(const char *filename, const uint8_t *data, size_t length)
|
||||
{
|
||||
int fd, err;
|
||||
size_t count = 0;
|
||||
uint64_t writeLength = 0;
|
||||
uint64_t dataRemaining = length;
|
||||
uint64_t journalOffset = 0;
|
||||
vector<metadataObject> objects;
|
||||
|
||||
//writeLock(filename);
|
||||
|
||||
MetadataFile metadata = MetadataFile(filename);
|
||||
|
||||
int offset = metadata.getLength();
|
||||
|
||||
//read metadata determine if this fits in the last object
|
||||
objects = metadata.metadataRead(offset,length);
|
||||
|
||||
if(!objects.empty() && objects.size() == 1)
|
||||
{
|
||||
std::vector<metadataObject>::const_iterator i = objects.begin();
|
||||
|
||||
// figure out how much data to write to this object
|
||||
if (offset > i->offset)
|
||||
{
|
||||
journalOffset = offset - i->offset;
|
||||
writeLength = min((objectSize - journalOffset),dataRemaining);
|
||||
}
|
||||
err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength);
|
||||
if (err <= 0)
|
||||
{
|
||||
metadata.updateEntryLength(i->offset, count);
|
||||
if ((count + journalOffset) > i->length)
|
||||
metadata.updateEntryLength(i->offset, (count + journalOffset));
|
||||
metadata.writeMetadata(filename);
|
||||
logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
|
||||
return count;
|
||||
@@ -290,25 +404,33 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
|
||||
|
||||
count += err;
|
||||
dataRemaining -= err;
|
||||
}
|
||||
//cache.makeSpace(journal_data_size)
|
||||
//cache->makeSpace(journal_data_size)
|
||||
//Synchronizer::newJournalData(journal_file);
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
//Something went wrong this shouldn't overlap objects
|
||||
}
|
||||
// there is no overlapping data, or data goes beyond end of last object
|
||||
while (dataRemaining > 0 && err >= 0)
|
||||
{
|
||||
//add a new metaDataObject
|
||||
writeLength = min(objectSize,dataRemaining);
|
||||
//cache.makeSpace(size)
|
||||
if (count == 0)
|
||||
{
|
||||
//this is append and it starting beyond last object in metadata
|
||||
//figure out if the offset is in this object
|
||||
}
|
||||
//cache->makeSpace(size)
|
||||
// add a new metadata object, this will get a new objectKey NOTE: probably needs offset too
|
||||
metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
|
||||
// write the new object
|
||||
err = replicator->newObject(newObject.key.c_str(),data,writeLength);
|
||||
err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength);
|
||||
if (err <= 0)
|
||||
{
|
||||
// update metadataObject length to reflect what awas actually written
|
||||
metadata.updateEntryLength(newObject.offset, count);
|
||||
if ((count + journalOffset) > newObject.length)
|
||||
metadata.updateEntryLength(newObject.offset, (count + journalOffset));
|
||||
metadata.writeMetadata(filename);
|
||||
logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
|
||||
return count;
|
||||
@@ -321,28 +443,6 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
|
||||
}
|
||||
|
||||
metadata.writeMetadata(filename);
|
||||
|
||||
//writeUnlock(filename);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
int IOCoordinator::append(const char *filename, const uint8_t *data, size_t length)
|
||||
{
|
||||
int fd, err;
|
||||
|
||||
OPEN(filename, O_WRONLY | O_APPEND);
|
||||
size_t count = 0;
|
||||
while (count < length) {
|
||||
err = ::write(fd, &data[count], length - count);
|
||||
if (err <= 0)
|
||||
if (count > 0) // return what was successfully written
|
||||
return count;
|
||||
else
|
||||
return err;
|
||||
count += err;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
@@ -229,12 +229,14 @@ vector<metadataObject> MetadataFile::metadataRead(off_t offset, size_t length) c
|
||||
break;
|
||||
++i;
|
||||
}
|
||||
|
||||
// append objects until foundLen >= length or EOF
|
||||
// first time thrus foundLen should be adjusted based on offset
|
||||
off_t foundOffset = offset - i->offset;
|
||||
while (i != mObjects.end() && foundLen < length)
|
||||
{
|
||||
ret.push_back(*i);
|
||||
foundLen += i->length;
|
||||
foundLen += (i->length - foundOffset);
|
||||
foundOffset = 0; //zero on every other time thru this loop
|
||||
++i;
|
||||
}
|
||||
return ret;
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
#include "Replicator.h"
|
||||
#include "IOCoordinator.h"
|
||||
#include "SMLogging.h"
|
||||
#include "Cache.h"
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
@@ -42,7 +43,6 @@ Replicator::Replicator()
|
||||
mpLogger->log(LOG_CRIT, "Could not load metadata_path from storagemanger.cnf file.");
|
||||
throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file");
|
||||
}
|
||||
boost::filesystem::create_directories(msJournalPath);
|
||||
try
|
||||
{
|
||||
boost::filesystem::create_directories(msJournalPath);
|
||||
@@ -52,6 +52,21 @@ Replicator::Replicator()
|
||||
syslog(LOG_CRIT, "Failed to create %s, got: %s", msJournalPath.c_str(), e.what());
|
||||
throw e;
|
||||
}
|
||||
msCachePath = mpConfig->getValue("Cache", "path");
|
||||
if (msCachePath.empty())
|
||||
{
|
||||
mpLogger->log(LOG_CRIT, "Cache/path is not set");
|
||||
throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
|
||||
}
|
||||
try
|
||||
{
|
||||
boost::filesystem::create_directories(msCachePath);
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
mpLogger->log(LOG_CRIT, "Failed to create %s, got: %s", msCachePath.c_str(), e.what());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
Replicator::~Replicator()
|
||||
@@ -85,14 +100,15 @@ struct scoped_closer {
|
||||
return fd; \
|
||||
scoped_closer sc(fd);
|
||||
|
||||
int Replicator::newObject(const char *filename, const uint8_t *data, size_t length )
|
||||
int Replicator::newObject(const char *filename, const uint8_t *data, off_t offset, size_t length )
|
||||
{
|
||||
int fd, err;
|
||||
string objectFilename = msCachePath + "/" + string(filename);
|
||||
|
||||
OPEN(filename, O_WRONLY | O_CREAT);
|
||||
OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
|
||||
size_t count = 0;
|
||||
while (count < length) {
|
||||
err = ::write(fd, &data[count], length - count);
|
||||
err = ::pwrite(fd, &data[count], length - count, offset);
|
||||
if (err <= 0)
|
||||
if (count > 0) // return what was successfully written
|
||||
return count;
|
||||
@@ -112,11 +128,14 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
|
||||
int version = 1;
|
||||
string journalFilename = msJournalPath + "/" + string(filename) + ".journal";
|
||||
uint64_t thisEntryMaxOffset = (offset + length - 1);
|
||||
Cache *cache = Cache::get(); // need to init sync here to break circular dependency...
|
||||
|
||||
if (!boost::filesystem::exists(journalFilename))
|
||||
{
|
||||
// create new journal file with header
|
||||
OPEN(journalFilename.c_str(), O_WRONLY | O_CREAT);
|
||||
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
|
||||
cache->makeSpace(header.size());
|
||||
err = ::write(fd, header.c_str(), header.length() + 1);
|
||||
if (err <= 0)
|
||||
return err;
|
||||
@@ -143,7 +162,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
|
||||
|
||||
OPEN(journalFilename.c_str(), O_WRONLY | O_APPEND);
|
||||
|
||||
err = ::write(fd, offlen, 16);
|
||||
err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
|
||||
if (err <= 0)
|
||||
return err;
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@
|
||||
#include <sys/types.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#define JOURNAL_ENTRY_HEADER_SIZE 16
|
||||
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
@@ -28,7 +30,7 @@ class Replicator
|
||||
};
|
||||
|
||||
int addJournalEntry(const char *filename, const uint8_t *data, off_t offset, size_t length);
|
||||
int newObject(const char *filename, const uint8_t *data, size_t length);
|
||||
int newObject(const char *filename, const uint8_t *data, off_t offset, size_t length);
|
||||
int remove(const char *filename, Flags flags = NONE);
|
||||
int remove(const boost::filesystem::path &file, Flags flags = NONE);
|
||||
|
||||
@@ -39,6 +41,7 @@ class Replicator
|
||||
Config *mpConfig;
|
||||
SMLogging *mpLogger;
|
||||
std::string msJournalPath;
|
||||
std::string msCachePath;
|
||||
//ThreadPool threadPool;
|
||||
};
|
||||
|
||||
|
||||
@@ -205,11 +205,14 @@ bool replicatorTest()
|
||||
Config* config = Config::get();
|
||||
string metaPath = config->getValue("ObjectStorage", "metadata_path");
|
||||
string journalPath = config->getValue("ObjectStorage", "journal_path");
|
||||
string cacehPath = config->getValue("Cache", "path");
|
||||
|
||||
Replicator *repli = Replicator::get();
|
||||
int err,fd;
|
||||
const char *newobject = "newobjectTest";
|
||||
const char *newobjectJournal = "newobjectTest.journal";
|
||||
string newObjectJournalFullPath = journalPath + "/" + "newobjectTest.journal";
|
||||
string newObjectCacheFullPath = cacehPath + "/" + "newobjectTest";
|
||||
uint8_t buf[1024];
|
||||
uint8_t data[1024];
|
||||
int version = 1;
|
||||
@@ -218,10 +221,10 @@ bool replicatorTest()
|
||||
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str();
|
||||
|
||||
// test newObject
|
||||
repli->newObject(newobject,data,10);
|
||||
repli->newObject(newobject,data,0,10);
|
||||
|
||||
//check file contents
|
||||
fd = ::open(newobject, O_RDONLY);
|
||||
fd = ::open(newObjectCacheFullPath.c_str(), O_RDONLY);
|
||||
err = ::read(fd, buf, 1024);
|
||||
assert(err == 10);
|
||||
buf[10] = 0;
|
||||
@@ -240,9 +243,9 @@ bool replicatorTest()
|
||||
cout << "replicator addJournalEntry OK" << endl;
|
||||
::close(fd);
|
||||
|
||||
repli->remove(newobject);
|
||||
repli->remove(newObjectCacheFullPath.c_str());
|
||||
repli->remove(newObjectJournalFullPath.c_str());
|
||||
assert(!boost::filesystem::exists(newobject));
|
||||
assert(!boost::filesystem::exists(newObjectCacheFullPath.c_str()));
|
||||
cout << "replicator remove OK" << endl;
|
||||
return true;
|
||||
}
|
||||
@@ -283,10 +286,6 @@ bool metadataJournalTest(std::size_t size, off_t offset)
|
||||
assert(resp->header.payloadLen == 4);
|
||||
assert(resp->header.flags == 0);
|
||||
assert(resp->returnCode == size);
|
||||
|
||||
MetadataFile mdfTest(filename);
|
||||
mdfTest.printObjects();
|
||||
|
||||
}
|
||||
|
||||
void metadataJournalTestCleanup(std::size_t size)
|
||||
@@ -294,14 +293,18 @@ void metadataJournalTestCleanup(std::size_t size)
|
||||
Config* config = Config::get();
|
||||
string metaPath = config->getValue("ObjectStorage", "metadata_path");
|
||||
string journalPath = config->getValue("ObjectStorage", "journal_path");
|
||||
string cachePath = config->getValue("Cache", "path");
|
||||
|
||||
const char *filename = "metadataJournalTest";
|
||||
MetadataFile mdfTest(filename);
|
||||
mdfTest.printObjects();
|
||||
std::vector<metadataObject> objects = mdfTest.metadataRead(0,size);
|
||||
for (std::vector<metadataObject>::const_iterator i = objects.begin(); i != objects.end(); ++i)
|
||||
{
|
||||
string casheObject = cachePath + "/" + i->key;
|
||||
string keyJournal = journalPath + "/" + i->key + ".journal";
|
||||
if(boost::filesystem::exists(i->key.c_str()))
|
||||
::unlink(i->key.c_str());
|
||||
::unlink(casheObject.c_str());
|
||||
if(boost::filesystem::exists(keyJournal.c_str()))
|
||||
::unlink(keyJournal.c_str());
|
||||
}
|
||||
@@ -1415,7 +1418,7 @@ int main()
|
||||
scoped_closer sc1(serverSock), sc2(sessionSock), sc3(clientSock);
|
||||
|
||||
opentask();
|
||||
metadataUpdateTest();
|
||||
//metadataUpdateTest();
|
||||
// requires 8K object size to test boundries
|
||||
//Case 1 new write that spans full object
|
||||
metadataJournalTest((10*sizeKB),0);
|
||||
@@ -1428,13 +1431,12 @@ int main()
|
||||
//Case 5 write starts in new object at offset >0
|
||||
//TODO add zero padding to writes in this scenario
|
||||
//metadataJournalTest((8*sizeKB),4*sizeKB);
|
||||
metadataJournalTestCleanup(17*sizeKB);
|
||||
|
||||
//writetask();
|
||||
appendtask();
|
||||
//appendtask();
|
||||
unlinktask();
|
||||
stattask();
|
||||
//truncatetask(); // currently waiting on IOC::write() to be completed.
|
||||
truncatetask(); // currently waiting on IOC::write() to be completed.
|
||||
listdirtask();
|
||||
pingtask();
|
||||
copytask();
|
||||
@@ -1451,5 +1453,8 @@ int main()
|
||||
IOCUnlink();
|
||||
IOCCopyFile();
|
||||
|
||||
sleep(5); // sometimes this deletes them before syncwithjournal is called
|
||||
metadataJournalTestCleanup(17*sizeKB);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user