You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-13 23:02:14 +03:00
Fixed more races and edge cases I found.
This commit is contained in:
@@ -132,7 +132,7 @@ void Cache::populate()
|
|||||||
lru.push_back(p.filename().string());
|
lru.push_back(p.filename().string());
|
||||||
auto last = lru.end();
|
auto last = lru.end();
|
||||||
m_lru.insert(--last);
|
m_lru.insert(--last);
|
||||||
currentCacheSize += bf::file_size(p);
|
currentCacheSize += bf::file_size(*dir);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
|
logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
|
||||||
@@ -147,7 +147,7 @@ void Cache::populate()
|
|||||||
if (bf::is_regular_file(p))
|
if (bf::is_regular_file(p))
|
||||||
{
|
{
|
||||||
if (p.extension() == ".journal")
|
if (p.extension() == ".journal")
|
||||||
currentCacheSize += bf::file_size(p);
|
currentCacheSize += bf::file_size(*dir);
|
||||||
else
|
else
|
||||||
logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
|
logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
|
||||||
}
|
}
|
||||||
@@ -286,8 +286,17 @@ void Cache::read(const vector<string> &keys)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
// There's window where the file has been downloaded but is not yet
|
||||||
|
// added to the lru structs. However it is in the DNE. If it is in the DNE, then it is also
|
||||||
|
// in Downloader's map. So, this thread needs to start the download if it's not in the
|
||||||
|
// DNE or if there's an existing download that hasn't finished yet. Starting the download
|
||||||
|
// includes waiting for an existing download to finish, which from this class's pov is the
|
||||||
|
// same thing.
|
||||||
|
if (doNotEvict.find(key) == doNotEvict.end() || downloader.inProgress(key))
|
||||||
|
keysToFetch.push_back(&key);
|
||||||
|
else
|
||||||
|
cout << "Cache: detected and stopped a racey download" << endl;
|
||||||
addToDNE(key);
|
addToDNE(key);
|
||||||
keysToFetch.push_back(&key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (keysToFetch.empty())
|
if (keysToFetch.empty())
|
||||||
@@ -301,13 +310,22 @@ void Cache::read(const vector<string> &keys)
|
|||||||
// downloads with size 0 didn't actually happen, either because it
|
// downloads with size 0 didn't actually happen, either because it
|
||||||
// was a preexisting download (another read() call owns it), or because
|
// was a preexisting download (another read() call owns it), or because
|
||||||
// there was an error downloading it. Use size == 0 as an indication of
|
// there was an error downloading it. Use size == 0 as an indication of
|
||||||
// what to add to the cache
|
// what to add to the cache. Also needs to verify that the file was not deleted,
|
||||||
|
// indicated by existence in doNotEvict.
|
||||||
if (dlSizes[i] != 0)
|
if (dlSizes[i] != 0)
|
||||||
{
|
{
|
||||||
sum_sizes += dlSizes[i];
|
if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
|
||||||
lru.push_back(*keysToFetch[i]);
|
{
|
||||||
LRU_t::iterator lit = lru.end();
|
sum_sizes += dlSizes[i];
|
||||||
m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list.
|
lru.push_back(*keysToFetch[i]);
|
||||||
|
LRU_t::iterator lit = lru.end();
|
||||||
|
m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list.
|
||||||
|
}
|
||||||
|
else // it was downloaded, but a deletion happened so we have to toss it
|
||||||
|
{
|
||||||
|
cout << "removing a file that was deleted by another thread during download" << endl;
|
||||||
|
bf::remove(prefix / (*keysToFetch[i]));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -329,9 +347,14 @@ void Cache::doneReading(const vector<string> &keys)
|
|||||||
boost::unique_lock<boost::mutex> s(lru_mutex);
|
boost::unique_lock<boost::mutex> s(lru_mutex);
|
||||||
for (const string &key : keys)
|
for (const string &key : keys)
|
||||||
{
|
{
|
||||||
const auto &it = m_lru.find(key);
|
removeFromDNE(key);
|
||||||
if (it != m_lru.end())
|
// most should be in the map.
|
||||||
removeFromDNE(it->lit);
|
// debateable whether it's faster to look up the list iterator and use it
|
||||||
|
// or whether it's faster to bypass that and use strings only.
|
||||||
|
|
||||||
|
//const auto &it = m_lru.find(key);
|
||||||
|
//if (it != m_lru.end())
|
||||||
|
// removeFromDNE(it->lit);
|
||||||
}
|
}
|
||||||
_makeSpace(0);
|
_makeSpace(0);
|
||||||
}
|
}
|
||||||
@@ -588,12 +611,6 @@ int Cache::ifExistsThenDelete(const string &key)
|
|||||||
auto it = m_lru.find(key);
|
auto it = m_lru.find(key);
|
||||||
if (it != m_lru.end())
|
if (it != m_lru.end())
|
||||||
{
|
{
|
||||||
if (doNotEvict.find(it->lit) != doNotEvict.end())
|
|
||||||
{
|
|
||||||
cout << "almost deleted a file being read, are we really doing that somewhere?" << endl;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (toBeDeleted.find(it->lit) == toBeDeleted.end())
|
if (toBeDeleted.find(it->lit) == toBeDeleted.end())
|
||||||
{
|
{
|
||||||
doNotEvict.erase(it->lit);
|
doNotEvict.erase(it->lit);
|
||||||
|
|||||||
@@ -186,6 +186,15 @@ void Downloader::download(const vector<const string *> &keys, vector<int> *errno
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Downloader::inProgress(const string &key)
|
||||||
|
{
|
||||||
|
boost::shared_ptr<Download> tmp(new Download(key));
|
||||||
|
auto it = downloads.find(tmp);
|
||||||
|
if (it != downloads.end())
|
||||||
|
return !(*it)->finished;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void Downloader::setDownloadPath(const string &path)
|
void Downloader::setDownloadPath(const string &path)
|
||||||
{
|
{
|
||||||
downloadPath = path;
|
downloadPath = path;
|
||||||
@@ -197,6 +206,11 @@ Downloader::Download::Download(const string &source, const string &_dlPath, boos
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Downloader::Download::Download(const string &source) :
|
||||||
|
key(source), dl_errno(0), size(0), lock(NULL), finished(false), itRan(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
Downloader::Download::~Download()
|
Downloader::Download::~Download()
|
||||||
{
|
{
|
||||||
assert(!itRan || finished);
|
assert(!itRan || finished);
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ class Downloader
|
|||||||
void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
|
void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
|
||||||
void setDownloadPath(const std::string &path);
|
void setDownloadPath(const std::string &path);
|
||||||
void useThisLock(boost::mutex *);
|
void useThisLock(boost::mutex *);
|
||||||
|
bool inProgress(const std::string &);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint maxDownloads;
|
uint maxDownloads;
|
||||||
@@ -51,6 +52,7 @@ class Downloader
|
|||||||
struct Download : public ThreadPool::Job
|
struct Download : public ThreadPool::Job
|
||||||
{
|
{
|
||||||
Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock);
|
Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock);
|
||||||
|
Download(const std::string &source);
|
||||||
~Download();
|
~Download();
|
||||||
void operator()();
|
void operator()();
|
||||||
boost::filesystem::path dlPath;
|
boost::filesystem::path dlPath;
|
||||||
|
|||||||
@@ -488,7 +488,7 @@ int IOCoordinator::append(const char *_filename, const uint8_t *data, size_t len
|
|||||||
|
|
||||||
// had to add this hack to prevent deadlock
|
// had to add this hack to prevent deadlock
|
||||||
out:
|
out:
|
||||||
// need to release the file lock before calling the cache fcns.
|
// need to release the file lock before telling Cache that we're done writing.
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
cache->doneWriting();
|
cache->doneWriting();
|
||||||
|
|
||||||
@@ -802,12 +802,13 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
|
|||||||
ScopedReadLock lock(this, filename1);
|
ScopedReadLock lock(this, filename1);
|
||||||
ScopedWriteLock lock2(this, filename2);
|
ScopedWriteLock lock2(this, filename2);
|
||||||
MetadataFile meta1(metaFile1);
|
MetadataFile meta1(metaFile1);
|
||||||
MetadataFile meta2(metaFile2);
|
MetadataFile meta2(metaFile2.string().c_str(), MetadataFile::no_create_t());
|
||||||
vector<metadataObject> objects = meta1.metadataRead(0, meta1.getLength());
|
vector<metadataObject> objects = meta1.metadataRead(0, meta1.getLength());
|
||||||
|
|
||||||
if (meta2.exists())
|
if (meta2.exists()) {
|
||||||
cout << "copyFile: overwriting a file" << endl;
|
cout << "copyFile: overwriting a file" << endl;
|
||||||
meta2.removeAllEntries();
|
meta2.removeAllEntries();
|
||||||
|
}
|
||||||
|
|
||||||
// TODO. I dislike large try-catch blocks, and large loops. Maybe a little refactoring is in order.
|
// TODO. I dislike large try-catch blocks, and large loops. Maybe a little refactoring is in order.
|
||||||
try
|
try
|
||||||
@@ -1089,19 +1090,34 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
|||||||
{
|
{
|
||||||
uint64_t offlen[2];
|
uint64_t offlen[2];
|
||||||
int err = ::read(journalFD, &offlen, 16);
|
int err = ::read(journalFD, &offlen, 16);
|
||||||
if (err != 16) // got EOF
|
if (err == 0) // got EOF
|
||||||
break;
|
break;
|
||||||
|
else if (err < 16)
|
||||||
|
{
|
||||||
|
// punting on this
|
||||||
|
cout << "mergeJournalInMem: failed to read a journal entry header in one attempt. fixme..." << endl;
|
||||||
|
errno = ENODATA;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t startReadingAt = offlen[0];
|
uint64_t startReadingAt = offlen[0];
|
||||||
uint64_t lengthOfRead = offlen[1];
|
uint64_t lengthOfRead = offlen[1];
|
||||||
|
|
||||||
// XXXPAT: Speculative change. Got mem errors from writing past the end of objData. The length
|
// XXXPAT: Speculative change. Got mem errors from writing past the end of objData. The length
|
||||||
// in the metadata is shorter than this journal entry, and not because it got crazy values.
|
// in the metadata is shorter than this journal entry, and not because it got crazy values.
|
||||||
// I think the explanation is a truncation. Remove the log here if we see good results.
|
// I think the explanation is a truncation.
|
||||||
|
if (startReadingAt > len)
|
||||||
|
{
|
||||||
|
//logger->log(LOG_CRIT, "mergeJournalInMem: skipping a theoretically irrelevant journal entry in %s. "
|
||||||
|
// "jstart = %llu, max = %llu", journalPath, startReadingAt, len);
|
||||||
|
::lseek(journalFD, offlen[1], SEEK_CUR);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (startReadingAt + lengthOfRead > len)
|
if (startReadingAt + lengthOfRead > len)
|
||||||
{
|
{
|
||||||
logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s. jStart = %lld, jEnd = %lld, max = %lld",
|
//logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s. jStart = %llu, jEnd = %llu, max = %llu",
|
||||||
journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
|
// journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
|
||||||
lengthOfRead = len - startReadingAt;
|
lengthOfRead = len - startReadingAt;
|
||||||
}
|
}
|
||||||
uint count = 0;
|
uint count = 0;
|
||||||
@@ -1124,6 +1140,8 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
|||||||
}
|
}
|
||||||
count += err;
|
count += err;
|
||||||
}
|
}
|
||||||
|
if (lengthOfRead < offlen[1])
|
||||||
|
::lseek(journalFD, offlen[1] - lengthOfRead, SEEK_CUR);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,14 @@ bucket = s3-cs-test2
|
|||||||
[LocalStorage]
|
[LocalStorage]
|
||||||
path = ${HOME}/storagemanager/fake-cloud
|
path = ${HOME}/storagemanager/fake-cloud
|
||||||
|
|
||||||
|
# introduce latency to fake-cloud operations. Useful for debugging.
|
||||||
|
fake_latency = n
|
||||||
|
|
||||||
|
# values are randomized between 1 and max_latency in microseconds.
|
||||||
|
# values between 30000-50000 roughly simulate observed latency of S3
|
||||||
|
# access from an EC2 node.
|
||||||
|
max_latency = 50000
|
||||||
|
|
||||||
[Cache]
|
[Cache]
|
||||||
cache_size = 2g
|
cache_size = 2g
|
||||||
path = ${HOME}/storagemanager/cache
|
path = ${HOME}/storagemanager/cache
|
||||||
|
|||||||
Reference in New Issue
Block a user