You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Added code to delete orphaned objects from the cache & from cloud storage.
This commit is contained in:
@ -355,7 +355,6 @@ void Cache::configListener()
|
|||||||
logger->log(LOG_CRIT, "Cache/cache_size is not a number. Using current value = %zi",maxCacheSize);
|
logger->log(LOG_CRIT, "Cache/cache_size is not a number. Using current value = %zi",maxCacheSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -350,6 +350,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
|
|||||||
{
|
{
|
||||||
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)",
|
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)",
|
||||||
strerror_r(errno, errbuf, 80));
|
strerror_r(errno, errbuf, 80));
|
||||||
|
errno = l_errno;
|
||||||
if (err < 0)
|
if (err < 0)
|
||||||
return err;
|
return err;
|
||||||
else
|
else
|
||||||
|
@ -474,11 +474,26 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
|
|||||||
char buf[80];
|
char buf[80];
|
||||||
bool exists = false;
|
bool exists = false;
|
||||||
int err;
|
int err;
|
||||||
|
bf::path objectPath = cachePath/key;
|
||||||
MetadataFile md(sourceFile, MetadataFile::no_create_t(),true);
|
MetadataFile md(sourceFile, MetadataFile::no_create_t(),true);
|
||||||
|
|
||||||
if (!md.exists())
|
if (!md.exists())
|
||||||
{
|
{
|
||||||
logger->log(LOG_DEBUG, "synchronize(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
|
logger->log(LOG_DEBUG, "synchronize(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!bf::exists(objectPath))
|
||||||
|
return;
|
||||||
|
size_t size = bf::file_size(objectPath);
|
||||||
|
replicator->remove(objectPath);
|
||||||
|
cache->deletedObject(prefix, cloudKey, size);
|
||||||
|
cs->deleteObject(cloudKey);
|
||||||
|
}
|
||||||
|
catch (exception &e)
|
||||||
|
{
|
||||||
|
logger->log(LOG_DEBUG, "synchronize(): failed to remove orphaned object '%s' from the cache, got %s",
|
||||||
|
objectPath.string().c_str(), e.what());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -506,14 +521,14 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bf::file_size(cachePath/key) != MetadataFile::getLengthFromKey(cloudKey))
|
if (bf::file_size(objectPath) != MetadataFile::getLengthFromKey(cloudKey))
|
||||||
{
|
{
|
||||||
ostringstream oss;
|
ostringstream oss;
|
||||||
oss << "Synchronizer::synchronize(): found a size mismatch in key = " << cloudKey <<
|
oss << "Synchronizer::synchronize(): found a size mismatch in key = " << cloudKey <<
|
||||||
" real size = " << bf::file_size(cachePath/key);
|
" real size = " << bf::file_size(objectPath);
|
||||||
logger->log(LOG_ERR, oss.str().c_str());
|
logger->log(LOG_ERR, oss.str().c_str());
|
||||||
}
|
}
|
||||||
err = cs->putObject((cachePath / key).string(), cloudKey);
|
err = cs->putObject(objectPath.string(), cloudKey);
|
||||||
if (err)
|
if (err)
|
||||||
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
|
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
|
||||||
|
|
||||||
@ -521,7 +536,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
|
|||||||
bytesReadBySync += mdEntry.length;
|
bytesReadBySync += mdEntry.length;
|
||||||
numBytesUploaded += mdEntry.length;
|
numBytesUploaded += mdEntry.length;
|
||||||
++objectsSyncedWithNoJournal;
|
++objectsSyncedWithNoJournal;
|
||||||
replicator->remove((cachePath/key), Replicator::NO_LOCAL);
|
replicator->remove(objectPath, Replicator::NO_LOCAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
|
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
|
||||||
@ -546,6 +561,29 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
|||||||
if (!md.exists())
|
if (!md.exists())
|
||||||
{
|
{
|
||||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s. It must have been deleted.", sourceFile.c_str());
|
||||||
|
try
|
||||||
|
{
|
||||||
|
bf::path objectPath = cachePath/key;
|
||||||
|
if (bf::exists(objectPath))
|
||||||
|
{
|
||||||
|
size_t objSize = bf::file_size(objectPath);
|
||||||
|
replicator->remove(objectPath);
|
||||||
|
cache->deletedObject(prefix, cloudKey, objSize);
|
||||||
|
cs->deleteObject(cloudKey);
|
||||||
|
}
|
||||||
|
bf::path jPath = journalPath/(key + ".journal");
|
||||||
|
if (bf::exists(jPath))
|
||||||
|
{
|
||||||
|
size_t jSize = bf::file_size(jPath);
|
||||||
|
replicator->remove(jPath);
|
||||||
|
cache->deletedJournal(prefix, jSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(exception &e)
|
||||||
|
{
|
||||||
|
logger->log(LOG_DEBUG, "synchronizeWithJournal(): failed to remove orphaned object '%s' from the cache, got %s",
|
||||||
|
(cachePath/key).string().c_str(), e.what());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -559,7 +597,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
|||||||
//assert(key == mdEntry->key); <--- I suspect this can happen in a truncate + write situation + a deep sync queue
|
//assert(key == mdEntry->key); <--- I suspect this can happen in a truncate + write situation + a deep sync queue
|
||||||
|
|
||||||
bf::path oldCachePath = cachePath / key;
|
bf::path oldCachePath = cachePath / key;
|
||||||
string journalName = (journalPath/ (key + ".journal")).string();
|
string journalName = (journalPath/(key + ".journal")).string();
|
||||||
|
|
||||||
if (!bf::exists(journalName))
|
if (!bf::exists(journalName))
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user