You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-10 22:42:30 +03:00
Fixed a race in Cache usage during delete & truncate ops.
An object could be flushed between the existence check and the deletion performed by delete() and truncate(). Made the check-and-delete a single atomic operation within Cache.
This commit is contained in:
@@ -453,6 +453,34 @@ void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
|
||||
currentCacheSize += sizediff;
|
||||
}
|
||||
|
||||
int Cache::ifExistsThenDelete(const string &key)
|
||||
{
|
||||
bf::path cachedPath = prefix / key;
|
||||
bf::path journalPath = journalPrefix / (key + ".journal");
|
||||
|
||||
boost::unique_lock<boost::recursive_mutex> s(lru_mutex);
|
||||
bool objectExists = false;
|
||||
bool journalExists = bf::exists(journalPath);
|
||||
|
||||
auto it = m_lru.find(key);
|
||||
if (it != m_lru.end())
|
||||
{
|
||||
objectExists = true;
|
||||
lru.erase(it->lit);
|
||||
m_lru.erase(it);
|
||||
}
|
||||
assert(objectExists == bf::exists(cachedPath));
|
||||
|
||||
size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
|
||||
//size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
|
||||
size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
|
||||
currentCacheSize -= (objectSize + journalSize);
|
||||
|
||||
//assert(!objectExists || objectSize == bf::file_size(cachedPath));
|
||||
|
||||
return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
|
||||
}
|
||||
|
||||
size_t Cache::getCurrentCacheSize() const
|
||||
{
|
||||
return currentCacheSize;
|
||||
|
||||
@@ -34,6 +34,10 @@ class Cache : public boost::noncopyable
|
||||
void deletedObject(const std::string &key, size_t size);
|
||||
void deletedJournal(size_t size);
|
||||
|
||||
// An 'atomic' existence check & removal covering both the object and its journal.
// Removes them from the cache's bookkeeping only; it does NOT delete the files.
// Returns a bitmask: 0 if neither existed, 1 if the object existed, 2 if the
// journal existed, and 3 (1 | 2) if both existed.
|
||||
int ifExistsThenDelete(const std::string &key);
|
||||
|
||||
// rename is used when an old obj gets merged with its journal file
|
||||
// the size will change in that process; sizediff is by how much
|
||||
void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff);
|
||||
|
||||
@@ -342,7 +342,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse
|
||||
//log error and abort
|
||||
}
|
||||
|
||||
cache->newObject(newObject.key,(writeLength + objectOffset));
|
||||
cache->newObject(newObject.key,writeLength);
|
||||
newObjectKeys.push_back(newObject.key);
|
||||
|
||||
count += writeLength;
|
||||
@@ -560,25 +560,14 @@ int IOCoordinator::truncate(const char *path, size_t newSize)
|
||||
|
||||
uint i = (newSize == objects[0].offset ? 0 : 1);
|
||||
vector<string> deletedObjects;
|
||||
while (i < objects.size())
|
||||
for (; i < objects.size(); ++i)
|
||||
{
|
||||
bf::path journal = journalPath / (objects[i].key + ".journal");
|
||||
if (bf::exists(journal))
|
||||
{
|
||||
size_t jsize = bf::file_size(journal);
|
||||
replicator->remove(journal);
|
||||
cache->deletedJournal(jsize);
|
||||
}
|
||||
|
||||
if (cache->exists(objects[i].key))
|
||||
{
|
||||
bf::path cached = cachePath / objects[i].key;
|
||||
size_t fsize = bf::file_size(cached);
|
||||
replicator->remove(cached);
|
||||
cache->deletedObject(objects[i].key, fsize);
|
||||
}
|
||||
int result = cache->ifExistsThenDelete(objects[i].key);
|
||||
if (result & 0x1)
|
||||
replicator->remove(cachePath / objects[i].key);
|
||||
if (result & 0x2)
|
||||
replicator->remove(journalPath / (objects[i].key + ".journal"));
|
||||
deletedObjects.push_back(objects[i].key);
|
||||
++i;
|
||||
}
|
||||
if (!deletedObjects.empty())
|
||||
synchronizer->deletedObjects(deletedObjects);
|
||||
@@ -609,24 +598,13 @@ void IOCoordinator::deleteMetaFile(const bf::path &file)
|
||||
|
||||
vector<metadataObject> objects = meta.metadataRead(0, meta.getLength());
|
||||
vector<string> deletedObjects;
|
||||
bf::path journal, obj;
|
||||
size_t size;
|
||||
for (auto &object : objects)
|
||||
{
|
||||
obj = cachePath/object.key;
|
||||
if (bf::exists(obj))
|
||||
{
|
||||
size = bf::file_size(obj);
|
||||
replicator->remove(obj);
|
||||
cache->deletedObject(object.key, size);
|
||||
}
|
||||
journal = journalPath/(object.key + ".journal");
|
||||
if (bf::exists(journal))
|
||||
{
|
||||
size = bf::file_size(journal);
|
||||
replicator->remove(journal);
|
||||
cache->deletedJournal(size);
|
||||
}
|
||||
int result = cache->ifExistsThenDelete(object.key);
|
||||
if (result & 0x1)
|
||||
replicator->remove(cachePath/object.key);
|
||||
if (result & 0x2)
|
||||
replicator->remove(journalPath/(object.key + ".journal"));
|
||||
deletedObjects.push_back(object.key);
|
||||
}
|
||||
synchronizer->deletedObjects(deletedObjects);
|
||||
|
||||
Reference in New Issue
Block a user