You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-12 11:01:17 +03:00
Checkpointing some fixes.
This commit is contained in:
@@ -107,7 +107,7 @@ Cache::Cache() : currentCacheSize(0)
|
||||
}
|
||||
catch (exception &e)
|
||||
{
|
||||
syslog(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
|
||||
logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
|
||||
throw e;
|
||||
}
|
||||
|
||||
@@ -120,6 +120,7 @@ Cache::~Cache()
|
||||
|
||||
void Cache::populate()
|
||||
{
|
||||
Synchronizer *sync = Synchronizer::get();
|
||||
bf::directory_iterator dir(prefix);
|
||||
bf::directory_iterator dend;
|
||||
while (dir != dend)
|
||||
@@ -145,8 +146,10 @@ void Cache::populate()
|
||||
const bf::path &p = dir->path();
|
||||
if (bf::is_regular_file(p))
|
||||
{
|
||||
if (p.extension() == ".journal") // need to decide whether objects should have an extension
|
||||
if (p.extension() == ".journal")
|
||||
{
|
||||
currentCacheSize += bf::file_size(*dir);
|
||||
sync->newJournalEntry(p.stem());
|
||||
else
|
||||
logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
|
||||
}
|
||||
@@ -235,9 +238,6 @@ void Cache::read(const vector<string> &keys)
|
||||
void Cache::read(const vector<string> &keys)
|
||||
{
|
||||
/* Move existing keys to the back of the LRU, start downloading nonexistant keys.
|
||||
Going to skip the do-not-evict set in this version, and will assume that the cache
|
||||
is large enough that an entry at the back of the LRU won't be evicted before the
|
||||
caller can open the corresponding file
|
||||
*/
|
||||
vector<const string *> keysToFetch;
|
||||
vector<int> dlErrnos;
|
||||
@@ -251,6 +251,7 @@ void Cache::read(const vector<string> &keys)
|
||||
mit = m_lru.find(key);
|
||||
if (mit != m_lru.end())
|
||||
{
|
||||
addToDNE(mit->lit);
|
||||
lru.splice(lru.end(), lru, mit->lit);
|
||||
// if it's about to be deleted, stop that
|
||||
TBD_t::iterator tbd_it = toBeDeleted.find(mit->lit);
|
||||
@@ -282,15 +283,36 @@ void Cache::read(const vector<string> &keys)
|
||||
sum_sizes += dlSizes[i];
|
||||
lru.push_back(*keysToFetch[i]);
|
||||
LRU_t::iterator lit = lru.end();
|
||||
m_lru.insert(M_LRU_element_t(--lit)); // I dislike this way of grabbing the last iterator in a list.
|
||||
m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list.
|
||||
addToDNE(lit);
|
||||
}
|
||||
}
|
||||
|
||||
// move everything in keys to the back of the lru (yes, again)
|
||||
for (const string &key : keys)
|
||||
{
|
||||
mit = m_lru.find(key);
|
||||
if (mit != m_lru.end()) // it could have been deleted by deletedObject() or ifExistsThenDelete()
|
||||
lru.splice(lru.end(), lru, mit->lit);
|
||||
}
|
||||
|
||||
// fix cache size
|
||||
_makeSpace(sum_sizes);
|
||||
currentCacheSize += sum_sizes;
|
||||
}
|
||||
|
||||
void Cache::doneReading(const vector<string> &keys)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(lru_mutex);
|
||||
for (const string &key : keys)
|
||||
{
|
||||
const auto &it = m_lru.find(key);
|
||||
if (it != m_lru.end())
|
||||
removeFromDNE(it->lit);
|
||||
}
|
||||
readyToDelete.notify_all();
|
||||
}
|
||||
|
||||
Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
|
||||
{
|
||||
}
|
||||
@@ -374,10 +396,15 @@ void Cache::deletedObject(const string &key, size_t size)
|
||||
assert(currentCacheSize >= size);
|
||||
M_LRU_t::iterator mit = m_lru.find(key);
|
||||
assert(mit != m_lru.end()); // TODO: 5/16/19 - got this assertion using S3 by running test000, then test000 again.
|
||||
assert(doNotEvict.find(mit->lit) == doNotEvict.end());
|
||||
lru.erase(mit->lit);
|
||||
m_lru.erase(mit);
|
||||
currentCacheSize -= size;
|
||||
|
||||
// if it's being flushed, let it do the deleting
|
||||
if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
|
||||
{
|
||||
doNotEvict.erase(mit->lit);
|
||||
lru.erase(mit->lit);
|
||||
m_lru.erase(mit);
|
||||
currentCacheSize -= size;
|
||||
}
|
||||
}
|
||||
|
||||
void Cache::setMaxCacheSize(size_t size)
|
||||
@@ -418,6 +445,14 @@ void Cache::_makeSpace(size_t size)
|
||||
break;
|
||||
++it;
|
||||
}
|
||||
if (it == lru.end())
|
||||
{
|
||||
// nothing can be deleted, wait for something to change
|
||||
cout << "nothing can be deleted, waiting" << endl;
|
||||
readyToDelete.wait(lru_mutex);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
bf::path cachedFile = prefix / *it;
|
||||
assert(bf::exists(cachedFile));
|
||||
@@ -465,6 +500,9 @@ void Cache::_makeSpace(size_t size)
|
||||
|
||||
void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
|
||||
{
|
||||
// rename it in the LRU
|
||||
// erase/insert to rehash it everywhere else
|
||||
|
||||
boost::unique_lock<boost::mutex> s(lru_mutex);
|
||||
auto it = m_lru.find(oldKey);
|
||||
if (it == m_lru.end())
|
||||
@@ -472,7 +510,29 @@ void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
|
||||
|
||||
auto lit = it->lit;
|
||||
m_lru.erase(it);
|
||||
int refCount = 0;
|
||||
auto dne_it = doNotEvict.find(it->lit);
|
||||
if (dne_it != doNotEvict.end())
|
||||
{
|
||||
refCount = dne_it->refCount;
|
||||
doNotEvict.erase(dne_it);
|
||||
}
|
||||
|
||||
auto tbd_it = toBeDeleted.find(lit);
|
||||
bool hasTBDEntry = (tbd_it != toBeDeleted.end());
|
||||
if (hasTBDEntry)
|
||||
toBeDeleted.erase(tbd_it);
|
||||
|
||||
*lit = newKey;
|
||||
|
||||
if (hasTBDEntry)
|
||||
toBeDeleted.insert(lit);
|
||||
if (refCount != 0)
|
||||
{
|
||||
pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
|
||||
const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
|
||||
}
|
||||
|
||||
m_lru.insert(lit);
|
||||
currentCacheSize += sizediff;
|
||||
}
|
||||
@@ -489,11 +549,16 @@ int Cache::ifExistsThenDelete(const string &key)
|
||||
auto it = m_lru.find(key);
|
||||
if (it != m_lru.end())
|
||||
{
|
||||
objectExists = true;
|
||||
lru.erase(it->lit);
|
||||
m_lru.erase(it);
|
||||
// let makeSpace() delete it if it's already being flushed
|
||||
if (toBeDeleted.find(it->lit) == toBeDeleted.end())
|
||||
{
|
||||
doNotEvict.erase(it->lit);
|
||||
lru.erase(it->lit);
|
||||
m_lru.erase(it);
|
||||
objectExists = true;
|
||||
}
|
||||
}
|
||||
assert(objectExists == bf::exists(cachedPath));
|
||||
//assert(objectExists == bf::exists(cachedPath));
|
||||
|
||||
size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
|
||||
//size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
|
||||
@@ -522,6 +587,8 @@ void Cache::reset()
|
||||
boost::unique_lock<boost::mutex> s(lru_mutex);
|
||||
m_lru.clear();
|
||||
lru.clear();
|
||||
toBeDeleted.clear();
|
||||
doNotEvict.clear();
|
||||
|
||||
bf::directory_iterator dir;
|
||||
bf::directory_iterator dend;
|
||||
|
||||
Reference in New Issue
Block a user