Checkpointing a bunch of edge-case fixes I made and mistakenly mushed
together with the cache size consistency code.
@@ -132,7 +132,7 @@ void Cache::populate()
             lru.push_back(p.filename().string());
             auto last = lru.end();
             m_lru.insert(--last);
-            currentCacheSize += bf::file_size(*dir);
+            currentCacheSize += bf::file_size(p);
         }
         else
             logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
@@ -147,7 +147,7 @@ void Cache::populate()
         if (bf::is_regular_file(p))
         {
             if (p.extension() == ".journal")
-                currentCacheSize += bf::file_size(*dir);
+                currentCacheSize += bf::file_size(p);
             else
                 logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
         }
@@ -157,6 +157,31 @@ void Cache::populate()
     }
 }
 
+// be careful using this! SM should be idle. No ongoing reads or writes.
+void Cache::validateCacheSize()
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+
+    if (!doNotEvict.empty() || !toBeDeleted.empty())
+    {
+        cout << "Not safe to use validateCacheSize() at the moment." << endl;
+        return;
+    }
+
+    size_t oldSize = currentCacheSize;
+    currentCacheSize = 0;
+    m_lru.clear();
+    lru.clear();
+    populate();
+
+    if (oldSize != currentCacheSize)
+        logger->log(LOG_DEBUG, "Cache::validateCacheSize(): found a discrepancy. Actual size is %lld, had %lld.",
+            currentCacheSize, oldSize);
+    else
+        logger->log(LOG_DEBUG, "Cache::validateCacheSize(): Cache size accounting agrees with reality for now.");
+}
+
 #if 0
 /* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more.
    Just going to rewrite it. We can revisit later if the simplified version needs improvement */
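The guard on doNotEvict and toBeDeleted enforces the "SM should be idle" warning: an in-flight read() or _makeSpace() holds entries in those sets, and clearing lru/m_lru underneath them would invalidate the iterators they hold. The function is meant as a manual debugging probe; this commit wires it into the printCacheUsage() handler at the bottom of the diff. A minimal sketch of installing such a handler (the SIGUSR1 binding is an assumption, not part of this commit):

    #include <csignal>

    // printCacheUsage(int) is the signal handler shown at the end of this
    // diff; it now calls Cache::get()->validateCacheSize() before printing.
    void printCacheUsage(int sig);

    void installCacheDebugHook()
    {
        ::signal(SIGUSR1, printCacheUsage);  // then: kill -USR1 <storagemanager pid>
    }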
@@ -250,7 +275,7 @@ void Cache::read(const vector<string> &keys)
         if (mit != m_lru.end())
         {
             addToDNE(mit->lit);
-            lru.splice(lru.end(), lru, mit->lit);
+            lru.splice(lru.end(), lru, mit->lit);  // move them to the back so they are last to pick for eviction
             // if it's about to be deleted, stop that
             TBD_t::iterator tbd_it = toBeDeleted.find(mit->lit);
             if (tbd_it != toBeDeleted.end())
@@ -260,14 +285,15 @@ void Cache::read(const vector<string> &keys)
             }
         }
         else
         {
             addToDNE(key);
             keysToFetch.push_back(&key);
         }
     }
     if (keysToFetch.empty())
         return;
 
     assert(s.owns_lock());
     downloader.download(keysToFetch, &dlErrnos, &dlSizes);
+    assert(s.owns_lock());
 
     size_t sum_sizes = 0;
     for (uint i = 0; i < keysToFetch.size(); ++i)
@@ -282,7 +308,6 @@ void Cache::read(const vector<string> &keys)
             lru.push_back(*keysToFetch[i]);
             LRU_t::iterator lit = lru.end();
             m_lru.insert(--lit);  // I dislike this way of grabbing the last iterator in a list.
-            addToDNE(lit);
         }
     }
 
@@ -290,7 +315,7 @@ void Cache::read(const vector<string> &keys)
     for (const string &key : keys)
     {
         mit = m_lru.find(key);
-        if (mit != m_lru.end())  // it could have been deleted by deletedObject() or ifExistsThenDelete()
+        if (mit != m_lru.end())  // all of the files exist, just not all of them are 'owned by' this thread.
             lru.splice(lru.end(), lru, mit->lit);
     }
 
@@ -320,23 +345,25 @@ Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
 {
 }
 
-void Cache::addToDNE(const LRU_t::iterator &key)
+Cache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
 {
-    DNEElement e(key);
-    DNE_t::iterator it = doNotEvict.find(e);
+}
+
+void Cache::addToDNE(const DNEElement &key)
+{
+    DNE_t::iterator it = doNotEvict.find(key);
     if (it != doNotEvict.end())
     {
         DNEElement &dnee = const_cast<DNEElement &>(*it);
         ++(dnee.refCount);
     }
     else
-        doNotEvict.insert(e);
+        doNotEvict.insert(key);
 }
 
-void Cache::removeFromDNE(const LRU_t::iterator &key)
+void Cache::removeFromDNE(const DNEElement &key)
 {
-    DNEElement e(key);
-    DNE_t::iterator it = doNotEvict.find(e);
+    DNE_t::iterator it = doNotEvict.find(key);
     if (it == doNotEvict.end())
         return;
     DNEElement &dnee = const_cast<DNEElement &>(*it);
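The DNE ("do not evict") set is reference-counted: every read() that touches a key pins it once, and the pin is released only when the count returns to zero, so concurrent readers cannot unpin each other's files. A standalone sketch of that semantics with plain standard containers (the harness is hypothetical; only the add/remove behavior mirrors the diff):

    #include <cassert>
    #include <string>
    #include <unordered_map>

    // Model of the pin/unpin behavior: addToDNE bumps a refcount,
    // removeFromDNE decrements and erases only when it reaches zero.
    std::unordered_map<std::string, unsigned> doNotEvict;

    void addToDNE(const std::string &key) { ++doNotEvict[key]; }

    void removeFromDNE(const std::string &key)
    {
        auto it = doNotEvict.find(key);
        if (it == doNotEvict.end())
            return;
        if (--it->second == 0)
            doNotEvict.erase(it);  // eligible for eviction again
    }

    int main()
    {
        addToDNE("key1");  // reader 1 pins
        addToDNE("key1");  // reader 2 pins
        removeFromDNE("key1");
        assert(doNotEvict.count("key1") == 1);  // still pinned by reader 2
        removeFromDNE("key1");
        assert(doNotEvict.count("key1") == 0);  // unpinned
    }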
@@ -400,7 +427,7 @@ void Cache::deletedObject(const string &key, size_t size)
     M_LRU_t::iterator mit = m_lru.find(key);
     assert(mit != m_lru.end());  // TODO: 5/16/19 - got this assertion using S3 by running test000, then test000 again.
 
-    // if it's being flushed, let it do the deleting
+    // if it's being flushed, let makeSpace() do the deleting
     if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
     {
         doNotEvict.erase(mit->lit);
@@ -456,9 +483,9 @@ void Cache::_makeSpace(size_t size)
             return;
         }
 
-        bf::path cachedFile = prefix / *it;
-        assert(bf::exists(cachedFile));
+        if (!bf::exists(prefix / *it))
+            cout << prefix / *it << " doesn't exist, WTF?" << endl;
+        assert(bf::exists(prefix / *it));
         /*
             tell Synchronizer that this key will be evicted
             delete the file
@@ -468,11 +495,12 @@ void Cache::_makeSpace(size_t size)
 
         //logger->log(LOG_WARNING, "Cache: flushing!");
         toBeDeleted.insert(it);
+        string key = *it;  // need to make a copy; it could be in the process of being renamed
 
         lru_mutex.unlock();
         try
         {
-            Synchronizer::get()->flushObject(*it);
+            Synchronizer::get()->flushObject(key);
         }
         catch (...)
         {
@@ -489,9 +517,9 @@ void Cache::_makeSpace(size_t size)
         // if it's still in toBeDeleted then it is safe to delete.
         // if read() happened to access it while it was flushing, it will not
         // be in that set.
-        cachedFile = prefix / *it;
-        toBeDeleted.erase(tbd_it);
+        bf::path cachedFile = prefix / *it;
         m_lru.erase(*it);
+        toBeDeleted.erase(tbd_it);
         lru.erase(it);
         size_t newSize = bf::file_size(cachedFile);
         replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
@@ -557,10 +585,15 @@ int Cache::ifExistsThenDelete(const string &key)
     boost::unique_lock<boost::mutex> s(lru_mutex);
     bool objectExists = false;
 
     auto it = m_lru.find(key);
     if (it != m_lru.end())
     {
+        if (doNotEvict.find(it->lit) != doNotEvict.end())
+        {
+            cout << "almost deleted a file being read, are we really doing that somewhere?" << endl;
+            return 0;
+        }
+
         if (toBeDeleted.find(it->lit) == toBeDeleted.end())
         {
             doNotEvict.erase(it->lit);
@@ -572,7 +605,7 @@ int Cache::ifExistsThenDelete(const string &key)
         return 0;
     }
     bool journalExists = bf::exists(journalPath);
-    assert(objectExists == bf::exists(cachedPath));
+    //assert(objectExists == bf::exists(cachedPath));
 
     size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
     //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
@@ -637,12 +670,16 @@ inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_
 
 inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const
 {
-    return hash<string>()(*(l.key));
+    return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
 }
 
 inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
 {
-    return (*(l1.key) == *(l2.key));
+    const string *s1, *s2;
+    s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
+    s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
+
+    return (*s1 == *s2);
 }
 
 inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
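With the sKey field, a DNEElement can carry either a live LRU iterator or a detached string; the hasher and equality functor resolve both to the underlying string, so a string-keyed probe finds an iterator-keyed entry in the same unordered_set. The same transparent-lookup idea with standard types (this wrapper is illustrative, not the diff's exact class):

    #include <cassert>
    #include <functional>
    #include <list>
    #include <string>
    #include <unordered_set>

    // Holds either an iterator into an LRU list or a detached string copy;
    // hashing and equality both resolve to the string the element names.
    struct Elem
    {
        std::list<std::string>::iterator it;  // used only when s is empty
        std::string s;
        const std::string &resolve() const { return s.empty() ? *it : s; }
    };
    struct ElemHash { size_t operator()(const Elem &e) const { return std::hash<std::string>()(e.resolve()); } };
    struct ElemEq { bool operator()(const Elem &a, const Elem &b) const { return a.resolve() == b.resolve(); } };

    int main()
    {
        std::list<std::string> lru = {"key1"};
        std::unordered_set<Elem, ElemHash, ElemEq> dne;
        dne.insert(Elem{lru.begin(), ""});         // stored via iterator
        assert(dne.count(Elem{{}, "key1"}) == 1);  // found via bare string
    }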
@@ -651,7 +688,3 @@ inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::i
 }
 
 }
-
-
-
-
@@ -59,6 +59,8 @@ class Cache : public boost::noncopyable
     const boost::filesystem::path &getJournalPath();
     // this will delete everything in the cache and journal paths, and empty all Cache structures.
     void reset();
 
+    void validateCacheSize();
+
 private:
     Cache();
 
@@ -104,7 +106,9 @@ class Cache : public boost::noncopyable
     struct DNEElement
     {
         DNEElement(const LRU_t::iterator &);
+        DNEElement(const std::string &);
         LRU_t::iterator key;
+        std::string sKey;
         uint refCount;
     };
 
@@ -120,8 +124,8 @@ class Cache : public boost::noncopyable
 
     typedef std::unordered_set<DNEElement, DNEHasher, DNEEquals> DNE_t;
     DNE_t doNotEvict;
-    void addToDNE(const LRU_t::iterator &key);
-    void removeFromDNE(const LRU_t::iterator &key);
+    void addToDNE(const DNEElement &);
+    void removeFromDNE(const DNEElement &);
 
     // the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here.
     // Elements are inserted and removed by makeSpace(). If read() references a file that is in this,
@@ -150,7 +150,8 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size
     vector<metadataObject> relevants = meta.metadataRead(offset, length);
     map<string, int> journalFDs, objectFDs;
     map<string, string> keyToJournalName, keyToObjectName;
-    vector<SharedCloser> fdMinders;
+    ScopedCloser fdMinders[relevants.size() * 2];
+    int mindersIndex = 0;
     char buf[80];
 
     // load them into the cache
@@ -174,7 +175,8 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size
         {
             keyToJournalName[key] = filename;
             journalFDs[key] = fd;
-            fdMinders.push_back(SharedCloser(fd));
+            fdMinders[mindersIndex++].fd = fd;
+            //fdMinders.push_back(SharedCloser(fd));
         }
         else if (errno != ENOENT)
         {
@@ -202,7 +204,8 @@ int IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset, size
         }
         keyToObjectName[key] = filename;
         objectFDs[key] = fd;
-        fdMinders.push_back(SharedCloser(fd));
+        fdMinders[mindersIndex++].fd = fd;
+        //fdMinders.push_back(SharedCloser(fd));
     }
     //fileLock.unlock();
 
@@ -317,7 +320,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse
             // There's another block below that looks similar. Also similar blocks in append().
             if ((count + objectOffset) > i->length)
                 metadata.updateEntryLength(i->offset, (count + objectOffset));
-            metadata.writeMetadata(filename);
+            replicator->updateMetadata(filename, metadata);
             logger->log(LOG_ERR,"IOCoordinator::write(): object failed to complete write, %u of %u bytes written.",count,length);
             return count;
         }
@@ -368,7 +371,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse
             // update metadataObject length to reflect what awas actually written
             if ((count + objectOffset) > newObject.length)
                 metadata.updateEntryLength(newObject.offset, (count + objectOffset));
-            metadata.writeMetadata(filename);
+            replicator->updateMetadata(filename, metadata);
             logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
             return count;
             //log error and abort
@@ -382,7 +385,7 @@ int IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offse
     }
     synchronizer->newObjects(newObjectKeys);
 
-    metadata.writeMetadata(filename);
+    replicator->updateMetadata(filename, metadata);
 
     return count;
 }
@@ -432,7 +435,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
         if (err <= 0)
         {
             metadata.updateEntryLength(i->offset, (count + i->length));
-            metadata.writeMetadata(filename);
+            replicator->updateMetadata(filename, metadata);
             logger->log(LOG_ERR,"IOCoordinator::append(): journal failed to complete write, %u of %u bytes written.",count,length);
             goto out;
         }
@@ -469,7 +472,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
         {
             // update metadataObject length to reflect what awas actually written
             metadata.updateEntryLength(newObject.offset, (count));
-            metadata.writeMetadata(filename);
+            replicator->updateMetadata(filename, metadata);
             logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length);
             goto out;
             //log error and abort
@@ -481,7 +484,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
         dataRemaining -= writeLength;
     }
     synchronizer->newObjects(newObjectKeys);
-    metadata.writeMetadata(filename);
+    replicator->updateMetadata(filename, metadata);
 
     // had to add this hack to prevent deadlock
 out:
@@ -797,10 +800,15 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
 
     vector<string> newJournalEntries;
     ScopedReadLock lock(this, filename1);
+    ScopedWriteLock lock2(this, filename2);
     MetadataFile meta1(metaFile1);
     MetadataFile meta2(metaFile2);
     vector<metadataObject> objects = meta1.metadataRead(0, meta1.getLength());
 
+    if (meta2.exists())
+        cout << "copyFile: overwriting a file" << endl;
+    meta2.removeAllEntries();
+
     // TODO. I dislike large try-catch blocks, and large loops. Maybe a little refactoring is in order.
     try
     {
@@ -871,8 +879,9 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
         return -1;
     }
-    lock.unlock();
 
+    replicator->updateMetadata(filename2, meta2);
+    lock2.unlock();
 
     for (auto &jEntry : newJournalEntries)
         sync->newJournalEntry(jEntry);
     return 0;
@@ -1086,6 +1095,15 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
         uint64_t startReadingAt = offlen[0];
         uint64_t lengthOfRead = offlen[1];
 
+        // XXXPAT: Speculative change. Got mem errors from writing past the end of objData. The length
+        // in the metadata is shorter than this journal entry, and not because it got crazy values.
+        // I think the explanation is a truncation. Remove the log here if we see good results.
+        if (startReadingAt + lengthOfRead > len)
+        {
+            logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s. jStart = %lld, jEnd = %lld, max = %lld",
+                journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
+            lengthOfRead = len - startReadingAt;
+        }
         uint count = 0;
         while (count < lengthOfRead)
         {
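For intuition, the new clamp just truncates a journal entry to the buffer the metadata allocated; only the bytes that still fit are merged. A quick standalone check of the arithmetic (values made up):

    #include <cassert>
    #include <cstdint>

    int main()
    {
        uint64_t len = 100;            // bytes allocated for objData
        uint64_t startReadingAt = 90;  // journal entry offset
        uint64_t lengthOfRead = 20;    // entry length; would overrun by 10
        if (startReadingAt + lengthOfRead > len)
            lengthOfRead = len - startReadingAt;
        assert(lengthOfRead == 10);    // merge stops at the end of objData
    }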
@@ -5,6 +5,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <time.h>
 #include "LocalStorage.h"
 #include "Config.h"
 
@@ -30,6 +31,22 @@ LocalStorage::LocalStorage()
             throw e;
         }
     }
+    string stmp = Config::get()->getValue("LocalStorage", "fake_latency");
+    if (!stmp.empty() && (stmp[0] == 'Y' || stmp[0] == 'y'))
+    {
+        fakeLatency = true;
+        stmp = Config::get()->getValue("LocalStorage", "max_latency");
+        usecLatencyCap = strtoull(stmp.c_str(), NULL, 10);
+        if (usecLatencyCap == 0)
+        {
+            logger->log(LOG_CRIT, "LocalStorage: bad value for max_latency");
+            throw runtime_error("LocalStorage: bad value for max_latency");
+        }
+        r_seed = (uint) ::time(NULL);
+        logger->log(LOG_DEBUG, "LocalStorage: Will simulate cloud latency of max %llu us", usecLatencyCap);
+    }
+    else
+        fakeLatency = false;
 }
 
 LocalStorage::~LocalStorage()
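The simulated latency is driven by two new keys read through Config::get()->getValue("LocalStorage", ...). Assuming an INI-style section in the StorageManager config file (the file layout itself is an assumption, not shown in this diff), enabling it would look something like:

    [LocalStorage]
    fake_latency = y       ; any value starting with Y/y enables the simulation
    max_latency = 50000    ; upper bound for the injected delay, in microseconds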
@@ -41,8 +58,19 @@ const bf::path & LocalStorage::getPrefix() const
     return prefix;
 }
 
+void LocalStorage::addLatency()
+{
+    if (fakeLatency)
+    {
+        uint64_t usec_delay = ((double) rand_r(&r_seed) / (double) RAND_MAX) * usecLatencyCap;
+        ::usleep(usec_delay);
+    }
+}
+
 int LocalStorage::copy(const bf::path &source, const bf::path &dest)
 {
+    addLatency();
+
     boost::system::error_code err;
     bf::copy_file(source, dest, bf::copy_option::fail_if_exists, err);
     if (err)
@@ -72,6 +100,8 @@ int LocalStorage::getObject(const string &source, const string &dest, size_t *si
 
 int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array<uint8_t> *data, size_t *size)
 {
+    addLatency();
+
     bf::path source = prefix / sourceKey;
     const char *c_source = source.string().c_str();
     //char buf[80];
@@ -115,6 +145,8 @@ int LocalStorage::putObject(const string &source, const string &dest)
 
 int LocalStorage::putObject(boost::shared_array<uint8_t> data, size_t len, const string &dest)
 {
+    addLatency();
+
     bf::path destPath = prefix / dest;
     const char *c_dest = destPath.string().c_str();
     //char buf[80];
@@ -155,14 +187,17 @@ int LocalStorage::copyObject(const string &source, const string &dest)
 
 int LocalStorage::deleteObject(const string &key)
 {
-    boost::system::error_code err;
+    addLatency();
 
+    boost::system::error_code err;
     bf::remove(prefix / key, err);
     return 0;
 }
 
 int LocalStorage::exists(const std::string &key, bool *out)
 {
+    addLatency();
+
     *out = bf::exists(prefix / key);
     return 0;
 }
 
@@ -27,8 +27,13 @@ class LocalStorage : public CloudStorage
 
     private:
         boost::filesystem::path prefix;
 
        int copy(const boost::filesystem::path &sourceKey, const boost::filesystem::path &destKey);
 
+        // stuff for faking the latency on cloud ops
+        bool fakeLatency;
+        uint64_t usecLatencyCap;
+        uint r_seed;
+        void addLatency();
 };
 
 }
 
@@ -326,6 +326,11 @@ void MetadataFile::removeEntry(off_t offset)
     mObjects.erase(it);
 }
 
+void MetadataFile::removeAllEntries()
+{
+    mObjects.clear();
+}
+
 // There are more efficient ways to do it. Optimize if necessary.
 void MetadataFile::breakout(const string &key, vector<string> &ret)
 {
@@ -56,6 +56,7 @@ class MetadataFile
         metadataObject addMetadataObject(const char *filename, size_t length);
         bool getEntry(off_t offset, const metadataObject **out) const;
         void removeEntry(off_t offset);
+        void removeAllEntries();
 
         // TBD: this may have to go; there may be no use case where only the uuid needs to change.
         static std::string getNewKeyFromOldKey(const std::string &oldKey, size_t length=0);
@@ -47,7 +47,7 @@ void SMLogging::log(int priority,const char *format, ...)
     va_list args2;
     va_copy(args2, args);
     vfprintf(stderr, format, args2);
-    printf("\n");
+    fprintf(stderr, "\n");
 #endif
     vsyslog(priority, format, args);
 
@@ -359,12 +359,13 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
 
     const metadataObject *mdEntry;
     bool entryExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-    if (!entryExists)
+    if (!entryExists || key != mdEntry->key)
     {
         logger->log(LOG_DEBUG, "synchronize(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
         return;
     }
-    assert(key == mdEntry->key);
+    //assert(key == mdEntry->key);  <-- This could fail b/c of truncation + a write/append before this job runs.
 
     err = cs->exists(key, &exists);
     if (err)
@@ -374,7 +375,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
         return;
 
     // TODO: should be safe to check with Cache instead of a file existence check
-    exists = bf::exists(cachePath / key);
+    exists = cache->exists(key);
     if (!exists)
     {
         logger->log(LOG_DEBUG, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
@@ -409,12 +410,12 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
 
     const metadataObject *mdEntry;
     bool metaExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-    if (!metaExists)
+    if (!metaExists || key != mdEntry->key)
     {
         logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
         return;
     }
-    assert(key == mdEntry->key);
+    //assert(key == mdEntry->key);  <--- I suspect this can happen in a truncate + write situation + a deep sync queue
 
     bf::path oldCachePath = cachePath / key;
     string journalName = (journalPath/ (key + ".journal")).string();
@@ -431,7 +432,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
         throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
     if (!existsOnCloud)
     {
-        if (bf::exists(oldCachePath))
+        if (cache->exists(key))
         {
             logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
                 "synchronize() instead. Need to explain how this happens.", key.c_str());
@@ -54,8 +54,11 @@ void ScopedWriteLock::unlock()
     }
 }
 
+ScopedCloser::ScopedCloser() : fd(-1) { }
 ScopedCloser::ScopedCloser(int f) : fd(f) { assert(f != -1); }
 ScopedCloser::~ScopedCloser() {
+    if (fd < 0)
+        return;
     int s_errno = errno;
     ::close(fd);
     errno = s_errno;
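The new default constructor and the fd < 0 guard let callers declare ScopedCloser slots before they have descriptors to guard — which is what the fdMinders array change in IOCoordinator::read() above relies on — while the errno save/restore keeps a close() during unwinding from clobbering the error a caller is about to report. A minimal usage sketch (the file path is made up):

    #include <cassert>
    #include <cerrno>
    #include <fcntl.h>
    #include <unistd.h>

    struct ScopedCloser
    {
        ScopedCloser() : fd(-1) { }
        ScopedCloser(int f) : fd(f) { assert(f != -1); }
        ~ScopedCloser()
        {
            if (fd < 0)
                return;           // slot was never assigned a descriptor
            int s_errno = errno;  // don't let close() clobber errno
            ::close(fd);
            errno = s_errno;
        }
        int fd;
    };

    int main()
    {
        ScopedCloser minders[4];  // empty slots, filled as files are opened
        int fd = ::open("/tmp/example.dat", O_RDONLY);
        if (fd >= 0)
            minders[0].fd = fd;   // this slot now owns the descriptor
        return 0;                 // destructors close any owned fds
    }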
@@ -37,6 +37,7 @@ struct ScopedWriteLock
 
 struct ScopedCloser
 {
+    ScopedCloser();
     ScopedCloser(int f);
     ~ScopedCloser();
     int fd;
@@ -22,6 +22,7 @@ using namespace storagemanager;
 
 void printCacheUsage(int sig)
 {
+    Cache::get()->validateCacheSize();
     cout << "Current cache size = " << Cache::get()->getCurrentCacheSize() << endl;
     cout << "Cache element count = " << Cache::get()->getCurrentCacheElementCount() << endl;
 }