You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-17 01:02:23 +03:00
Add sync forceFlush and modified unit_test to be in working state again.
This commit is contained in:
@@ -215,7 +215,9 @@ int IOCoordinator::read(const char *filename, uint8_t *data, off_t offset, size_
|
|||||||
|
|
||||||
// if this is the first object, the offset to start reading at is offset - object->offset
|
// if this is the first object, the offset to start reading at is offset - object->offset
|
||||||
off_t thisOffset = (count == 0 ? offset - object.offset : 0);
|
off_t thisOffset = (count == 0 ? offset - object.offset : 0);
|
||||||
assert(thisOffset >= 0 && thisOffset < (off_t) object.length);
|
// This checks and returns if the read is starting past EOF
|
||||||
|
if (thisOffset >= (off_t) object.length)
|
||||||
|
return count;
|
||||||
// if this is the last object, the length of the read is length - count,
|
// if this is the last object, the length of the read is length - count,
|
||||||
// otherwise it is the length of the object - starting offset
|
// otherwise it is the length of the object - starting offset
|
||||||
|
|
||||||
@@ -565,7 +567,7 @@ int IOCoordinator::truncate(const char *path, size_t newSize)
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
meta.updateEntryLength(objects[0].offset, newSize - objects[0].offset);
|
meta.updateEntryLength(objects[0].offset, newSize - objects[0].offset);
|
||||||
assert(objects[0].offset >= 0 && objects[0].length > (newSize - objects[0].offset));
|
assert(objects[0].offset >= 0 && objectSize > (newSize - objects[0].offset));
|
||||||
}
|
}
|
||||||
for (uint i = 1; i < objects.size(); i++)
|
for (uint i = 1; i < objects.size(); i++)
|
||||||
meta.removeEntry(objects[i].offset);
|
meta.removeEntry(objects[i].offset);
|
||||||
|
|||||||
16
src/Synchronizer.cpp
Normal file → Executable file
16
src/Synchronizer.cpp
Normal file → Executable file
@@ -205,7 +205,14 @@ void Synchronizer::periodicSync()
|
|||||||
while (!die)
|
while (!die)
|
||||||
{
|
{
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
boost::this_thread::sleep_for(syncInterval);
|
try
|
||||||
|
{
|
||||||
|
boost::this_thread::sleep_for(syncInterval);
|
||||||
|
}
|
||||||
|
catch (const boost::thread_interrupted)
|
||||||
|
{
|
||||||
|
//logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
|
||||||
|
}
|
||||||
lock.lock();
|
lock.lock();
|
||||||
//cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
|
//cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
|
||||||
// threadPool.currentQueueSize() << endl;
|
// threadPool.currentQueueSize() << endl;
|
||||||
@@ -214,6 +221,13 @@ void Synchronizer::periodicSync()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Synchronizer::forceFlush()
|
||||||
|
{
|
||||||
|
boost::unique_lock<boost::mutex> lock(mutex);
|
||||||
|
syncThread.interrupt();
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
void Synchronizer::makeJob(const string &key)
|
void Synchronizer::makeJob(const string &key)
|
||||||
{
|
{
|
||||||
objNames.push_front(key);
|
objNames.push_front(key);
|
||||||
|
|||||||
1
src/Synchronizer.h
Normal file → Executable file
1
src/Synchronizer.h
Normal file → Executable file
@@ -33,6 +33,7 @@ class Synchronizer : public boost::noncopyable
|
|||||||
void newObjects(const std::vector<std::string> &keys);
|
void newObjects(const std::vector<std::string> &keys);
|
||||||
void deletedObjects(const std::vector<std::string> &keys);
|
void deletedObjects(const std::vector<std::string> &keys);
|
||||||
void flushObject(const std::string &key);
|
void flushObject(const std::string &key);
|
||||||
|
void forceFlush();
|
||||||
|
|
||||||
// for testing primarily
|
// for testing primarily
|
||||||
boost::filesystem::path getJournalPath();
|
boost::filesystem::path getJournalPath();
|
||||||
|
|||||||
@@ -552,6 +552,7 @@ bool IOCTruncate()
|
|||||||
|
|
||||||
IOCoordinator *ioc = IOCoordinator::get();
|
IOCoordinator *ioc = IOCoordinator::get();
|
||||||
CloudStorage *cs = CloudStorage::get();
|
CloudStorage *cs = CloudStorage::get();
|
||||||
|
Synchronizer *sync = Synchronizer::get();
|
||||||
LocalStorage *ls = dynamic_cast<LocalStorage *>(cs);
|
LocalStorage *ls = dynamic_cast<LocalStorage *>(cs);
|
||||||
if (!ls)
|
if (!ls)
|
||||||
{
|
{
|
||||||
@@ -619,6 +620,8 @@ bool IOCTruncate()
|
|||||||
assert(err == 4000);
|
assert(err == 4000);
|
||||||
err = ioc->read(testFile, buf, 4000, 1);
|
err = ioc->read(testFile, buf, 4000, 1);
|
||||||
assert(err == 0);
|
assert(err == 0);
|
||||||
|
err = ioc->read(testFile, buf, 4005, 1);
|
||||||
|
assert(err == 0);
|
||||||
assert(bf::exists(objectPath));
|
assert(bf::exists(objectPath));
|
||||||
|
|
||||||
// truncate to 0 bytes, make sure everything is consistent with that, and the object no longer exists
|
// truncate to 0 bytes, make sure everything is consistent with that, and the object no longer exists
|
||||||
@@ -630,6 +633,7 @@ bool IOCTruncate()
|
|||||||
assert(err == 0);
|
assert(err == 0);
|
||||||
err = ioc->read(testFile, buf, 4000, 1);
|
err = ioc->read(testFile, buf, 4000, 1);
|
||||||
assert(err == 0);
|
assert(err == 0);
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1); // give Sync a chance to delete the object from the cloud
|
sleep(1); // give Sync a chance to delete the object from the cloud
|
||||||
assert(!bf::exists(objectPath));
|
assert(!bf::exists(objectPath));
|
||||||
|
|
||||||
@@ -671,6 +675,7 @@ bool IOCTruncate()
|
|||||||
assert(meta.getLength() == 6000);
|
assert(meta.getLength() == 6000);
|
||||||
err = ioc->read(testFile, buf, 0, 8192);
|
err = ioc->read(testFile, buf, 0, 8192);
|
||||||
assert(err == 6000);
|
assert(err == 6000);
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1); // give Synchronizer a chance to delete the file from the 'cloud'
|
sleep(1); // give Synchronizer a chance to delete the file from the 'cloud'
|
||||||
assert(!bf::exists(secondObjectPath));
|
assert(!bf::exists(secondObjectPath));
|
||||||
assert(!bf::exists(cachedSecondObject));
|
assert(!bf::exists(cachedSecondObject));
|
||||||
@@ -687,6 +692,7 @@ bool truncatetask()
|
|||||||
{
|
{
|
||||||
IOCoordinator *ioc = IOCoordinator::get();
|
IOCoordinator *ioc = IOCoordinator::get();
|
||||||
Cache *cache = Cache::get();
|
Cache *cache = Cache::get();
|
||||||
|
|
||||||
bf::path metaPath = ioc->getMetadataPath();
|
bf::path metaPath = ioc->getMetadataPath();
|
||||||
|
|
||||||
const char *filename = "trunctest1";
|
const char *filename = "trunctest1";
|
||||||
@@ -1025,6 +1031,7 @@ bool syncTest1()
|
|||||||
vObj.push_back(key);
|
vObj.push_back(key);
|
||||||
|
|
||||||
sync->newObjects(vObj);
|
sync->newObjects(vObj);
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1); // wait for the job to run
|
sleep(1); // wait for the job to run
|
||||||
|
|
||||||
// make sure that it made it to the cloud
|
// make sure that it made it to the cloud
|
||||||
@@ -1034,6 +1041,7 @@ bool syncTest1()
|
|||||||
assert(exists);
|
assert(exists);
|
||||||
|
|
||||||
sync->newJournalEntry(key);
|
sync->newJournalEntry(key);
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1); // let it do what it does
|
sleep(1); // let it do what it does
|
||||||
|
|
||||||
// check that the original objects no longer exist
|
// check that the original objects no longer exist
|
||||||
@@ -1072,6 +1080,7 @@ bool syncTest1()
|
|||||||
makeTestJournal((journalPath / (newKey + ".journal")).string().c_str());
|
makeTestJournal((journalPath / (newKey + ".journal")).string().c_str());
|
||||||
cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal")));
|
cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal")));
|
||||||
sync->newJournalEntry(newKey);
|
sync->newJournalEntry(newKey);
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1);
|
sleep(1);
|
||||||
|
|
||||||
// verify that newkey is no longer in cloud storage, and that another permutation is
|
// verify that newkey is no longer in cloud storage, and that another permutation is
|
||||||
@@ -1093,6 +1102,8 @@ bool syncTest1()
|
|||||||
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir)
|
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir)
|
||||||
keys.push_back(dir->path().filename().string());
|
keys.push_back(dir->path().filename().string());
|
||||||
sync->deletedObjects(keys);
|
sync->deletedObjects(keys);
|
||||||
|
sync->forceFlush();
|
||||||
|
sleep(1);
|
||||||
::unlink((metaPath/"test-file.meta").string().c_str());
|
::unlink((metaPath/"test-file.meta").string().c_str());
|
||||||
|
|
||||||
cout << "Sync test 1 OK" << endl;
|
cout << "Sync test 1 OK" << endl;
|
||||||
@@ -1312,6 +1323,7 @@ void IOCUnlink()
|
|||||||
keys.push_back(cachedObjPath.filename().string());
|
keys.push_back(cachedObjPath.filename().string());
|
||||||
sync->newObjects(keys);
|
sync->newObjects(keys);
|
||||||
//sync->newJournalEntry(keys[0]); don't want to end up renaming it
|
//sync->newJournalEntry(keys[0]); don't want to end up renaming it
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1);
|
sleep(1);
|
||||||
|
|
||||||
// ok, they should be fully 'in the system' now.
|
// ok, they should be fully 'in the system' now.
|
||||||
@@ -1329,6 +1341,7 @@ void IOCUnlink()
|
|||||||
assert(!bf::exists(metaPath/basedir));
|
assert(!bf::exists(metaPath/basedir));
|
||||||
assert(!bf::exists(cachedObjPath));
|
assert(!bf::exists(cachedObjPath));
|
||||||
assert(!bf::exists(cachedJournalPath));
|
assert(!bf::exists(cachedJournalPath));
|
||||||
|
sync->forceFlush();
|
||||||
sleep(1); // stall for sync
|
sleep(1); // stall for sync
|
||||||
cs->exists(cachedObjPath.filename().string(), &exists);
|
cs->exists(cachedObjPath.filename().string(), &exists);
|
||||||
assert(!exists);
|
assert(!exists);
|
||||||
|
|||||||
Reference in New Issue
Block a user