
Merging the prefix-ownership feature.

Squashed commit of the following:

commit 4a4c3dab2e6acf942bbdfd4d760c000bc9cbfc6a
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Jul 30 10:50:08 2019 -0500

    Standardized a couple status msgs.

commit 1b76f7e6411424c9633dcd4ebe7f61e9fce2f0ac
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Jul 30 09:19:31 2019 -0500

    Fixed the thread-joining-itself problem.

commit 4fdb79e87496eab64c4c5af72321bc57423297ba
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Mon Jul 29 17:15:05 2019 -0500

    Checkpointing.  Need to release ownership differently, realized
    I have 1 thread trying to join itself.

commit 04d0183735e9697d76a2472c6135d90755ca61b5
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Mon Jul 29 16:12:33 2019 -0500

    Checkpointing a whole lot of fixes.  test000 works here.

commit 72e9c998c62b095cad1cf33f885f6c7697bde214
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Jul 26 16:29:15 2019 -0500

    Checkpointing.  Started debugging.  Several small fixes.  Doesn't work yet.

commit ab728e1481debec94d676e697954b1d164302a0c
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Jul 26 13:15:47 2019 -0500

    Checkpointing.  Got everything to build.

commit a2c6d07cdc12c45530c1d5cf4205d3aee8738d80
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Jul 26 12:59:26 2019 -0500

    Checkpointing.  Got the library to build.

commit 9f6bf19a64f512e17e6139b0fc04850cdcdb3b3a
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Jul 26 12:24:46 2019 -0500

    Checkpointing.  Still WIP.  Feature touches everything.

commit a79ca8dc88a99f812432d5dca34ed54474df1933
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Thu Jul 25 16:38:20 2019 -0500

    Checkpointing more changes.

commit a9e81af3e4e00f8a3d30b3796a2c3aa94c04f7c0
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Thu Jul 25 15:07:44 2019 -0500

    Checkpointing changes to the other classes that need to be aware
    of separately managed prefixes.

commit d85dfaa401b49a7bb714701649dec303eb7c068c
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Jul 24 14:24:23 2019 -0500

    Added the new class to CMakeLists.

commit 66d6d550b13be94ada107311574378bd848951cd
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Jul 24 14:23:49 2019 -0500

    Checkpointing.  Got the new class to build except for a
    to-be-implemented fcn in Cache.

commit e1b62dba7f05b37b9f12681a53d6632c6ce66d54
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Jul 24 14:23:09 2019 -0500

    Added some documentation to the object_size param.

commit e671cf37c49ed084fbdec1bac50fbaa5ad7c43f9
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Jul 24 10:34:54 2019 -0500

    Checkpointing a new class to manage ownership of prefixes.

commit e5f234ff4c05b5157d37fa17c44d7f626f5e4eb3
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Jul 23 15:36:48 2019 -0500

    Fixed some copy/paste typos in MetadataFile config err msgs.
Author: Patrick LeBlanc
Date:   2019-07-30 10:51:05 -05:00
Parent: 5f43f55ea5
Commit: 1c5979576e
21 changed files with 1640 additions and 900 deletions
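Net effect of the branch, before the per-file diffs: the old monolithic Cache (one LRU, one size counter, one cache directory) becomes a thin router, and the new PrefixCache class owns the LRU, do-not-evict set, and size accounting for a single prefix. A minimal sketch of that routing shape, with invented stand-in names (only the map-and-forward pattern is taken from the diff, not the real class bodies):

    #include <map>
    #include <string>
    #include <vector>
    #include <boost/filesystem/path.hpp>
    #include <boost/thread/mutex.hpp>
    namespace bf = boost::filesystem;

    // Stand-in for the real PrefixCache: it owns all per-prefix cache state.
    class PrefixCache
    {
      public:
        explicit PrefixCache(const bf::path &prefix) : prefix(prefix) {}
        void read(const std::vector<std::string> &keys) { /* per-prefix LRU work */ }
      private:
        bf::path prefix;
    };

    class Cache
    {
      public:
        // Every public call now takes the owning prefix and forwards to its PrefixCache.
        void read(const bf::path &prefix, const std::vector<std::string> &keys)
        {
            getPCache(prefix).read(keys);
        }
      private:
        PrefixCache & getPCache(const bf::path &prefix)
        {
            boost::unique_lock<boost::mutex> s(mtx);
            return *caches.at(prefix);   // the real code also waits for in-flight construction
        }
        std::map<bf::path, PrefixCache *> caches;
        boost::mutex mtx;    // protects 'caches' only; per-prefix state has its own lock
    };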

CMakeLists.txt

@@ -86,6 +86,8 @@ set(storagemanager_SRCS
     src/MetadataFile.cpp
     src/Replicator.cpp
     src/Utilities.cpp
+    src/Ownership.cpp
+    src/PrefixCache.cpp
 )
 
 option(TRACE "Enable some tracing output" OFF)

src/Cache.cpp

@@ -34,11 +34,10 @@ Cache * Cache::get()
     return inst;
 }
 
-Cache::Cache() : currentCacheSize(0)
+Cache::Cache()
 {
     Config *conf = Config::get();
     logger = SMLogging::get();
-    replicator = Replicator::get();
 
     string stmp = conf->getValue("Cache", "cache_size");
     if (stmp.empty())
@@ -73,8 +72,8 @@ Cache::Cache() : currentCacheSize(0)
         throw runtime_error("Please set ObjectStorage/object_size to a number");
     }
-    prefix = conf->getValue("Cache", "path");
-    if (prefix.empty())
+    cachePrefix = conf->getValue("Cache", "path");
+    if (cachePrefix.empty())
     {
         logger->log(LOG_CRIT, "Cache/path is not set");
         throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
@@ -82,17 +81,16 @@ Cache::Cache() : currentCacheSize(0)
     try
     {
-        bf::create_directories(prefix);
+        bf::create_directories(cachePrefix);
     }
     catch (exception &e)
     {
-        logger->log(LOG_CRIT, "Failed to create %s, got: %s", prefix.string().c_str(), e.what());
+        logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what());
         throw e;
     }
-    //cout << "Cache got prefix " << prefix << endl;
+    //cout << "Cache got cachePrefix " << cachePrefix << endl;
 
     downloader.reset(new Downloader());
-    downloader->setDownloadPath(prefix.string());
-    downloader->useThisLock(&lru_mutex);
+    downloader->setDownloadPath(cachePrefix);
 
     stmp = conf->getValue("ObjectStorage", "journal_path");
     if (stmp.empty())
@@ -111,64 +109,12 @@ Cache::Cache() : currentCacheSize(0)
         logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
         throw e;
     }
-    lru_mutex.lock();   // unlocked by populate() when it's done
-    boost::thread t([this] { this->populate(); });
-    t.detach();
 }
 
 Cache::~Cache()
 {
-}
-
-void Cache::populate()
-{
-    Synchronizer *sync = Synchronizer::get();
-    bf::directory_iterator dir(prefix);
-    bf::directory_iterator dend;
-    vector<string> newObjects;
-    while (dir != dend)
-    {
-        // put everything in lru & m_lru
-        const bf::path &p = dir->path();
-        if (bf::is_regular_file(p))
-        {
-            lru.push_back(p.filename().string());
-            auto last = lru.end();
-            m_lru.insert(--last);
-            currentCacheSize += bf::file_size(*dir);
-            newObjects.push_back(p.filename().string());
-        }
-        else
-            logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
-        ++dir;
-    }
-    sync->newObjects(newObjects);
-    newObjects.clear();
-
-    // account for what's in the journal dir
-    vector<pair<string, size_t> > newJournals;
-    dir = bf::directory_iterator(journalPrefix);
-    while (dir != dend)
-    {
-        const bf::path &p = dir->path();
-        if (bf::is_regular_file(p))
-        {
-            if (p.extension() == ".journal")
-            {
-                size_t s = bf::file_size(*dir);
-                currentCacheSize += s;
-                newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
-            }
-            else
-                logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
-        }
-        else
-            logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
-        ++dir;
-    }
-    lru_mutex.unlock();
-    sync->newJournalEntries(newJournals);
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        delete it->second;
 }
 
 // be careful using this!  SM should be idle.  No ongoing reads or writes.
@@ -176,308 +122,88 @@ void Cache::validateCacheSize()
 {
     boost::unique_lock<boost::mutex> s(lru_mutex);
-    if (!doNotEvict.empty() || !toBeDeleted.empty())
-    {
-        cout << "Not safe to use validateCacheSize() at the moment." << endl;
-        return;
-    }
-
-    size_t oldSize = currentCacheSize;
-    currentCacheSize = 0;
-    m_lru.clear();
-    lru.clear();
-    populate();
-    if (oldSize != currentCacheSize)
-        logger->log(LOG_DEBUG, "Cache::validateCacheSize(): found a discrepancy.  Actual size is %lld, had %lld.",
-            currentCacheSize, oldSize);
-    else
-        logger->log(LOG_DEBUG, "Cache::validateCacheSize(): Cache size accounting agrees with reality for now.");
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        it->second->validateCacheSize();
 }
 
-#if 0
-/* Need to simplify this, we keep running into sneaky problems, and I just spotted a couple more.
-Just going to rewrite it.  We can revisit later if the simplified version needs improvement */
-void Cache::read(const vector<string> &keys)
-{
-    /*
-        move existing keys to a do-not-evict map
-        fetch keys that do not exist
-        after fetching, move all keys from do-not-evict to the back of the LRU
-    */
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    vector<const string *> keysToFetch;
-    uint i;
-    M_LRU_t::iterator mit;
-    for (const string &key : keys)
-    {
-        mit = m_lru.find(key);
-        if (mit != m_lru.end())
-            // it's in the cache, add the entry to the do-not-evict set
-            addToDNE(mit->lit);
-        else
-            // not in the cache, put it in the list to download
-            keysToFetch.push_back(&key);
-    }
-    //s.unlock();
-
-    // start downloading the keys to fetch
-    // TODO: there should be a path for the common case, which is that there is nothing
-    // to download.
-    vector<int> dl_errnos;
-    vector<size_t> sizes;
-    if (!keysToFetch.empty())
-        downloader->download(keysToFetch, &dl_errnos, &sizes, lru_mutex);
-    size_t sum_sizes = 0;
-    for (size_t &size : sizes)
-        sum_sizes += size;
-
-    //s.lock();
-    // do makespace() before downloading.  Problem is, until the download is finished, this fcn can't tell which
-    // downloads it was responsible for.  Need Downloader to make the call...?
-    _makeSpace(sum_sizes);
-    currentCacheSize += sum_sizes;
-
-    // move all keys to the back of the LRU
-    for (i = 0; i < keys.size(); i++)
-    {
-        mit = m_lru.find(keys[i]);
-        if (mit != m_lru.end())
-        {
-            lru.splice(lru.end(), lru, mit->lit);
-            LRU_t::iterator lit = lru.end();
-            removeFromDNE(--lit);
-        }
-        else if (dl_errnos[i] == 0)   // successful download
-        {
-            lru.push_back(keys[i]);
-            LRU_t::iterator lit = lru.end();
-            m_lru.insert(M_LRU_element_t(--lit));
-        }
-        else
-        {
-            // Downloader already logged it, anything to do here?
-            /* brainstorming options for handling it.
-               1) Should it be handled?  The caller will log a file-not-found error, and there will be a download
-                  failure in the log already.
-               2) Can't really DO anything can it?
-            */
-        }
-    }
-}
-#endif
-
 // new simplified version
-void Cache::read(const vector<string> &keys)
+void Cache::read(const bf::path &prefix, const vector<string> &keys)
 {
-    /* Move existing keys to the back of the LRU, start downloading nonexistant keys.
-    */
-    vector<const string *> keysToFetch;
-    vector<int> dlErrnos;
-    vector<size_t> dlSizes;
-
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-
-    M_LRU_t::iterator mit;
-    for (const string &key : keys)
-    {
-        mit = m_lru.find(key);
-        if (mit != m_lru.end())
-        {
-            addToDNE(mit->lit);
-            lru.splice(lru.end(), lru, mit->lit);   // move them to the back so they are last to pick for eviction
-        }
-        else
-        {
-            // There's window where the file has been downloaded but is not yet
-            // added to the lru structs.  However it is in the DNE.  If it is in the DNE, then it is also
-            // in Downloader's map.  So, this thread needs to start the download if it's not in the
-            // DNE or if there's an existing download that hasn't finished yet.  Starting the download
-            // includes waiting for an existing download to finish, which from this class's pov is the
-            // same thing.
-            if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key))
-                keysToFetch.push_back(&key);
-            else
-                cout << "Cache: detected and stopped a racey download" << endl;
-            addToDNE(key);
-        }
-    }
-    if (keysToFetch.empty())
-        return;
-
-    downloader->download(keysToFetch, &dlErrnos, &dlSizes);
-
-    size_t sum_sizes = 0;
-    for (uint i = 0; i < keysToFetch.size(); ++i)
-    {
-        // downloads with size 0 didn't actually happen, either because it
-        // was a preexisting download (another read() call owns it), or because
-        // there was an error downloading it.  Use size == 0 as an indication of
-        // what to add to the cache.  Also needs to verify that the file was not deleted,
-        // indicated by existence in doNotEvict.
-        if (dlSizes[i] != 0)
-        {
-            if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
-            {
-                sum_sizes += dlSizes[i];
-                lru.push_back(*keysToFetch[i]);
-                LRU_t::iterator lit = lru.end();
-                m_lru.insert(--lit);   // I dislike this way of grabbing the last iterator in a list.
-            }
-            else   // it was downloaded, but a deletion happened so we have to toss it
-            {
-                cout << "removing a file that was deleted by another thread during download" << endl;
-                bf::remove(prefix / (*keysToFetch[i]));
-            }
-        }
-    }
-
-    // move everything in keys to the back of the lru (yes, again)
-    for (const string &key : keys)
-    {
-        mit = m_lru.find(key);
-        if (mit != m_lru.end())   // all of the files exist, just not all of them are 'owned by' this thread.
-            lru.splice(lru.end(), lru, mit->lit);
-    }
-
-    // fix cache size
-    //_makeSpace(sum_sizes);
-    currentCacheSize += sum_sizes;
+    getPCache(prefix).read(keys);
 }
 
-void Cache::doneReading(const vector<string> &keys)
+void Cache::doneReading(const bf::path &prefix, const vector<string> &keys)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    for (const string &key : keys)
-    {
-        removeFromDNE(key);
-        // most should be in the map.
-        // debateable whether it's faster to look up the list iterator and use it
-        // or whether it's faster to bypass that and use strings only.
-        //const auto &it = m_lru.find(key);
-        //if (it != m_lru.end())
-        //    removeFromDNE(it->lit);
-    }
-    _makeSpace(0);
+    getPCache(prefix).doneReading(keys);
 }
 
-void Cache::doneWriting()
+void Cache::doneWriting(const bf::path &prefix)
 {
-    makeSpace(0);
+    getPCache(prefix).makeSpace(0);
 }
 
-Cache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
-{
-}
-
-Cache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
-{
-}
-
-void Cache::addToDNE(const DNEElement &key)
-{
-    DNE_t::iterator it = doNotEvict.find(key);
-    if (it != doNotEvict.end())
-    {
-        DNEElement &dnee = const_cast<DNEElement &>(*it);
-        ++(dnee.refCount);
-    }
-    else
-        doNotEvict.insert(key);
-}
-
-void Cache::removeFromDNE(const DNEElement &key)
-{
-    DNE_t::iterator it = doNotEvict.find(key);
-    if (it == doNotEvict.end())
-        return;
-    DNEElement &dnee = const_cast<DNEElement &>(*it);
-    if (--(dnee.refCount) == 0)
-        doNotEvict.erase(it);
-}
-
-const bf::path & Cache::getCachePath()
+const bf::path & Cache::getCachePath() const
 {
-    return prefix;
+    return cachePrefix;
 }
 
-const bf::path & Cache::getJournalPath()
+const bf::path & Cache::getJournalPath() const
 {
     return journalPrefix;
 }
 
+const bf::path Cache::getCachePath(const bf::path &prefix) const
+{
+    return cachePrefix/prefix;
+}
+
+const bf::path Cache::getJournalPath(const bf::path &prefix) const
+{
+    return journalPrefix/prefix;
+}
+
-void Cache::exists(const vector<string> &keys, vector<bool> *out) const
+void Cache::exists(const bf::path &prefix, const vector<string> &keys, vector<bool> *out)
 {
-    out->resize(keys.size());
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    for (uint i = 0; i < keys.size(); i++)
-        (*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
+    getPCache(prefix).exists(keys, out);
 }
 
-bool Cache::exists(const string &key) const
+bool Cache::exists(const bf::path &prefix, const string &key)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    return m_lru.find(key) != m_lru.end();
+    return getPCache(prefix).exists(key);
 }
 
-void Cache::newObject(const string &key, size_t size)
+void Cache::newObject(const bf::path &prefix, const string &key, size_t size)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-
-    assert(m_lru.find(key) == m_lru.end());
-    //_makeSpace(size);
-    lru.push_back(key);
-    LRU_t::iterator back = lru.end();
-    m_lru.insert(--back);
-    currentCacheSize += size;
+    getPCache(prefix).newObject(key, size);
 }
 
-void Cache::newJournalEntry(size_t size)
+void Cache::newJournalEntry(const bf::path &prefix, size_t size)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    //_makeSpace(size);
-    currentCacheSize += size;
+    getPCache(prefix).newJournalEntry(size);
 }
 
-void Cache::deletedJournal(size_t size)
+void Cache::deletedJournal(const bf::path &prefix, size_t size)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    assert(currentCacheSize >= size);
-    currentCacheSize -= size;
+    getPCache(prefix).deletedJournal(size);
 }
 
-void Cache::deletedObject(const string &key, size_t size)
+void Cache::deletedObject(const bf::path &prefix, const string &key, size_t size)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    assert(currentCacheSize >= size);
-    M_LRU_t::iterator mit = m_lru.find(key);
-    assert(mit != m_lru.end());
-
-    // if it's being flushed, let makeSpace() do the deleting
-    if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
-    {
-        doNotEvict.erase(mit->lit);
-        lru.erase(mit->lit);
-        m_lru.erase(mit);
-        currentCacheSize -= size;
-    }
+    getPCache(prefix).deletedObject(key, size);
 }
 
 void Cache::setMaxCacheSize(size_t size)
 {
     boost::unique_lock<boost::mutex> s(lru_mutex);
-    if (size < maxCacheSize)
-        _makeSpace(maxCacheSize - size);
     maxCacheSize = size;
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        it->second->setMaxCacheSize(size);
 }
 
-void Cache::makeSpace(size_t size)
+void Cache::makeSpace(const bf::path &prefix, size_t size)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    _makeSpace(size);
+    getPCache(prefix).makeSpace(size);
 }
 
 size_t Cache::getMaxCacheSize() const
@@ -485,188 +211,101 @@ size_t Cache::getMaxCacheSize() const
     return maxCacheSize;
 }
 
-// call this holding lru_mutex
-void Cache::_makeSpace(size_t size)
+void Cache::rename(const bf::path &prefix, const string &oldKey, const string &newKey, ssize_t sizediff)
 {
-    ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
-    if (thisMuch <= 0)
-        return;
-
-    LRU_t::iterator it;
-    while (thisMuch > 0 && !lru.empty())
-    {
-        it = lru.begin();
-        // find the first element not being either read() right now or being processed by another
-        // makeSpace() call.
-        while (it != lru.end())
-        {
-            // make sure it's not currently being read or being flushed by another _makeSpace() call
-            if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
-                break;
-            ++it;
-        }
-        if (it == lru.end())
-        {
-            // nothing can be deleted right now
-            return;
-        }
-
-        if (!bf::exists(prefix / *it))
-            cout << prefix / *it << " doesn't exist, WTF?" << endl;   // ran into this a couple times, still happens as of commit 948ee1aa5
-        assert(bf::exists(prefix / *it));
-        /*
-            tell Synchronizer that this key will be evicted
-            delete the file
-            remove it from our structs
-            update current size
-        */
-        //logger->log(LOG_WARNING, "Cache: flushing!");
-        toBeDeleted.insert(it);
-        string key = *it;   // need to make a copy; it could get changed after unlocking.
-        lru_mutex.unlock();
-        try
-        {
-            Synchronizer::get()->flushObject(key);
-        }
-        catch (...)
-        {
-            // it gets logged by Sync
-            lru_mutex.lock();
-            toBeDeleted.erase(it);
-            continue;
-        }
-        lru_mutex.lock();
-
-        // check doNotEvict again in case this object is now being read
-        if (doNotEvict.find(it) == doNotEvict.end())
-        {
-            bf::path cachedFile = prefix / *it;
-            m_lru.erase(*it);
-            toBeDeleted.erase(it);
-            lru.erase(it);
-            size_t newSize = bf::file_size(cachedFile);
-            replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
-            if (newSize < currentCacheSize)
-            {
-                currentCacheSize -= newSize;
-                thisMuch -= newSize;
-            }
-            else
-            {
-                logger->log(LOG_WARNING, "Cache::makeSpace(): accounting error.  Almost wrapped currentCacheSize on flush.");
-                currentCacheSize = 0;
-                thisMuch = 0;
-            }
-        }
-        else
-            toBeDeleted.erase(it);
-    }
+    getPCache(prefix).rename(oldKey, newKey, sizediff);
 }
 
-void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
+int Cache::ifExistsThenDelete(const bf::path &prefix, const string &key)
 {
-    // rename it in the LRU
-    // erase/insert to rehash it everywhere else
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    auto it = m_lru.find(oldKey);
-    if (it == m_lru.end())
-        return;
-
-    auto lit = it->lit;
-    m_lru.erase(it);
-    int refCount = 0;
-    auto dne_it = doNotEvict.find(lit);
-    if (dne_it != doNotEvict.end())
-    {
-        refCount = dne_it->refCount;
-        doNotEvict.erase(dne_it);
-    }
-    auto tbd_it = toBeDeleted.find(lit);
-    bool hasTBDEntry = (tbd_it != toBeDeleted.end());
-    if (hasTBDEntry)
-        toBeDeleted.erase(tbd_it);
-
-    *lit = newKey;
-
-    if (hasTBDEntry)
-        toBeDeleted.insert(lit);
-    if (refCount != 0)
-    {
-        pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
-        const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
-    }
-    m_lru.insert(lit);
-    currentCacheSize += sizediff;
+    return getPCache(prefix).ifExistsThenDelete(key);
 }
 
-int Cache::ifExistsThenDelete(const string &key)
+size_t Cache::getCurrentCacheSize()
 {
-    bf::path cachedPath = prefix / key;
-    bf::path journalPath = journalPrefix / (key + ".journal");
-
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    bool objectExists = false;
-    auto it = m_lru.find(key);
-    if (it != m_lru.end())
-    {
-        if (toBeDeleted.find(it->lit) == toBeDeleted.end())
-        {
-            doNotEvict.erase(it->lit);
-            lru.erase(it->lit);
-            m_lru.erase(it);
-            objectExists = true;
-        }
-        else   // let makeSpace() delete it if it's already in progress
-            return 0;
-    }
-    bool journalExists = bf::exists(journalPath);
-    //assert(objectExists == bf::exists(cachedPath));
-    size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
-    //size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
-    size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
-    currentCacheSize -= (objectSize + journalSize);
-    //assert(!objectExists || objectSize == bf::file_size(cachedPath));
-
-    return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
+    size_t totalSize = 0;
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        totalSize += it->second->getCurrentCacheSize();
+    return totalSize;
 }
 
-size_t Cache::getCurrentCacheSize() const
+size_t Cache::getCurrentCacheElementCount()
 {
-    return currentCacheSize;
+    size_t totalCount = 0;
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        totalCount += it->second->getCurrentCacheElementCount();
+    return totalCount;
 }
 
-size_t Cache::getCurrentCacheElementCount() const
+size_t Cache::getCurrentCacheSize(const bf::path &prefix)
 {
-    boost::unique_lock<boost::mutex> s(lru_mutex);
-    assert(m_lru.size() == lru.size());
-    return m_lru.size();
+    return getPCache(prefix).getCurrentCacheSize();
 }
 
+size_t Cache::getCurrentCacheElementCount(const bf::path &prefix)
+{
+    return getPCache(prefix).getCurrentCacheElementCount();
+}
+
 void Cache::reset()
 {
     boost::unique_lock<boost::mutex> s(lru_mutex);
-    m_lru.clear();
-    lru.clear();
-    toBeDeleted.clear();
-    doNotEvict.clear();
-
-    bf::directory_iterator dir;
-    bf::directory_iterator dend;
-    for (dir = bf::directory_iterator(prefix); dir != dend; ++dir)
-        bf::remove_all(dir->path());
-    for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir)
-        bf::remove_all(dir->path());
-    currentCacheSize = 0;
+    for (auto it = prefixCaches.begin(); it != prefixCaches.end(); ++it)
+        it->second->reset();
 }
 
+void Cache::newPrefix(const bf::path &prefix)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    //cerr << "Cache: making new prefix " << prefix.string() << endl;
+    assert(prefixCaches.find(prefix) == prefixCaches.end());
+    prefixCaches[prefix] = NULL;
+    s.unlock();
+    PrefixCache *pCache = new PrefixCache(prefix);
+    s.lock();
+    prefixCaches[prefix] = pCache;
+}
+
+void Cache::dropPrefix(const bf::path &prefix)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    auto *pCache = prefixCaches[prefix];
+    prefixCaches.erase(prefix);
+    s.unlock();
+    delete pCache;
+}
+
+inline PrefixCache & Cache::getPCache(const bf::path &prefix)
+{
+    boost::unique_lock<boost::mutex> s(lru_mutex);
+    //cerr << "Getting pcache for " << prefix.string() << endl;
+    PrefixCache *ret;
+    auto it = prefixCaches.find(prefix);
+    assert(it != prefixCaches.end());
+    ret = it->second;
+    while (ret == NULL)   // wait for the new PC to init
+    {
+        s.unlock();
+        sleep(1);
+        s.lock();
+        ret = prefixCaches[prefix];
+    }
+    return *ret;
+}
+
+Downloader * Cache::getDownloader() const
+{
+    return downloader.get();
+}
+
 void Cache::shutdown()
@@ -675,48 +314,4 @@ void Cache::shutdown()
     downloader.reset();
 }
 
-/* The helper classes */
-
-Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
-{}
-
-Cache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
-{}
-
-Cache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i)
-{}
-
-inline size_t Cache::KeyHasher::operator()(const M_LRU_element_t &l) const
-{
-    return hash<string>()(*(l.key));
-}
-
-inline bool Cache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
-{
-    return (*(l1.key) == *(l2.key));
-}
-
-inline size_t Cache::DNEHasher::operator()(const DNEElement &l) const
-{
-    return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
-}
-
-inline bool Cache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
-{
-    const string *s1, *s2;
-    s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
-    s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
-    return (*s1 == *s2);
-}
-
-inline bool Cache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
-{
-    return *i1 < *i2;
-}
 
 }
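A note on the init handshake above: newPrefix() publishes a NULL placeholder under lru_mutex, constructs the PrefixCache outside the lock (construction may be slow), then fills the slot in; getPCache() polls with sleep(1) until the slot is non-NULL. A condition variable is the conventional alternative to that polling loop. A hedged sketch of the same handshake under that assumption (names invented, not the committed code; the PrefixCache stub stands in for the real class this commit adds):

    #include <map>
    #include <boost/filesystem/path.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
    namespace bf = boost::filesystem;

    // Stub standing in for the PrefixCache class added by this commit.
    struct PrefixCache
    {
        explicit PrefixCache(const bf::path &) { /* slow: scans the cache dir */ }
    };

    std::map<bf::path, PrefixCache *> prefixCaches;   // NULL means "being constructed"
    boost::mutex lru_mutex;
    boost::condition prefixReady;

    void newPrefix(const bf::path &prefix)
    {
        boost::unique_lock<boost::mutex> s(lru_mutex);
        prefixCaches[prefix] = NULL;                 // reserve the slot
        s.unlock();
        PrefixCache *pc = new PrefixCache(prefix);   // construct without holding the lock
        s.lock();
        prefixCaches[prefix] = pc;
        prefixReady.notify_all();                    // wake any getPCache() waiters
    }

    PrefixCache & getPCache(const bf::path &prefix)
    {
        boost::unique_lock<boost::mutex> s(lru_mutex);
        while (prefixCaches[prefix] == NULL)         // wait instead of sleep(1) polling
            prefixReady.wait(s);
        return *prefixCaches[prefix];
    }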

src/Cache.h

@@ -1,15 +1,19 @@
#ifndef CACHE_H_ #ifndef CACHE_H_
#define CACHE_H_ #define CACHE_H_
/* PrefixCache manages the cache for one prefix managed by SM.
Cache is a map of prefix -> Cache, and holds the items
that should be centralized like the Downloader
*/
#include "Downloader.h" #include "Downloader.h"
#include "SMLogging.h" #include "SMLogging.h"
#include "Replicator.h" #include "PrefixCache.h"
#include <string> #include <string>
#include <vector> #include <vector>
#include <list> #include <map>
#include <unordered_set>
#include <boost/utility.hpp> #include <boost/utility.hpp>
#include <boost/filesystem/path.hpp> #include <boost/filesystem/path.hpp>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
@@ -26,120 +30,66 @@ class Cache : public boost::noncopyable
//reading fcns //reading fcns
// read() marks objects to be read s.t. they do not get flushed. // read() marks objects to be read s.t. they do not get flushed.
// after reading them, unlock the 'logical file', and call doneReading(). // after reading them, unlock the 'logical file', and call doneReading().
void read(const std::vector<std::string> &keys); void read(const boost::filesystem::path &prefix, const std::vector<std::string> &keys);
void doneReading(const std::vector<std::string> &keys); void doneReading(const boost::filesystem::path &prefix, const std::vector<std::string> &keys);
bool exists(const std::string &key) const; bool exists(const boost::filesystem::path &prefix, const std::string &key);
void exists(const std::vector<std::string> &keys, std::vector<bool> *out) const; void exists(const boost::filesystem::path &prefix, const std::vector<std::string> &keys,
std::vector<bool> *out);
// writing fcns // writing fcns
// new*() fcns tell the cache data was added. After writing a set of objects, // new*() fcns tell the Cache data was added. After writing a set of objects,
// unlock the 'logical file', and call doneWriting(). // unlock the 'logical file', and call doneWriting().
void newObject(const std::string &key, size_t size); void newObject(const boost::filesystem::path &prefix, const std::string &key, size_t size);
void newJournalEntry(size_t size); void newJournalEntry(const boost::filesystem::path &prefix, size_t size);
void doneWriting(); void doneWriting(const boost::filesystem::path &prefix);
void deletedObject(const std::string &key, size_t size); void deletedObject(const boost::filesystem::path &prefix, const std::string &key, size_t size);
void deletedJournal(size_t size); void deletedJournal(const boost::filesystem::path &prefix, size_t size);
// an 'atomic' existence check & delete. Covers the object and journal. Does not delete the files. // an 'atomic' existence check & delete. Covers the object and journal. Does not delete the files.
// returns 0 if it didn't exist, 1 if the object exists, 2 if the journal exists, and 3 (1 | 2) if both exist // returns 0 if it didn't exist, 1 if the object exists, 2 if the journal exists, and 3 (1 | 2) if both exist
// This should be called while holding the file lock for key because it touches the journal file. // This should be called while holding the file lock for key because it touches the journal file.
int ifExistsThenDelete(const std::string &key); int ifExistsThenDelete(const boost::filesystem::path &prefix, const std::string &key);
// rename is used when an old obj gets merged with its journal file // rename is used when an old obj gets merged with its journal file
// the size will change in that process; sizediff is by how much // the size will change in that process; sizediff is by how much
void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff); void rename(const boost::filesystem::path &prefix, const std::string &oldKey,
const std::string &newKey, ssize_t sizediff);
void setMaxCacheSize(size_t size); void setMaxCacheSize(size_t size);
void makeSpace(size_t size); void makeSpace(const boost::filesystem::path &prefix, size_t size);
size_t getCurrentCacheSize() const; size_t getCurrentCacheSize();
size_t getCurrentCacheElementCount() const; size_t getCurrentCacheElementCount();
size_t getCurrentCacheSize(const boost::filesystem::path &prefix);
size_t getCurrentCacheElementCount(const boost::filesystem::path &prefix);
size_t getMaxCacheSize() const; size_t getMaxCacheSize() const;
Downloader * getDownloader() const;
void newPrefix(const boost::filesystem::path &prefix);
void dropPrefix(const boost::filesystem::path &prefix);
void shutdown(); void shutdown();
// test helpers // test helpers
const boost::filesystem::path &getCachePath(); const boost::filesystem::path &getCachePath() const;
const boost::filesystem::path &getJournalPath(); const boost::filesystem::path &getJournalPath() const;
// this will delete everything in the cache and journal paths, and empty all Cache structures. const boost::filesystem::path getCachePath(const boost::filesystem::path &prefix) const;
const boost::filesystem::path getJournalPath(const boost::filesystem::path &prefix) const;
// this will delete everything in the Cache and journal paths, and empty all Cache structures.
void reset(); void reset();
void validateCacheSize(); void validateCacheSize();
private: private:
Cache(); Cache();
boost::filesystem::path prefix; SMLogging *logger;
boost::filesystem::path cachePrefix;
boost::filesystem::path journalPrefix; boost::filesystem::path journalPrefix;
size_t maxCacheSize; size_t maxCacheSize;
size_t objectSize; size_t objectSize;
size_t currentCacheSize;
boost::scoped_ptr<Downloader> downloader; boost::scoped_ptr<Downloader> downloader;
Replicator *replicator;
SMLogging *logger;
void populate(); PrefixCache & getPCache(const boost::filesystem::path &prefix);
void _makeSpace(size_t size);
/* The main cache structures */ std::map<boost::filesystem::path, PrefixCache *> prefixCaches;
// lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings. mutable boost::mutex lru_mutex; // protects the prefixCaches
typedef std::list<std::string> LRU_t;
LRU_t lru;
struct M_LRU_element_t
{
M_LRU_element_t(const std::string &);
M_LRU_element_t(const std::string *);
M_LRU_element_t(const LRU_t::iterator &);
const std::string *key;
LRU_t::iterator lit;
};
struct KeyHasher
{
size_t operator()(const M_LRU_element_t &l) const;
};
struct KeyEquals
{
bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const;
};
typedef std::unordered_set<M_LRU_element_t, KeyHasher, KeyEquals> M_LRU_t;
M_LRU_t m_lru; // the LRU entries as a hash table
/* The do-not-evict list stuff. */
struct DNEElement
{
DNEElement(const LRU_t::iterator &);
DNEElement(const std::string &);
LRU_t::iterator key;
std::string sKey;
uint refCount;
};
struct DNEHasher
{
size_t operator()(const DNEElement &d) const;
};
struct DNEEquals
{
bool operator()(const DNEElement &d1, const DNEElement &d2) const;
};
typedef std::unordered_set<DNEElement, DNEHasher, DNEEquals> DNE_t;
DNE_t doNotEvict;
void addToDNE(const DNEElement &);
void removeFromDNE(const DNEElement &);
// the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here.
// Elements are inserted and removed by makeSpace(). If read() references a file that is in this,
// it will remove it, signalling to makeSpace that it should not be deleted
struct TBDLess
{
bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const;
};
typedef std::set<LRU_t::iterator, TBDLess> TBD_t;
TBD_t toBeDeleted;
mutable boost::mutex lru_mutex; // protects the main cache structures & the do-not-evict set
}; };
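Taken together, the new header makes the prefix a first-class argument on every call. A plausible call sequence under the new API (a sketch only: the prefix and key values are invented, error handling is omitted, and the storagemanager namespace is taken from the source files above):

    #include <string>
    #include <vector>
    #include <boost/filesystem/path.hpp>
    namespace bf = boost::filesystem;
    using namespace storagemanager;

    void exampleReader()
    {
        Cache *cache = Cache::get();          // singleton accessor, unchanged by this commit
        const bf::path prefix("pfx0001");     // invented prefix value

        cache->newPrefix(prefix);             // once, when this prefix becomes locally owned

        std::vector<std::string> keys;
        keys.push_back("key1");               // invented object keys
        keys.push_back("key2");
        cache->read(prefix, keys);            // pins the objects in this prefix's PrefixCache
        // ... read the cached object/journal files ...
        cache->doneReading(prefix, keys);     // unpins them; eviction may proceed

        cache->dropPrefix(prefix);            // when ownership of the prefix is released
    }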

src/Downloader.cpp

@@ -7,6 +7,7 @@
 #include <boost/filesystem.hpp>
 
 using namespace std;
+namespace bf = boost::filesystem;
 
 namespace storagemanager
 {
@@ -33,107 +34,24 @@ Downloader::~Downloader()
 {
 }
 
-void Downloader::useThisLock(boost::mutex *mutex)
-{
-    lock = mutex;
-}
-
-/*
-inline boost::mutex * Downloader::getDownloadMutex()
-{
-    return &download_mutex;
-}
-*/
-
-#if 0
-/* This is another fcn with a bug too subtle to identify right now.  Writing up a simplified
-version; we can revisit later if necessary */
-void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes)
-{
-    uint counter = keys.size();
-    boost::condition condvar;
-    boost::mutex m;
-    DownloadListener listener(&counter, &condvar, &m);
-    boost::shared_ptr<Download> dls[keys.size()];
-    bool inserted[keys.size()];
-    Downloads_t::iterator iterators[keys.size()];
-    uint i;
-
-    for (i = 0; i < keys.size(); i++)
-        dls[i].reset(new Download(keys[i], this));
-
-    boost::unique_lock<boost::mutex> s(download_mutex);
-    for (i = 0; i < keys.size(); i++)
-    {
-        auto &dl = dls[i];
-        pair<Downloads_t::iterator, bool> ret = downloads.insert(dl);
-        iterators[i] = ret.first;
-        inserted[i] = ret.second;
-        if (inserted[i])
-        {
-            dl->listeners.push_back(&listener);
-            workers.addJob(dl);
-        }
-        else
-        {
-            dl = *(ret.first);   // point to the existing download.  Redundant with the iterators array.  Don't care yet.
-            (*iterators[i])->listeners.push_back(&listener);
-        }
-    }
-
-    // wait for the downloads to finish
-    boost::unique_lock<boost::mutex> dl_lock(m);
-    s.unlock();
-    while (counter > 0)
-        condvar.wait(dl_lock);
-    dl_lock.unlock();
-
-    // remove the entries inserted by this call
-    sizes->resize(keys.size());
-    s.lock();
-    for (i = 0; i < keys.size(); i++)
-    {
-        if (inserted[i])
-        {
-            (*sizes)[i] = (*iterators[i])->size;
-            downloads.erase(iterators[i]);
-        }
-        else
-            (*sizes)[i] = 0;
-    }
-    s.unlock();
-
-    // check for errors & propagate
-    errnos->resize(keys.size());
-    char buf[80];
-    for (i = 0; i < keys.size(); i++)
-    {
-        auto &dl = dls[i];
-        (*errnos)[i] = dl->dl_errno;
-        if (dl->dl_errno != 0)
-            logger->log(LOG_ERR, "Downloader: failed to download %s, got '%s'", keys[i]->c_str(), strerror_r(dl->dl_errno, buf, 80));
-    }
-}
-#endif
-
-void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes)
+void Downloader::download(const vector<const string *> &keys, vector<int> *errnos, vector<size_t> *sizes,
+    const bf::path &prefix, boost::mutex *cache_lock)
 {
     uint counter = keys.size();
     boost::condition condvar;
     DownloadListener listener(&counter, &condvar);
     vector<boost::shared_ptr<Download> > ownedDownloads(keys.size());
 
-    // the caller is holding a mutex
+    // the caller is holding cache_lock
     /* if a key is already being downloaded, attach listener to that Download instance.
        if it is not already being downloaded, make a new Download instance.
        wait for the listener to tell us that it's done.
     */
+    boost::unique_lock<boost::mutex> s(lock);
     for (uint i = 0; i < keys.size(); i++)
     {
-        boost::shared_ptr<Download> newDL(new Download(*keys[i], downloadPath, lock));
+        boost::shared_ptr<Download> newDL(new Download(*keys[i], prefix, cache_lock));
         auto it = downloads.find(newDL);   // kinda sucks to have to search this way.
         if (it == downloads.end())
         {
@@ -157,15 +75,17 @@ void Downloader::download(const vector<const string *> &keys, vector<int> *errno
             (*it)->listeners.push_back(&listener);
         }
     }
+    s.unlock();
 
     // wait for the downloads to finish
     while (counter > 0)
-        condvar.wait(*lock);
+        condvar.wait(*cache_lock);
 
     // check success, gather sizes from downloads started by this thread
     sizes->resize(keys.size());
     errnos->resize(keys.size());
     char buf[80];
+    s.lock();
     for (uint i = 0; i < keys.size(); i++)
    {
         if (ownedDownloads[i])
@@ -189,19 +109,21 @@ void Downloader::download(const vector<const string *> &keys, vector<int> *errno
 bool Downloader::inProgress(const string &key)
 {
     boost::shared_ptr<Download> tmp(new Download(key));
+    boost::unique_lock<boost::mutex> s(lock);
     auto it = downloads.find(tmp);
     if (it != downloads.end())
         return !(*it)->finished;
     return false;
 }
 
-void Downloader::setDownloadPath(const string &path)
+void Downloader::setDownloadPath(const bf::path &path)
 {
-    downloadPath = path;
+    //downloadPath = path;
 }
 
 /* The helper fcns */
-Downloader::Download::Download(const string &source, const string &_dlPath, boost::mutex *_lock) :
+Downloader::Download::Download(const string &source, const bf::path &_dlPath, boost::mutex *_lock) :
     dlPath(_dlPath), key(source), dl_errno(0), size(0), lock(_lock), finished(false), itRan(false)
 {
 }
@@ -222,11 +144,12 @@ void Downloader::Download::operator()()
     CloudStorage *storage = CloudStorage::get();
     // TODO: we should have it download to a tmp path, then mv it to the cache dir to avoid
     // Cache seeing an incomplete download after a crash
     int err = storage->getObject(key, (dlPath / key).string(), &size);
     if (err != 0)
     {
         dl_errno = errno;
-        boost::filesystem::remove(dlPath / key);
+        bf::remove(dlPath / key);
         size = 0;
     }

src/Downloader.h

@@ -25,15 +25,16 @@ class Downloader
 
     // caller owns the memory for the strings.
     // errors are reported through errnos
-    void download(const std::vector<const std::string *> &keys, std::vector<int> *errnos, std::vector<size_t> *sizes);
-    void setDownloadPath(const std::string &path);
-    void useThisLock(boost::mutex *);
-    bool inProgress(const std::string &);
+    void download(const std::vector<const std::string *> &keys,
+        std::vector<int> *errnos, std::vector<size_t> *sizes, const boost::filesystem::path &prefix,
+        boost::mutex *lock);
+    void setDownloadPath(const boost::filesystem::path &path);
+    bool inProgress(const std::string &);   // call this holding the cache's lock
 
   private:
     uint maxDownloads;
-    std::string downloadPath;
-    boost::mutex *lock;
+    //boost::filesystem::path downloadPath;
+    boost::mutex lock;
 
     class DownloadListener
     {
@@ -51,7 +52,7 @@ class Downloader
     */
     struct Download : public ThreadPool::Job
     {
-        Download(const std::string &source, const std::string &_dlPath, boost::mutex *_lock);
+        Download(const std::string &source, const boost::filesystem::path &_dlPath, boost::mutex *);
         Download(const std::string &source);
         ~Download();
         void operator()();
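With the member mutex replacing useThisLock(), a caller now passes its lock in explicitly and must hold it across the call, because the completion condition variable waits on that lock. A hedged calling sketch built only from the signatures above (variable names and key values invented):

    #include <string>
    #include <vector>
    #include <boost/filesystem/path.hpp>
    #include <boost/thread/mutex.hpp>
    namespace bf = boost::filesystem;
    using namespace storagemanager;

    void fetchExample(boost::mutex &cache_lock)   // in practice, the per-prefix cache's own mutex
    {
        Downloader *dl = Cache::get()->getDownloader();   // the shared Downloader, per the new Cache API

        std::vector<std::string> storage;         // caller owns the string memory
        storage.push_back("key1");                // invented keys
        storage.push_back("key2");
        std::vector<const std::string *> keys;
        for (size_t i = 0; i < storage.size(); i++)
            keys.push_back(&storage[i]);

        std::vector<int> errnos;
        std::vector<size_t> sizes;

        boost::unique_lock<boost::mutex> s(cache_lock);   // download() waits on this lock internally
        dl->download(keys, &errnos, &sizes, bf::path("pfx0001"), &cache_lock);
        // errnos[i] != 0 marks a failed download; the Downloader has already logged it.
    }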

src/IOCoordinator.cpp

@@ -80,11 +80,6 @@ IOCoordinator * IOCoordinator::get()
     return ioc;
 }
 
-void IOCoordinator::willRead(const char *, off_t, size_t)
-{
-    // not sure we will implement this.
-}
-
 int IOCoordinator::loadObject(int fd, uint8_t *data, off_t offset, size_t length) const
 {
     size_t count = 0;
@@ -135,7 +130,8 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
        release read lock
        put together the response in data
     */
-    bf::path p(_filename);
+    bf::path p = ownership.get(_filename);
+    const bf::path firstDir = *(p.begin());
     const char *filename = p.string().c_str();
 
     ScopedReadLock fileLock(this, filename);
@@ -158,7 +154,7 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
     vector<string> keys;
     for (const auto &object : relevants)
         keys.push_back(object.key);
-    cache->read(keys);
+    cache->read(firstDir, keys);
 
     // open the journal files and objects that exist to prevent them from being
     // deleted mid-operation
@@ -169,11 +165,11 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
         // later.  not thinking about it for now.
 
         // open all of the journal files that exist
-        string filename = (journalPath/(key + ".journal")).string();
-        int fd = ::open(filename.c_str(), O_RDONLY);
+        string jFilename = (journalPath/firstDir/(key + ".journal")).string();
+        int fd = ::open(jFilename.c_str(), O_RDONLY);
         if (fd >= 0)
         {
-            keyToJournalName[key] = filename;
+            keyToJournalName[key] = jFilename;
             journalFDs[key] = fd;
             fdMinders[mindersIndex++].fd = fd;
             //fdMinders.push_back(SharedCloser(fd));
@@ -182,27 +178,27 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
         {
             int l_errno = errno;
             fileLock.unlock();
-            cache->doneReading(keys);
+            cache->doneReading(firstDir, keys);
             logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'",
-                filename.c_str(), strerror_r(l_errno, buf, 80));
+                jFilename.c_str(), strerror_r(l_errno, buf, 80));
             errno = l_errno;
             return -1;
         }
 
         // open all of the objects
-        filename = (cachePath/key).string();
-        fd = ::open(filename.c_str(), O_RDONLY);
+        string oFilename = (cachePath/firstDir/key).string();
+        fd = ::open(oFilename.c_str(), O_RDONLY);
         if (fd < 0)
         {
             int l_errno = errno;
             fileLock.unlock();
-            cache->doneReading(keys);
+            cache->doneReading(firstDir, keys);
             logger->log(LOG_CRIT, "IOCoordinator::read(): Got an unexpected error opening %s, error was '%s'",
-                filename.c_str(), strerror_r(l_errno, buf, 80));
+                oFilename.c_str(), strerror_r(l_errno, buf, 80));
             errno = l_errno;
             return -1;
         }
-        keyToObjectName[key] = filename;
+        keyToObjectName[key] = oFilename;
         objectFDs[key] = fd;
         fdMinders[mindersIndex++].fd = fd;
         //fdMinders.push_back(SharedCloser(fd));
@@ -239,7 +235,7 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
         if (err)
         {
             fileLock.unlock();
-            cache->doneReading(keys);
+            cache->doneReading(firstDir, keys);
             if (count == 0)
                 return -1;
             else
@@ -251,24 +247,26 @@ ssize_t IOCoordinator::read(const char *_filename, uint8_t *data, off_t offset,
 out:
     fileLock.unlock();
-    cache->doneReading(keys);
+    cache->doneReading(firstDir, keys);
     // all done
     return count;
 }
 
 ssize_t IOCoordinator::write(const char *_filename, const uint8_t *data, off_t offset, size_t length)
 {
-    bf::path p(_filename);
+    bf::path p = ownership.get(_filename);
+    const bf::path firstDir = *(p.begin());
     const char *filename = p.string().c_str();
 
     ScopedWriteLock lock(this, filename);
-    int ret = _write(filename, data, offset, length);
+    int ret = _write(filename, data, offset, length, firstDir);
     lock.unlock();
-    cache->doneWriting();
+    cache->doneWriting(firstDir);
     return ret;
 }
 
-ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset, size_t length)
+ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t offset, size_t length,
+    const bf::path &firstDir)
 {
     int err = 0;
     ssize_t count = 0;
@@ -311,7 +309,7 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o
         }
 
         //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength);
+        err = replicator->addJournalEntry((firstDir/i->key).string().c_str(),&data[count],objectOffset,writeLength);
         assert((uint) err == writeLength);
 
         if (err <= 0)
@@ -328,9 +326,9 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o
         if ((writeLength + objectOffset) > i->length)
             metadata.updateEntryLength(i->offset, (writeLength + objectOffset));
 
-        cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
+        cache->newJournalEntry(firstDir, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
+        synchronizer->newJournalEntry(firstDir, i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
         count += writeLength;
         dataRemaining -= writeLength;
     }
@@ -364,7 +362,7 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o
         metadataObject newObject = metadata.addMetadataObject(filename,(writeLength + objectOffset));
         // send to replicator
-        err = replicator->newObject(newObject.key.c_str(),&data[count],objectOffset,writeLength);
+        err = replicator->newObject((firstDir/newObject.key).string().c_str(),&data[count],objectOffset,writeLength);
         assert((uint) err == writeLength);
         if (err <= 0)
         {
@@ -377,13 +375,13 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o
             //log error and abort
         }
 
-        cache->newObject(newObject.key,writeLength + objectOffset);
+        cache->newObject(firstDir, newObject.key,writeLength + objectOffset);
         newObjectKeys.push_back(newObject.key);
 
         count += writeLength;
         dataRemaining -= writeLength;
     }
 
-    synchronizer->newObjects(newObjectKeys);
+    synchronizer->newObjects(firstDir, newObjectKeys);
     replicator->updateMetadata(filename, metadata);
@@ -392,7 +390,8 @@ ssize_t IOCoordinator::_write(const char *filename, const uint8_t *data, off_t o
 ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t length)
 {
-    bf::path p(_filename);
+    bf::path p = ownership.get(_filename);
+    const bf::path firstDir = *(p.begin());
     const char *filename = p.string().c_str();
     int err;
@@ -430,7 +429,7 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
         //cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength);
+        err = replicator->addJournalEntry((firstDir/i->key).string().c_str(),&data[count],i->length,writeLength);
         assert((uint) err == writeLength);
         if (err <= 0)
         {
@@ -441,9 +440,9 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
         }
 
         metadata.updateEntryLength(i->offset, (writeLength + i->length));
-        cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
-        synchronizer->newJournalEntry(i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
+        cache->newJournalEntry(firstDir, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
+        synchronizer->newJournalEntry(firstDir, i->key, writeLength+JOURNAL_ENTRY_HEADER_SIZE);
         count += writeLength;
         dataRemaining -= writeLength;
     }
@@ -466,7 +465,7 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
         metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
         // write the new object
-        err = replicator->newObject(newObject.key.c_str(),&data[count],0,writeLength);
+        err = replicator->newObject((firstDir/newObject.key).string().c_str(),&data[count],0,writeLength);
         assert((uint) err == writeLength);
         if (err <= 0)
         {
@@ -477,20 +476,20 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
             goto out;
             //log error and abort
         }
-        cache->newObject(newObject.key,writeLength);
+        cache->newObject(firstDir, newObject.key,writeLength);
         newObjectKeys.push_back(newObject.key);
 
         count += writeLength;
         dataRemaining -= writeLength;
     }
 
-    synchronizer->newObjects(newObjectKeys);
+    synchronizer->newObjects(firstDir, newObjectKeys);
     replicator->updateMetadata(filename, metadata);
 
 // had to add this hack to prevent deadlock
 out:
     // need to release the file lock before telling Cache that we're done writing.
     lock.unlock();
-    cache->doneWriting();
+    cache->doneWriting(firstDir);
 
     return count;
 }
@@ -498,26 +497,28 @@ out:
 // TODO: might need to support more open flags, ex: O_EXCL
 int IOCoordinator::open(const char *_filename, int openmode, struct stat *out)
 {
-    bf::path p = _filename;   // normalize it
+    bf::path p = ownership.get(_filename);
     const char *filename = p.string().c_str();
 
-    ScopedReadLock s(this, filename);
+    boost::scoped_ptr<ScopedFileLock> s;
+    if (openmode & O_CREAT || openmode | O_TRUNC)
+        s.reset(new ScopedWriteLock(this, filename));
+    else
+        s.reset(new ScopedReadLock(this, filename));
     MetadataFile meta(filename, MetadataFile::no_create_t());
 
     if ((openmode & O_CREAT) && !meta.exists())
         replicator->updateMetadata(filename, meta);   // this will end up creating filename
 
     if ((openmode & O_TRUNC) && meta.exists())
-    {
-        s.unlock();
-        truncate(filename, 0);
-        s.lock();
-    }
+        _truncate(p, 0, s.get());
 
     return meta.stat(out);
 }
 
 int IOCoordinator::listDirectory(const char *dirname, vector<string> *listing)
 {
-    bf::path p(metaPath / dirname);
+    bf::path p(metaPath / ownership.get(dirname));
 
     listing->clear();
     if (!bf::exists(p))
@@ -544,7 +545,7 @@ int IOCoordinator::listDirectory(const char *dirname, vector<string> *listing)
 int IOCoordinator::stat(const char *_path, struct stat *out)
 {
-    bf::path p(_path);
+    bf::path p = ownership.get(_path);
     const char *path = p.string().c_str();
 
     if (bf::is_directory(metaPath/p))
@@ -556,6 +557,16 @@ int IOCoordinator::stat(const char *_path, struct stat *out)
 }
 
 int IOCoordinator::truncate(const char *_path, size_t newSize)
+{
+    bf::path p = ownership.get(_path);
+    const char *path = p.string().c_str();
+
+    ScopedWriteLock lock(this, path);
+    return _truncate(p, newSize, &lock);
+}
+
+int IOCoordinator::_truncate(const bf::path &bfpath, size_t newSize, ScopedFileLock *lock)
 {
     /*
        grab the write lock.
@@ -568,13 +579,12 @@ int IOCoordinator::truncate(const char *_path, size_t newSize)
        tell cache they were deleted
        tell synchronizer they were deleted
     */
-    bf::path p(_path);
-    const char *path = p.string().c_str();
+    const bf::path firstDir = *(bfpath.begin());
+    const char *path = bfpath.string().c_str();
 
     Synchronizer *synchronizer = Synchronizer::get();   // needs to init sync here to break circular dependency...
 
     int err;
-    ScopedWriteLock lock(this, path);
     MetadataFile meta(path, MetadataFile::no_create_t());
 
     if (!meta.exists())
     {
@@ -590,9 +600,9 @@ int IOCoordinator::truncate(const char *_path, size_t newSize)
     if (filesize < newSize)
     {
         uint8_t zero = 0;
-        err = _write(path, &zero, newSize - 1, 1);
-        lock.unlock();
-        cache->doneWriting();
+        err = _write(path, &zero, newSize - 1, 1, firstDir);
+        lock->unlock();
+        cache->doneWriting(firstDir);
         if (err < 0)
             return -1;
         return 0;
@@ -620,15 +630,15 @@ int IOCoordinator::truncate(const char *_path, size_t newSize)
     vector<string> deletedObjects;
     for (; i < objects.size(); ++i)
     {
-        int result = cache->ifExistsThenDelete(objects[i].key);
+        int result = cache->ifExistsThenDelete(firstDir, objects[i].key);
         if (result & 0x1)
-            replicator->remove(cachePath / objects[i].key);
+            replicator->remove(cachePath/firstDir/objects[i].key);
         if (result & 0x2)
-            replicator->remove(journalPath / (objects[i].key + ".journal"));
+            replicator->remove(journalPath/firstDir/(objects[i].key + ".journal"));
         deletedObjects.push_back(objects[i].key);
     }
     if (!deletedObjects.empty())
-        synchronizer->deletedObjects(deletedObjects);
+        synchronizer->deletedObjects(firstDir, deletedObjects);
 
     return 0;
 }
@@ -647,8 +657,9 @@ void IOCoordinator::deleteMetaFile(const bf::path &file)
     Synchronizer *synchronizer = Synchronizer::get();
 
     // this is kind of ugly.  We need to lock on 'file' relative to metaPath, and without the .meta extension
-    string pita = file.string().substr(metaPath.string().length());   // get rid of metapath
+    string pita = file.string().substr(metaPath.string().length() + 1);   // get rid of metapath
     pita = pita.substr(0, pita.length() - 5);   // get rid of the extension
+    const bf::path firstDir = *(bf::path(pita).begin());
     ScopedWriteLock lock(this, pita);
     //cout << "file is " << file.string() << " locked on " << pita << endl;
@@ -661,14 +672,14 @@ void IOCoordinator::deleteMetaFile(const bf::path &file)
     for (auto &object : objects)
     {
         //cout << "deleting " << object.key << endl;
-        int result = cache->ifExistsThenDelete(object.key);
+        int result = cache->ifExistsThenDelete(firstDir, object.key);
         if (result & 0x1)
-            replicator->remove(cachePath/object.key);
+            replicator->remove(cachePath/firstDir/object.key);
         if (result & 0x2)
-            replicator->remove(journalPath/(object.key + ".journal"));
+            replicator->remove(journalPath/firstDir/(object.key + ".journal"));
         deletedObjects.push_back(object.key);
     }
-    synchronizer->deletedObjects(deletedObjects);
+    synchronizer->deletedObjects(firstDir, deletedObjects);
 }
 
 void IOCoordinator::remove(const bf::path &p)
@@ -718,7 +729,7 @@ int IOCoordinator::unlink(const char *path)
     /* TODO!  We need to make sure the input params to IOC fcns don't go up to parent dirs,
        ex, if path = '../../../blahblah'. */
-    bf::path p(metaPath/path);
+    bf::path p(metaPath/ownership.get(path));
 
     try
     {
@@ -764,8 +775,10 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
        write the new metadata object
     */
-    const bf::path p1(_filename1);
-    const bf::path p2(_filename2);
+    const bf::path p1 = ownership.get(_filename1);
+    const bf::path p2 = ownership.get(_filename2);
+    const bf::path firstDir1 = *(p1.begin());
+    const bf::path firstDir2 = *(p2.begin());
     const char *filename1 = p1.string().c_str();
     const char *filename2 = p2.string().c_str();
@@ -815,7 +828,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
     {
         for (const auto &object : objects)
         {
-            bf::path journalFile = journalPath/(object.key + ".journal");
+            bf::path journalFile = journalPath/firstDir1/(object.key + ".journal");
             metadataObject newObj = meta2.addMetadataObject(filename2, object.length);
             assert(newObj.offset == object.offset);
             // TODO: failure here makes a lot of noise in the log file
@@ -826,7 +839,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
                 if (errno == ENOENT)
                 {
                     // it's not in cloudstorage, see if it's in the cache
-                    bf::path cachedObjPath = cachePath/object.key;
+                    bf::path cachedObjPath = cachePath/firstDir1/object.key;
                     bool objExists = bf::exists(cachedObjPath);
                     if (!objExists)
                         throw CFException(ENOENT, string("IOCoordinator::copyFile(): source = ") + filename1 +
@@ -849,12 +862,12 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
// if there's a journal file for this object, make a copy // if there's a journal file for this object, make a copy
if (bf::exists(journalFile)) if (bf::exists(journalFile))
{ {
bf::path newJournalFile = journalPath/(newObj.key + ".journal"); bf::path newJournalFile = journalPath/firstDir2/(newObj.key + ".journal");
try try
{ {
bf::copy_file(journalFile, newJournalFile); bf::copy_file(journalFile, newJournalFile);
size_t tmp = bf::file_size(newJournalFile); size_t tmp = bf::file_size(newJournalFile);
cache->newJournalEntry(tmp); cache->newJournalEntry(firstDir2, tmp);
newJournalEntries.push_back(pair<string, size_t>(newObj.key, tmp)); newJournalEntries.push_back(pair<string, size_t>(newObj.key, tmp));
} }
catch (bf::filesystem_error &e) catch (bf::filesystem_error &e)
@@ -873,8 +886,8 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
cs->deleteObject(newObject.key); cs->deleteObject(newObject.key);
for (auto &jEntry : newJournalEntries) for (auto &jEntry : newJournalEntries)
{ {
bf::path fullJournalPath = journalPath/(jEntry.first + ".journal"); bf::path fullJournalPath = journalPath/firstDir2/(jEntry.first + ".journal");
cache->deletedJournal(bf::file_size(fullJournalPath)); cache->deletedJournal(firstDir2, bf::file_size(fullJournalPath));
bf::remove(fullJournalPath); bf::remove(fullJournalPath);
} }
errno = e.l_errno; errno = e.l_errno;
@@ -885,7 +898,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
lock2.unlock(); lock2.unlock();
for (auto &jEntry : newJournalEntries) for (auto &jEntry : newJournalEntries)
sync->newJournalEntry(jEntry.first, jEntry.second); sync->newJournalEntry(firstDir2, jEntry.first, jEntry.second);
return 0; return 0;
} }
@@ -1147,18 +1160,12 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
return 0; return 0;
} }
void IOCoordinator::renameObject(const string &oldKey, const string &newKey)
{
// does anything need to be done here?
}
void IOCoordinator::readLock(const string &filename) void IOCoordinator::readLock(const string &filename)
{ {
boost::unique_lock<boost::mutex> s(lockMutex); boost::unique_lock<boost::mutex> s(lockMutex);
//cout << "read-locking " << filename << endl; //cout << "read-locking " << filename << endl;
//assert(filename[0] == '/'); assert(filename[0] != '/');
auto ins = locks.insert(pair<string, RWLock *>(filename, NULL)); auto ins = locks.insert(pair<string, RWLock *>(filename, NULL));
if (ins.second) if (ins.second)
ins.first->second = new RWLock(); ins.first->second = new RWLock();
@@ -1183,7 +1190,7 @@ void IOCoordinator::writeLock(const string &filename)
boost::unique_lock<boost::mutex> s(lockMutex); boost::unique_lock<boost::mutex> s(lockMutex);
//cout << "write-locking " << filename << endl; //cout << "write-locking " << filename << endl;
//assert(filename[0] == '/'); assert(filename[0] != '/');
auto ins = locks.insert(pair<string, RWLock *>(filename, NULL)); auto ins = locks.insert(pair<string, RWLock *>(filename, NULL));
if (ins.second) if (ins.second)
ins.first->second = new RWLock(); ins.first->second = new RWLock();
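In short, every public IOCoordinator entry point now runs its incoming path through ownership.get(), which claims the path's prefix if needed and hands back the path rewritten relative to that prefix; the prefix's first component then selects the per-prefix cache and journal subdirectories. A small standalone illustration of that convention (paths here are hypothetical, not part of the commit):

#include <boost/filesystem/path.hpp>
#include <iostream>

namespace bf = boost::filesystem;

int main()
{
    // With common_prefix_depth = 1, ownership.get("/base/data3/dir/file")
    // would return "data3/dir/file" (claiming the prefix "data3" as a side effect).
    bf::path rewritten("data3/dir/file");
    bf::path firstDir = *(rewritten.begin());   // "data3", as derived in the code above

    // Per-prefix layout used throughout this commit for an object key K:
    //   <cachePath>/<firstDir>/K  and  <journalPath>/<firstDir>/K.journal
    bf::path cachePath("/var/lib/storagemanager/cache");   // hypothetical root
    std::cout << cachePath / firstDir / "K" << std::endl;
    return 0;
}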


@@ -18,6 +18,7 @@
#include "RWLock.h"
#include "Replicator.h"
#include "Utilities.h"
+#include "Ownership.h"
namespace storagemanager
{
@@ -30,7 +31,6 @@ class IOCoordinator : public boost::noncopyable
static IOCoordinator *get();
virtual ~IOCoordinator();
-void willRead(const char *filename, off_t offset, size_t length);
/* TODO: make read, write, append return a ssize_t */
ssize_t read(const char *filename, uint8_t *data, off_t offset, size_t length);
ssize_t write(const char *filename, const uint8_t *data, off_t offset, size_t length);
@@ -58,7 +58,6 @@ class IOCoordinator : public boost::noncopyable
/* Lock manipulation fcns. They can lock on any param given to them. For convention's sake,
the parameter should mostly be the abs filename being accessed. */
-void renameObject(const std::string &oldKey, const std::string &newKey);
void readLock(const std::string &filename);
void writeLock(const std::string &filename);
void readUnlock(const std::string &filename);
@@ -75,6 +74,7 @@ class IOCoordinator : public boost::noncopyable
Cache *cache;
SMLogging *logger;
Replicator *replicator;
+Ownership ownership; // ACK! Need a new name for this!
size_t objectSize;
boost::filesystem::path journalPath;
@@ -87,7 +87,9 @@ class IOCoordinator : public boost::noncopyable
void remove(const boost::filesystem::path &path);
void deleteMetaFile(const boost::filesystem::path &file);
-ssize_t _write(const char *filename, const uint8_t *data, off_t offset, size_t length);
+int _truncate(const boost::filesystem::path &path, size_t newsize, ScopedFileLock *lock);
+ssize_t _write(const char *filename, const uint8_t *data, off_t offset, size_t length,
+const boost::filesystem::path &firstDir);
int loadObjectAndJournal(const char *objFilename, const char *journalFilename,
uint8_t *data, off_t offset, size_t length) const;


@@ -49,7 +49,7 @@ MetadataFile::MetadataConfig::MetadataConfig()
catch (...)
{
logger->log(LOG_CRIT, "ObjectStorage/object_size must be set to a numeric value");
-throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
+throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
}
try
@@ -57,8 +57,8 @@ MetadataFile::MetadataConfig::MetadataConfig()
msMetadataPath = config->getValue("ObjectStorage", "metadata_path");
if (msMetadataPath.empty())
{
-logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
-throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
+logger->log(LOG_CRIT, "ObjectStorage/metadata_path is not set");
+throw runtime_error("Please set ObjectStorage/metadata_path in the storagemanager.cnf file");
}
}
catch (...)

src/Ownership.cpp Normal file

@@ -0,0 +1,303 @@
#include "Ownership.h"
#include "Config.h"
#include "Cache.h"
#include "Synchronizer.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <boost/filesystem.hpp>
using namespace std;
namespace bf=boost::filesystem;
namespace storagemanager
{
Ownership::Ownership()
{
Config *config = Config::get();
logger = SMLogging::get();
string sPrefixDepth = config->getValue("ObjectStorage", "common_prefix_depth");
if (sPrefixDepth.empty())
{
const char *msg = "Ownership: Need to specify ObjectStorage/common_prefix_depth in the storagemanager.cnf file";
logger->log(LOG_CRIT, msg);
throw runtime_error(msg);
}
try
{
prefixDepth = stoul(sPrefixDepth, NULL, 0);
}
catch (invalid_argument &e)
{
const char *msg = "Ownership: Invalid value in ObjectStorage/common_prefix_depth";
logger->log(LOG_CRIT, msg);
throw runtime_error(msg);
}
metadataPrefix = config->getValue("ObjectStorage", "metadata_path");
if (metadataPrefix.empty())
{
const char *msg = "Ownership: Need to specify ObjectStorage/metadata_path in the storagemanager.cnf file";
logger->log(LOG_CRIT, msg);
throw runtime_error(msg);
}
monitor = new Monitor(this);
}
Ownership::~Ownership()
{
delete monitor;
for (auto &it : ownedPrefixes)
releaseOwnership(it.first, true);
}
bf::path Ownership::get(const bf::path &p)
{
bf::path ret, prefix;
bf::path::const_iterator pit;
uint i;
//cerr << "Ownership::get() param = " << p.string() << endl;
if (prefixDepth > 0)
{
for (i = 0, pit = p.begin(); i <= prefixDepth && pit != p.end(); ++i, ++pit)
;
if (pit != p.end())
prefix = *pit;
//cerr << "prefix is " << prefix.string() << endl;
for (; pit != p.end(); ++pit)
ret /= *pit;
if (ret.empty())
{
//cerr << "returning ''" << endl;
return ret;
}
}
else
{
ret = p;
prefix = *(p.begin());
}
mutex.lock();
if (ownedPrefixes.find(prefix) == ownedPrefixes.end())
{
mutex.unlock();
takeOwnership(prefix);
}
else
{
// todo... replace this polling, and the similar polling in Cache, with proper condition vars.
while (ownedPrefixes[prefix] == false)
{
mutex.unlock();
sleep(1);
mutex.lock();
}
mutex.unlock();
}
//cerr << "returning " << ret.string() << endl;
return ret;
}
// minor timesaver
#define TOUCH(p, f) { \
int fd = ::open((metadataPrefix/p/f).string().c_str(), O_TRUNC | O_CREAT | O_WRONLY, 0660); \
if (fd >= 0) \
::close(fd); \
else \
{ \
char buf[80]; int saved_errno = errno; \
cerr << "failed to touch " << metadataPrefix/p/f << " got " << strerror_r(saved_errno, buf, 80) << endl; \
} \
}
#define DELETE(p, f) ::unlink((metadataPrefix/p/f).string().c_str());
void Ownership::touchFlushing(const bf::path &prefix, volatile bool *doneFlushing) const
{
while (!*doneFlushing)
{
TOUCH(prefix, "FLUSHING");
try
{
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
catch (boost::thread_interrupted &)
{ }
}
}
void Ownership::releaseOwnership(const bf::path &p, bool isDtor)
{
logger->log(LOG_DEBUG, "Ownership: releasing ownership of %s", p.string().c_str());
boost::unique_lock<boost::mutex> s(mutex);
auto it = ownedPrefixes.find(p);
if (it == ownedPrefixes.end())
{
logger->log(LOG_DEBUG, "Ownership::releaseOwnership(): told to disown %s, but do not own it", p.string().c_str());
return;
}
ownedPrefixes.erase(it);
if (isDtor)
{
// This is a quick release. If this is being destroyed, then it is through the graceful
// shutdown mechanism, which will flush data separately.
DELETE(p, "OWNED");
DELETE(p, "FLUSHING");
return;
}
s.unlock();
volatile bool done = false;
// start flushing
boost::thread xfer([this, &p, &done] { this->touchFlushing(p, &done); });
Cache::get()->dropPrefix(p);
Synchronizer::get()->dropPrefix(p);
done = true;
xfer.interrupt();
xfer.join();
// update state
DELETE(p, "OWNED");
DELETE(p, "FLUSHING");
}
void Ownership::_takeOwnership(const bf::path &p)
{
logger->log(LOG_DEBUG, "Ownership: taking ownership of %s", p.string().c_str());
bf::create_directories(metadataPrefix/p);
DELETE(p, "FLUSHING");
DELETE(p, "REQUEST_TRANSFER");
// TODO: need to consider errors taking ownership
TOUCH(p, "OWNED");
mutex.lock();
ownedPrefixes[p] = true;
mutex.unlock();
Synchronizer::get()->newPrefix(p);
Cache::get()->newPrefix(p);
}
void Ownership::takeOwnership(const bf::path &p)
{
boost::unique_lock<boost::mutex> s(mutex);
auto it = ownedPrefixes.find(p);
if (it != ownedPrefixes.end())
return;
ownedPrefixes[p] = false;
s.unlock();
bool okToTransfer = false;
struct stat statbuf;
int err;
char buf[80];
bf::path ownedPath = metadataPrefix/p/"OWNED";
bf::path flushingPath = metadataPrefix/p/"FLUSHING";
// if it's not already owned, then we can take possession
err = ::stat(ownedPath.string().c_str(), &statbuf);
if (err && errno == ENOENT)
{
_takeOwnership(p);
return;
}
TOUCH(p, "REQUEST_TRANSFER");
time_t lastFlushTime = time(NULL);
while (!okToTransfer && time(NULL) < lastFlushTime + 10)
{
// if the OWNED file is deleted or if the flushing file isn't touched after 10 secs
// it is ok to take possession.
err = ::stat(ownedPath.string().c_str(), &statbuf);
if (err)
{
if (errno == ENOENT)
okToTransfer = true;
else
logger->log(LOG_CRIT, "Ownership::takeOwnership(): got '%s' doing stat of %s", strerror_r(errno, buf, 80),
ownedPath.string().c_str());
}
err = ::stat(flushingPath.string().c_str(), &statbuf);
if (err && errno != ENOENT)
logger->log(LOG_CRIT, "Ownership::takeOwnership(): got '%s' doing stat of %s", strerror_r(errno, buf, 80),
flushingPath.string().c_str());
else
{
logger->log(LOG_DEBUG, "Ownership: waiting to get %s", p.string().c_str());
if (!err)
lastFlushTime = statbuf.st_mtime;
}
if (!okToTransfer)
sleep(1);
}
_takeOwnership(p);
}
Ownership::Monitor::Monitor(Ownership *_owner) : owner(_owner), stop(false)
{
thread = boost::thread([this] { this->watchForInterlopers(); });
}
Ownership::Monitor::~Monitor()
{
stop = true;
thread.interrupt();
thread.join();
}
void Ownership::Monitor::watchForInterlopers()
{
// look for requests to transfer ownership
struct stat statbuf;
int err;
char buf[80];
vector<bf::path> releaseList;
while (!stop)
{
releaseList.clear();
boost::unique_lock<boost::mutex> s(owner->mutex);
for (auto &prefix : owner->ownedPrefixes)
{
if (stop)
break;
if (prefix.second == false)
continue;
bf::path p(owner->metadataPrefix/(prefix.first)/"REQUEST_TRANSFER");
const char *cp = p.string().c_str();
err = ::stat(cp, &statbuf);
// release it if there's a release request only. Log it if there's an error other than
// that the file isn't there.
if (err == 0)
releaseList.push_back(prefix.first);
if (err < 0 && errno != ENOENT)
owner->logger->log(LOG_ERR, "Ownership::Monitor::watchForInterlopers(): failed to stat %s, got %s", cp,
strerror_r(errno, buf, 80));
}
s.unlock();
for (auto &prefix : releaseList)
owner->releaseOwnership(prefix);
if (stop)
break;
try
{
boost::this_thread::sleep_for(boost::chrono::seconds(1));
}
catch (boost::thread_interrupted &)
{ }
}
}
}
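Taken together, the marker files above form a small handshake over the shared metadata directory: the owner keeps OWNED present and, while draining a prefix, touches FLUSHING about once a second; a node that wants the prefix touches REQUEST_TRANSFER and waits until OWNED disappears or FLUSHING goes stale for ~10 seconds. A condensed, illustrative restatement of the requester's side (error handling and logging stripped; the authoritative logic is takeOwnership() above):

#include <boost/filesystem.hpp>
#include <fstream>
#include <ctime>
#include <unistd.h>

namespace bf = boost::filesystem;

// prefixDir is <metadata_path>/<prefix>, e.g. ".../metadata/data3" (hypothetical)
void waitForOwnership(const bf::path &prefixDir)
{
    bf::path owned = prefixDir / "OWNED";
    bf::path flushing = prefixDir / "FLUSHING";

    if (!bf::exists(owned))
        return;                                        // unowned: claim immediately

    // ask the current owner to release the prefix
    std::ofstream((prefixDir / "REQUEST_TRANSFER").string().c_str());

    time_t lastFlush = time(NULL);
    while (time(NULL) < lastFlush + 10)
    {
        if (!bf::exists(owned))
            return;                                    // owner released it
        if (bf::exists(flushing))
            lastFlush = bf::last_write_time(flushing); // owner is still flushing
        sleep(1);
    }
    // FLUSHING went stale for 10s: presume the owner is gone and claim the prefix.
}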

src/Ownership.h Normal file

@@ -0,0 +1,60 @@
#ifndef OWNERSHIP_H_
#define OWNERSHIP_H_
#include <boost/filesystem/path.hpp>
#include <boost/thread.hpp>
#include <map>
#include "SMLogging.h"
/* This class tracks the ownership of each prefix and manages ownership transfer.
Could we come up with a better name btw? */
namespace storagemanager
{
class Ownership : public boost::noncopyable
{
public:
Ownership();
~Ownership();
bool sharedFS();
// returns the path "right shifted" by prefixDepth, and with ownership of that path.
// on error it returns an empty path.
boost::filesystem::path get(const boost::filesystem::path &);
private:
uint prefixDepth;
boost::filesystem::path metadataPrefix;
SMLogging *logger;
void touchFlushing(const boost::filesystem::path &, volatile bool *) const;
void takeOwnership(const boost::filesystem::path &);
void releaseOwnership(const boost::filesystem::path &, bool isDtor = false);
void _takeOwnership(const boost::filesystem::path &);
struct Monitor
{
Monitor(Ownership *);
~Monitor();
boost::thread thread;
Ownership *owner;
volatile bool stop;
void watchForInterlopers();
};
// maps a prefix to a state. ownedPrefixes[p] == false means it's being init'd, == true means it's ready for use.
std::map<boost::filesystem::path, bool> ownedPrefixes;
Monitor *monitor;
boost::mutex mutex;
};
inline bool Ownership::sharedFS()
{
return prefixDepth >= 0;
}
}
#endif
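To make the "right shift" concrete, here is an isolated restatement of the path arithmetic in get(), with hypothetical paths (the real method also records ownership of the returned path's prefix as a side effect):

#include <boost/filesystem/path.hpp>
#include <iostream>

namespace bf = boost::filesystem;

bf::path rightShift(const bf::path &p, unsigned prefixDepth)
{
    bf::path ret;
    bf::path::const_iterator pit = p.begin();
    for (unsigned i = 0; i <= prefixDepth && pit != p.end(); ++i, ++pit)
        ;                                  // skip "/" plus prefixDepth leading dirs
    for (; pit != p.end(); ++pit)
        ret /= *pit;                       // the prefix itself plus the remainder
    return ret;
}

int main()
{
    std::cout << rightShift("/base/data3/foo/bar", 1) << "\n"; // "data3/foo/bar"
    std::cout << rightShift("/base", 1) << "\n";               // "" (path too short)
    return 0;
}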

src/PrefixCache.cpp Normal file

@@ -0,0 +1,631 @@
#include "PrefixCache.h"
#include "Cache.h"
#include "Config.h"
#include "Downloader.h"
#include "Synchronizer.h"
#include <iostream>
#include <syslog.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
using namespace std;
namespace bf = boost::filesystem;
namespace storagemanager
{
PrefixCache::PrefixCache(const bf::path &prefix) : firstDir(prefix), currentCacheSize(0)
{
Config *conf = Config::get();
logger = SMLogging::get();
replicator = Replicator::get();
downloader = Cache::get()->getDownloader();
string stmp = conf->getValue("Cache", "cache_size");
if (stmp.empty())
{
logger->log(LOG_CRIT, "Cache/cache_size is not set");
throw runtime_error("Please set Cache/cache_size in the storagemanager.cnf file");
}
try
{
maxCacheSize = stoul(stmp);
}
catch (invalid_argument &)
{
logger->log(LOG_CRIT, "Cache/cache_size is not a number");
throw runtime_error("Please set Cache/cache_size to a number");
}
//cout << "Cache got cache size " << maxCacheSize << endl;
stmp = conf->getValue("ObjectStorage", "object_size");
if (stmp.empty())
{
logger->log(LOG_CRIT, "ObjectStorage/object_size is not set");
throw runtime_error("Please set ObjectStorage/object_size in the storagemanager.cnf file");
}
try
{
objectSize = stoul(stmp);
}
catch (invalid_argument &)
{
logger->log(LOG_CRIT, "ObjectStorage/object_size is not a number");
throw runtime_error("Please set ObjectStorage/object_size to a number");
}
cachePrefix = conf->getValue("Cache", "path");
if (cachePrefix.empty())
{
logger->log(LOG_CRIT, "Cache/path is not set");
throw runtime_error("Please set Cache/path in the storagemanager.cnf file");
}
cachePrefix /= firstDir;
try
{
bf::create_directories(cachePrefix);
}
catch (exception &e)
{
logger->log(LOG_CRIT, "Failed to create %s, got: %s", cachePrefix.string().c_str(), e.what());
throw;
}
stmp = conf->getValue("ObjectStorage", "journal_path");
if (stmp.empty())
{
logger->log(LOG_CRIT, "ObjectStorage/journal_path is not set");
throw runtime_error("Please set ObjectStorage/journal_path in the storagemanager.cnf file");
}
journalPrefix = stmp;
journalPrefix /= firstDir;
try
{
bf::create_directories(journalPrefix);
}
catch (exception &e)
{
logger->log(LOG_CRIT, "Failed to create %s, got: %s", journalPrefix.string().c_str(), e.what());
throw;
}
lru_mutex.lock(); // unlocked by populate() when it's done
boost::thread t([this] { this->populate(); });
t.detach();
}
PrefixCache::~PrefixCache()
{
/* This and shutdown() need to do whatever is necessary to leave cache contents in a safe
state on disk. Does anything need to be done toward that?
*/
}
void PrefixCache::populate()
{
Synchronizer *sync = Synchronizer::get();
bf::directory_iterator dir(cachePrefix);
bf::directory_iterator dend;
vector<string> newObjects;
while (dir != dend)
{
// put everything in lru & m_lru
const bf::path &p = dir->path();
if (bf::is_regular_file(p))
{
lru.push_back(p.filename().string());
auto last = lru.end();
m_lru.insert(--last);
currentCacheSize += bf::file_size(*dir);
newObjects.push_back(p.filename().string());
}
else
logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
++dir;
}
sync->newObjects(firstDir, newObjects);
newObjects.clear();
// account for what's in the journal dir
vector<pair<string, size_t> > newJournals;
dir = bf::directory_iterator(journalPrefix);
while (dir != dend)
{
const bf::path &p = dir->path();
if (bf::is_regular_file(p))
{
if (p.extension() == ".journal")
{
size_t s = bf::file_size(*dir);
currentCacheSize += s;
newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
}
else
logger->log(LOG_WARNING, "Cache: found a file in the journal dir that does not belong '%s'", p.string().c_str());
}
else
logger->log(LOG_WARNING, "Cache: found something in the journal dir that does not belong '%s'", p.string().c_str());
++dir;
}
lru_mutex.unlock();
sync->newJournalEntries(firstDir, newJournals);
}
// be careful using this! SM should be idle. No ongoing reads or writes.
void PrefixCache::validateCacheSize()
{
boost::unique_lock<boost::mutex> s(lru_mutex);
if (!doNotEvict.empty() || !toBeDeleted.empty())
{
cout << "Not safe to use validateCacheSize() at the moment." << endl;
return;
}
size_t oldSize = currentCacheSize;
currentCacheSize = 0;
m_lru.clear();
lru.clear();
populate();
if (oldSize != currentCacheSize)
logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): found a discrepancy. Actual size is %lld, had %lld.",
currentCacheSize, oldSize);
else
logger->log(LOG_DEBUG, "PrefixCache::validateCacheSize(): Cache size accounting agrees with reality for now.");
}
void PrefixCache::read(const vector<string> &keys)
{
/* Move existing keys to the back of the LRU, and start downloading nonexistent keys.
*/
vector<const string *> keysToFetch;
vector<int> dlErrnos;
vector<size_t> dlSizes;
boost::unique_lock<boost::mutex> s(lru_mutex);
M_LRU_t::iterator mit;
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end())
{
addToDNE(mit->lit);
lru.splice(lru.end(), lru, mit->lit); // move them to the back so they are last to pick for eviction
}
else
{
// There's a window where the file has been downloaded but is not yet
// added to the lru structs. However it is in the DNE. If it is in the DNE, then it is also
// in Downloader's map. So, this thread needs to start the download if it's not in the
// DNE or if there's an existing download that hasn't finished yet. Starting the download
// includes waiting for an existing download to finish, which from this class's pov is the
// same thing.
if (doNotEvict.find(key) == doNotEvict.end() || downloader->inProgress(key))
keysToFetch.push_back(&key);
else
cout << "Cache: detected and stopped a racey download" << endl;
addToDNE(key);
}
}
if (keysToFetch.empty())
return;
downloader->download(keysToFetch, &dlErrnos, &dlSizes, cachePrefix, &lru_mutex);
size_t sum_sizes = 0;
for (uint i = 0; i < keysToFetch.size(); ++i)
{
// downloads with size 0 didn't actually happen, either because it
// was a preexisting download (another read() call owns it), or because
// there was an error downloading it. Use size == 0 as an indication of
// what to add to the cache. Also needs to verify that the file was not deleted,
// indicated by existence in doNotEvict.
if (dlSizes[i] != 0)
{
if (doNotEvict.find(*keysToFetch[i]) != doNotEvict.end())
{
sum_sizes += dlSizes[i];
lru.push_back(*keysToFetch[i]);
LRU_t::iterator lit = lru.end();
m_lru.insert(--lit); // I dislike this way of grabbing the last iterator in a list.
}
else // it was downloaded, but a deletion happened so we have to toss it
{
cout << "removing a file that was deleted by another thread during download" << endl;
bf::remove(cachePrefix / (*keysToFetch[i]));
}
}
}
// move everything in keys to the back of the lru (yes, again)
for (const string &key : keys)
{
mit = m_lru.find(key);
if (mit != m_lru.end()) // all of the files exist, just not all of them are 'owned by' this thread.
lru.splice(lru.end(), lru, mit->lit);
}
// fix cache size
//_makeSpace(sum_sizes);
currentCacheSize += sum_sizes;
}
void PrefixCache::doneReading(const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
for (const string &key : keys)
{
removeFromDNE(key);
// most should be in the map.
// debateable whether it's faster to look up the list iterator and use it
// or whether it's faster to bypass that and use strings only.
//const auto &it = m_lru.find(key);
//if (it != m_lru.end())
// removeFromDNE(it->lit);
}
_makeSpace(0);
}
void PrefixCache::doneWriting()
{
makeSpace(0);
}
PrefixCache::DNEElement::DNEElement(const LRU_t::iterator &k) : key(k), refCount(1)
{
}
PrefixCache::DNEElement::DNEElement(const string &k) : sKey(k), refCount(1)
{
}
void PrefixCache::addToDNE(const DNEElement &key)
{
DNE_t::iterator it = doNotEvict.find(key);
if (it != doNotEvict.end())
{
DNEElement &dnee = const_cast<DNEElement &>(*it);
++(dnee.refCount);
}
else
doNotEvict.insert(key);
}
void PrefixCache::removeFromDNE(const DNEElement &key)
{
DNE_t::iterator it = doNotEvict.find(key);
if (it == doNotEvict.end())
return;
DNEElement &dnee = const_cast<DNEElement &>(*it);
if (--(dnee.refCount) == 0)
doNotEvict.erase(it);
}
const bf::path & PrefixCache::getCachePath()
{
return cachePrefix;
}
const bf::path & PrefixCache::getJournalPath()
{
return journalPrefix;
}
void PrefixCache::exists(const vector<string> &keys, vector<bool> *out) const
{
out->resize(keys.size());
boost::unique_lock<boost::mutex> s(lru_mutex);
for (uint i = 0; i < keys.size(); i++)
(*out)[i] = (m_lru.find(keys[i]) != m_lru.end());
}
bool PrefixCache::exists(const string &key) const
{
boost::unique_lock<boost::mutex> s(lru_mutex);
return m_lru.find(key) != m_lru.end();
}
void PrefixCache::newObject(const string &key, size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(m_lru.find(key) == m_lru.end());
//_makeSpace(size);
lru.push_back(key);
LRU_t::iterator back = lru.end();
m_lru.insert(--back);
currentCacheSize += size;
}
void PrefixCache::newJournalEntry(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
//_makeSpace(size);
currentCacheSize += size;
}
void PrefixCache::deletedJournal(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
currentCacheSize -= size;
}
void PrefixCache::deletedObject(const string &key, size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
M_LRU_t::iterator mit = m_lru.find(key);
assert(mit != m_lru.end());
// if it's being flushed, let makeSpace() do the deleting
if (toBeDeleted.find(mit->lit) == toBeDeleted.end())
{
doNotEvict.erase(mit->lit);
lru.erase(mit->lit);
m_lru.erase(mit);
currentCacheSize -= size;
}
}
void PrefixCache::setMaxCacheSize(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
if (size < maxCacheSize)
_makeSpace(maxCacheSize - size);
maxCacheSize = size;
}
void PrefixCache::makeSpace(size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
_makeSpace(size);
}
size_t PrefixCache::getMaxCacheSize() const
{
return maxCacheSize;
}
// call this holding lru_mutex
void PrefixCache::_makeSpace(size_t size)
{
ssize_t thisMuch = currentCacheSize + size - maxCacheSize;
if (thisMuch <= 0)
return;
LRU_t::iterator it;
while (thisMuch > 0 && !lru.empty())
{
it = lru.begin();
// find the first element not being either read() right now or being processed by another
// makeSpace() call.
while (it != lru.end())
{
// make sure it's not currently being read or being flushed by another _makeSpace() call
if ((doNotEvict.find(it) == doNotEvict.end()) && (toBeDeleted.find(it) == toBeDeleted.end()))
break;
++it;
}
if (it == lru.end())
{
// nothing can be deleted right now
return;
}
if (!bf::exists(cachePrefix / *it))
cout << cachePrefix / *it << " doesn't exist, WTF?" << endl; // ran into this a couple times, still happens as of commit 948ee1aa5
assert(bf::exists(cachePrefix / *it));
/*
tell Synchronizer that this key will be evicted
delete the file
remove it from our structs
update current size
*/
//logger->log(LOG_WARNING, "Cache: flushing!");
toBeDeleted.insert(it);
string key = *it; // need to make a copy; it could get changed after unlocking.
lru_mutex.unlock();
try
{
Synchronizer::get()->flushObject(firstDir, key);
}
catch (...)
{
// it gets logged by Sync
lru_mutex.lock();
toBeDeleted.erase(it);
continue;
}
lru_mutex.lock();
// check doNotEvict again in case this object is now being read
if (doNotEvict.find(it) == doNotEvict.end())
{
bf::path cachedFile = cachePrefix / *it;
m_lru.erase(*it);
toBeDeleted.erase(it);
lru.erase(it);
size_t newSize = bf::file_size(cachedFile);
replicator->remove(cachedFile, Replicator::LOCAL_ONLY);
if (newSize < currentCacheSize)
{
currentCacheSize -= newSize;
thisMuch -= newSize;
}
else
{
logger->log(LOG_WARNING, "PrefixCache::makeSpace(): accounting error. Almost wrapped currentCacheSize on flush.");
currentCacheSize = 0;
thisMuch = 0;
}
}
else
toBeDeleted.erase(it);
}
}
void PrefixCache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
{
// rename it in the LRU
// erase/insert to rehash it everywhere else
boost::unique_lock<boost::mutex> s(lru_mutex);
auto it = m_lru.find(oldKey);
if (it == m_lru.end())
return;
auto lit = it->lit;
m_lru.erase(it);
int refCount = 0;
auto dne_it = doNotEvict.find(lit);
if (dne_it != doNotEvict.end())
{
refCount = dne_it->refCount;
doNotEvict.erase(dne_it);
}
auto tbd_it = toBeDeleted.find(lit);
bool hasTBDEntry = (tbd_it != toBeDeleted.end());
if (hasTBDEntry)
toBeDeleted.erase(tbd_it);
*lit = newKey;
if (hasTBDEntry)
toBeDeleted.insert(lit);
if (refCount != 0)
{
pair<DNE_t::iterator, bool> dne_tmp = doNotEvict.insert(lit);
const_cast<DNEElement &>(*(dne_tmp.first)).refCount = refCount;
}
m_lru.insert(lit);
currentCacheSize += sizediff;
}
int PrefixCache::ifExistsThenDelete(const string &key)
{
bf::path cachedPath = cachePrefix / key;
bf::path journalPath = journalPrefix / (key + ".journal");
boost::unique_lock<boost::mutex> s(lru_mutex);
bool objectExists = false;
auto it = m_lru.find(key);
if (it != m_lru.end())
{
if (toBeDeleted.find(it->lit) == toBeDeleted.end())
{
doNotEvict.erase(it->lit);
lru.erase(it->lit);
m_lru.erase(it);
objectExists = true;
}
else // let makeSpace() delete it if it's already in progress
return 0;
}
bool journalExists = bf::exists(journalPath);
//assert(objectExists == bf::exists(cachedPath));
size_t objectSize = (objectExists ? bf::file_size(cachedPath) : 0);
//size_t objectSize = (objectExists ? MetadataFile::getLengthFromKey(key) : 0);
size_t journalSize = (journalExists ? bf::file_size(journalPath) : 0);
currentCacheSize -= (objectSize + journalSize);
//assert(!objectExists || objectSize == bf::file_size(cachedPath));
return (objectExists ? 1 : 0) | (journalExists ? 2 : 0);
}
size_t PrefixCache::getCurrentCacheSize() const
{
return currentCacheSize;
}
size_t PrefixCache::getCurrentCacheElementCount() const
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(m_lru.size() == lru.size());
return m_lru.size();
}
void PrefixCache::reset()
{
boost::unique_lock<boost::mutex> s(lru_mutex);
m_lru.clear();
lru.clear();
toBeDeleted.clear();
doNotEvict.clear();
bf::directory_iterator dir;
bf::directory_iterator dend;
for (dir = bf::directory_iterator(cachePrefix); dir != dend; ++dir)
bf::remove_all(dir->path());
for (dir = bf::directory_iterator(journalPrefix); dir != dend; ++dir)
bf::remove_all(dir->path());
currentCacheSize = 0;
}
void PrefixCache::shutdown()
{
/* Does this need to do something anymore? */
}
/* The helper classes */
PrefixCache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)
{}
PrefixCache::M_LRU_element_t::M_LRU_element_t(const string &k) : key(&k)
{}
PrefixCache::M_LRU_element_t::M_LRU_element_t(const LRU_t::iterator &i) : key(&(*i)), lit(i)
{}
inline size_t PrefixCache::KeyHasher::operator()(const M_LRU_element_t &l) const
{
return hash<string>()(*(l.key));
}
inline bool PrefixCache::KeyEquals::operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const
{
return (*(l1.key) == *(l2.key));
}
inline size_t PrefixCache::DNEHasher::operator()(const DNEElement &l) const
{
return (l.sKey.empty() ? hash<string>()(*(l.key)) : hash<string>()(l.sKey));
}
inline bool PrefixCache::DNEEquals::operator()(const DNEElement &l1, const DNEElement &l2) const
{
const string *s1, *s2;
s1 = l1.sKey.empty() ? &(*(l1.key)) : &(l1.sKey);
s2 = l2.sKey.empty() ? &(*(l2.key)) : &(l2.sKey);
return (*s1 == *s2);
}
inline bool PrefixCache::TBDLess::operator()(const LRU_t::iterator &i1, const LRU_t::iterator &i2) const
{
return *i1 < *i2;
}
}
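The intended calling sequence for the read path above, from a hypothetical caller's point of view (in SM proper, the IOCoordinator drives this while holding the logical-file lock):

#include "PrefixCache.h"
#include <string>
#include <vector>

// Sketch only: pc is the PrefixCache for one owned prefix.
void readSomeObjects(storagemanager::PrefixCache &pc,
                     const std::vector<std::string> &keys)
{
    pc.read(keys);          // pins the keys (DNE refcounts) and downloads any that are missing
    // ... read the object files under pc.getCachePath() / key ...
    pc.doneReading(keys);   // unpins them, after which _makeSpace() may evict again
}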

src/PrefixCache.h Normal file

@@ -0,0 +1,153 @@
#ifndef PREFIXCACHE_H_
#define PREFIXCACHE_H_
/* PrefixCache manages the cache for one prefix managed by SM.
Cache is a map of prefix -> PrefixCache, and holds the items
that should be centralized like the Downloader
*/
#include "Downloader.h"
#include "SMLogging.h"
#include "Replicator.h"
#include <string>
#include <vector>
#include <list>
#include <unordered_set>
#include <boost/utility.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/thread/mutex.hpp>
namespace storagemanager
{
class PrefixCache : public boost::noncopyable
{
public:
PrefixCache(const boost::filesystem::path &prefix);
virtual ~PrefixCache();
//reading fcns
// read() marks objects to be read s.t. they do not get flushed.
// after reading them, unlock the 'logical file', and call doneReading().
void read(const std::vector<std::string> &keys);
void doneReading(const std::vector<std::string> &keys);
bool exists(const std::string &key) const;
void exists(const std::vector<std::string> &keys, std::vector<bool> *out) const;
// writing fcns
// new*() fcns tell the PrefixCache data was added. After writing a set of objects,
// unlock the 'logical file', and call doneWriting().
void newObject(const std::string &key, size_t size);
void newJournalEntry(size_t size);
void doneWriting();
void deletedObject(const std::string &key, size_t size);
void deletedJournal(size_t size);
// an 'atomic' existence check & delete. Covers the object and journal. Does not delete the files.
// returns 0 if it didn't exist, 1 if the object exists, 2 if the journal exists, and 3 (1 | 2) if both exist
// This should be called while holding the file lock for key because it touches the journal file.
int ifExistsThenDelete(const std::string &key);
// rename is used when an old obj gets merged with its journal file
// the size will change in that process; sizediff is by how much
void rename(const std::string &oldKey, const std::string &newKey, ssize_t sizediff);
void setMaxCacheSize(size_t size);
void makeSpace(size_t size);
size_t getCurrentCacheSize() const;
size_t getCurrentCacheElementCount() const;
size_t getMaxCacheSize() const;
void shutdown();
// test helpers
const boost::filesystem::path &getCachePath();
const boost::filesystem::path &getJournalPath();
// this will delete everything in the PrefixCache and journal paths, and empty all PrefixCache structures.
void reset();
void validateCacheSize();
private:
PrefixCache();
boost::filesystem::path cachePrefix;
boost::filesystem::path journalPrefix;
boost::filesystem::path firstDir;
size_t maxCacheSize;
size_t objectSize;
size_t currentCacheSize;
Replicator *replicator;
SMLogging *logger;
Downloader *downloader;
void populate();
void _makeSpace(size_t size);
/* The main PrefixCache structures */
// lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings.
typedef std::list<std::string> LRU_t;
LRU_t lru;
struct M_LRU_element_t
{
M_LRU_element_t(const std::string &);
M_LRU_element_t(const std::string *);
M_LRU_element_t(const LRU_t::iterator &);
const std::string *key;
LRU_t::iterator lit;
};
struct KeyHasher
{
size_t operator()(const M_LRU_element_t &l) const;
};
struct KeyEquals
{
bool operator()(const M_LRU_element_t &l1, const M_LRU_element_t &l2) const;
};
typedef std::unordered_set<M_LRU_element_t, KeyHasher, KeyEquals> M_LRU_t;
M_LRU_t m_lru; // the LRU entries as a hash table
/* The do-not-evict list stuff. */
struct DNEElement
{
DNEElement(const LRU_t::iterator &);
DNEElement(const std::string &);
LRU_t::iterator key;
std::string sKey;
uint refCount;
};
struct DNEHasher
{
size_t operator()(const DNEElement &d) const;
};
struct DNEEquals
{
bool operator()(const DNEElement &d1, const DNEElement &d2) const;
};
typedef std::unordered_set<DNEElement, DNEHasher, DNEEquals> DNE_t;
DNE_t doNotEvict;
void addToDNE(const DNEElement &);
void removeFromDNE(const DNEElement &);
// the to-be-deleted set. Elements removed from the LRU but not yet deleted will be here.
// Elements are inserted and removed by makeSpace(). If read() references a file that is in this,
// it will remove it, signalling to makeSpace that it should not be deleted
struct TBDLess
{
bool operator()(const LRU_t::iterator &, const LRU_t::iterator &) const;
};
typedef std::set<LRU_t::iterator, TBDLess> TBD_t;
TBD_t toBeDeleted;
mutable boost::mutex lru_mutex; // protects the main PrefixCache structures & the do-not-evict set
};
}
#endif
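Cache itself is not part of this diff, but the call sites in this commit (cache->ifExistsThenDelete(firstDir, key), cache->newPrefix(p), and so on) imply it keeps one PrefixCache per owned prefix and forwards each call. A purely illustrative sketch of that routing; the class and method names below are hypothetical stand-ins, not the real Cache API:

#include <map>
#include <memory>
#include <string>
#include <boost/filesystem/path.hpp>

class PrefixCacheStub
{
  public:
    int ifExistsThenDelete(const std::string &key) { (void) key; return 0; }
};

class CacheRouter
{
  public:
    void newPrefix(const boost::filesystem::path &p)
    {
        caches.emplace(p, std::make_shared<PrefixCacheStub>());
    }
    void dropPrefix(const boost::filesystem::path &p)
    {
        caches.erase(p);
    }
    int ifExistsThenDelete(const boost::filesystem::path &firstDir, const std::string &key)
    {
        return caches.at(firstDir)->ifExistsThenDelete(key);  // route to that prefix's cache
    }
  private:
    std::map<boost::filesystem::path, std::shared_ptr<PrefixCacheStub> > caches;
};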


@@ -56,7 +56,6 @@ bool ReadTask::run()
// todo: do the reading and writing in chunks
// todo: need to make this use O_DIRECT on the IOC side
-ioc->willRead(cmd->filename, cmd->offset, cmd->count);
ssize_t err;
while ((uint) resp->returnCode < cmd->count)
{


@@ -121,6 +121,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
size_t count = 0;
int version = 1;
string journalFilename = msJournalPath + "/" + string(filename) + ".journal";
+boost::filesystem::path firstDir = *(boost::filesystem::path(filename).begin());
uint64_t thisEntryMaxOffset = (offset + length - 1);
bool exists = boost::filesystem::exists(journalFilename);
@@ -135,7 +136,7 @@ int Replicator::addJournalEntry(const char *filename, const uint8_t *data, off_t
assert((uint) err == header.length() + 1);
if (err <= 0)
return err;
-Cache::get()->newJournalEntry(header.length() + 1);
+Cache::get()->newJournalEntry(firstDir, header.length() + 1);
}
else
{
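The firstDir derivation above works because filenames reaching Replicator are already prefix-relative (they went through Ownership::get() in the IOCoordinator). For example:

#include <boost/filesystem/path.hpp>
#include <iostream>

int main()
{
    // hypothetical journal key "12345" under the owned prefix "data3"
    boost::filesystem::path p("data3/12345");
    std::cout << *(p.begin()) << std::endl;   // prints "data3"
    return 0;
}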


@@ -59,8 +59,8 @@ Synchronizer::Synchronizer() : maxUploads(0)
threadPool.reset(new ThreadPool());
threadPool->setMaxThreads(maxUploads);
die = false;
-uncommittedJournalSize = 0;
journalSizeThreshold = cache->getMaxCacheSize() / 2;
+blockNewJobs = false;
syncThread = boost::thread([this] () { this->periodicSync(); });
}
@@ -84,9 +84,28 @@ enum OpFlags
NEW_OBJECT = 0x4,
};
-void Synchronizer::_newJournalEntry(const string &key, size_t size)
+/* XXXPAT. Need to revisit this later. To make multiple prefix functionality as minimal as possible,
+I limited the changes to key string manipulation where possible. The keys it manages have the prefix they
+belong to prepended. So key 12345 in prefix p1 becomes p1/12345. This is not the most elegant or performant
+option, just the least invasive.
+*/
+void Synchronizer::newPrefix(const bf::path &p)
{
-uncommittedJournalSize += size;
+uncommittedJournalSize[p] = 0;
+}
+void Synchronizer::dropPrefix(const bf::path &p)
+{
+syncNow(p);
+boost::unique_lock<boost::mutex> s(mutex);
+uncommittedJournalSize.erase(p);
+}
+void Synchronizer::_newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
+{
+string key = (prefix/_key).string();
+uncommittedJournalSize[prefix] += size;
auto it = pendingOps.find(key);
if (it != pendingOps.end())
{
@@ -97,61 +116,64 @@ void Synchronizer::_newJournalEntry(const string &key, size_t size)
pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(JOURNAL));
}
-void Synchronizer::newJournalEntry(const string &key, size_t size)
+void Synchronizer::newJournalEntry(const bf::path &prefix, const string &_key, size_t size)
{
boost::unique_lock<boost::mutex> s(mutex);
-_newJournalEntry(key, size);
-if (uncommittedJournalSize > journalSizeThreshold)
+_newJournalEntry(prefix, _key, size);
+if (uncommittedJournalSize[prefix] > journalSizeThreshold)
{
-uncommittedJournalSize = 0;
+uncommittedJournalSize[prefix] = 0;
s.unlock();
forceFlush();
}
}
-void Synchronizer::newJournalEntries(const vector<pair<string, size_t> > &keys)
+void Synchronizer::newJournalEntries(const bf::path &prefix, const vector<pair<string, size_t> > &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
for (auto &keysize : keys)
-_newJournalEntry(keysize.first, keysize.second);
-if (uncommittedJournalSize > journalSizeThreshold)
+_newJournalEntry(prefix, keysize.first, keysize.second);
+if (uncommittedJournalSize[prefix] > journalSizeThreshold)
{
-uncommittedJournalSize = 0;
+uncommittedJournalSize[prefix] = 0;
s.unlock();
forceFlush();
}
}
-void Synchronizer::newObjects(const vector<string> &keys)
+void Synchronizer::newObjects(const bf::path &prefix, const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
-for (const string &key : keys)
+for (const string &_key : keys)
{
-assert(pendingOps.find(key) == pendingOps.end());
+bf::path key(prefix/_key);
+assert(pendingOps.find(key.string()) == pendingOps.end());
//makeJob(key);
-pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
+pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(NEW_OBJECT));
}
}
-void Synchronizer::deletedObjects(const vector<string> &keys)
+void Synchronizer::deletedObjects(const bf::path &prefix, const vector<string> &keys)
{
boost::unique_lock<boost::mutex> s(mutex);
-for (const string &key : keys)
+for (const string &_key : keys)
{
-auto it = pendingOps.find(key);
+bf::path key(prefix/_key);
+auto it = pendingOps.find(key.string());
if (it != pendingOps.end())
it->second->opFlags |= DELETE;
else
-pendingOps[key] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
+pendingOps[key.string()] = boost::shared_ptr<PendingOps>(new PendingOps(DELETE));
}
// would be good to signal to the things in opsInProgress that these were deleted. That would
// quiet down the logging somewhat. How to do that efficiently, and w/o gaps or deadlock...
}
-void Synchronizer::flushObject(const string &key)
+void Synchronizer::flushObject(const bf::path &prefix, const string &_key)
{
+string key = (prefix/_key).string();
boost::unique_lock<boost::mutex> s(mutex);
// if there is something to do on key, it should be either in pendingOps or opsInProgress
@@ -192,7 +214,7 @@ void Synchronizer::flushObject(const string &key)
bool keyExists, journalExists;
int err;
do {
-err = cs->exists(key.c_str(), &keyExists);
+err = cs->exists(_key.c_str(), &keyExists);
if (err)
{
char buf[80];
@@ -239,14 +261,37 @@ void Synchronizer::periodicSync()
//logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
}
lock.lock();
+if (blockNewJobs)
+continue;
//cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
// threadPool.currentQueueSize() << endl;
for (auto &job : pendingOps)
makeJob(job.first);
-uncommittedJournalSize = 0;
+for (auto it = uncommittedJournalSize.begin(); it != uncommittedJournalSize.end(); ++it)
+it->second = 0;
}
}
+void Synchronizer::syncNow(const bf::path &prefix)
+{
+boost::unique_lock<boost::mutex> lock(mutex);
+// this is pretty hacky. when time permits, implement something better.
+//
+// Issue all of the pendingOps for the given prefix
+// recreate the threadpool (dtor returns once all jobs have finished)
+// resume normal operation
+blockNewJobs = true;
+for (auto &job : pendingOps)
+if (job.first.find(prefix.string()) == 0)
+makeJob(job.first);
+uncommittedJournalSize[prefix] = 0;
+threadPool.reset(new ThreadPool());
+threadPool->setMaxThreads(maxUploads);
+blockNewJobs = false;
+}
void Synchronizer::forceFlush()
{
boost::unique_lock<boost::mutex> lock(mutex);
@@ -361,6 +406,9 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
ScopedReadLock s(ioc, sourceFile);
string &key = *it;
+size_t pos = key.find_first_of('/');
+bf::path prefix = key.substr(0, pos);
+string cloudKey = key.substr(pos + 1);
char buf[80];
bool exists = false;
int err;
@@ -374,7 +422,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
const metadataObject *mdEntry;
bool entryExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-if (!entryExists || key != mdEntry->key)
+if (!entryExists || cloudKey != mdEntry->key)
{
logger->log(LOG_DEBUG, "synchronize(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
return;
@@ -382,7 +430,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
//assert(key == mdEntry->key); <-- This could fail b/c of truncation + a write/append before this job runs.
-err = cs->exists(key, &exists);
+err = cs->exists(cloudKey, &exists);
if (err)
throw runtime_error(string("synchronize(): checking existence of ") + key + ", got " +
strerror_r(errno, buf, 80));
@@ -390,14 +438,14 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
return;
// TODO: should be safe to check with Cache instead of a file existence check
-exists = cache->exists(key);
+exists = cache->exists(prefix, cloudKey);
if (!exists)
{
logger->log(LOG_DEBUG, "synchronize(): was told to upload %s but it does not exist locally", key.c_str());
return;
}
-err = cs->putObject((cachePath / key).string(), key);
+err = cs->putObject((cachePath / key).string(), cloudKey);
if (err)
throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
replicator->remove((cachePath/key).string().c_str(), Replicator::NO_LOCAL);
@@ -406,7 +454,8 @@ int Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
void Synchronizer::synchronizeDelete(const string &sourceFile, list<string>::iterator &it)
{
ScopedWriteLock s(ioc, sourceFile);
-cs->deleteObject(*it);
+string cloudKey = it->substr(it->find('/') + 1);
+cs->deleteObject(cloudKey);
}
void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>::iterator &lit)
@@ -415,6 +464,10 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
char buf[80];
string key = *lit;
+size_t pos = key.find_first_of('/');
+bf::path prefix = key.substr(0, pos);
+string cloudKey = key.substr(pos + 1);
MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t());
if (!md.exists())
@@ -425,7 +478,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
const metadataObject *mdEntry;
bool metaExists = md.getEntry(MetadataFile::getOffsetFromKey(key), &mdEntry);
-if (!metaExists || key != mdEntry->key)
+if (!metaExists || cloudKey != mdEntry->key)
{
logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s does not exist in metadata for %s. This suggests truncation.", key.c_str(), sourceFile.c_str());
return;
@@ -442,12 +495,12 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
// sanity check + add'l info. Test whether the object exists in cloud storage. If so, complain,
// and run synchronize() instead.
bool existsOnCloud;
-int err = cs->exists(key, &existsOnCloud);
+int err = cs->exists(cloudKey, &existsOnCloud);
if (err)
throw runtime_error(string("Synchronizer: cs->exists() failed: ") + strerror_r(errno, buf, 80));
if (!existsOnCloud)
{
-if (cache->exists(key))
+if (cache->exists(prefix, cloudKey))
{
logger->log(LOG_DEBUG, "synchronizeWithJournal(): %s has no journal and does not exist in the cloud, calling "
"synchronize() instead. Need to explain how this happens.", key.c_str());
@@ -470,13 +523,13 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
boost::shared_array<uint8_t> data;
size_t count = 0, size = mdEntry->length;
-bool oldObjIsCached = cache->exists(key);
+bool oldObjIsCached = cache->exists(prefix, cloudKey);
// get the base object if it is not already cached
// merge it with its journal file
if (!oldObjIsCached)
{
-err = cs->getObject(key, &data, &size);
+err = cs->getObject(cloudKey, &data, &size);
if (err)
{
if (errno == ENOENT)
@@ -524,14 +577,15 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
}
// get a new key for the resolved version & upload it
-string newKey = MetadataFile::getNewKeyFromOldKey(key, size);
-err = cs->putObject(data, size, newKey);
+string newCloudKey = MetadataFile::getNewKeyFromOldKey(key, size);
+string newKey = (prefix/newCloudKey).string();
+err = cs->putObject(data, size, newCloudKey);
if (err)
{
// try to delete it in cloud storage... unlikely it is there in the first place, and if it is
// this probably won't work
int l_errno = errno;
-cs->deleteObject(newKey);
+cs->deleteObject(newCloudKey);
throw runtime_error(string("Synchronizer: putObject() failed: ") + strerror_r(l_errno, buf, 80));
}
@@ -562,22 +616,21 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
}
count += err;
}
-cache->rename(key, newKey, size - bf::file_size(oldCachePath));
+cache->rename(prefix, cloudKey, newCloudKey, size - bf::file_size(oldCachePath));
replicator->remove(oldCachePath);
}
// update the metadata for the source file
-md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size);
+md.updateEntry(MetadataFile::getOffsetFromKey(key), newCloudKey, size);
replicator->updateMetadata(sourceFile.c_str(), md);
rename(key, newKey);
-ioc->renameObject(key, newKey);
// delete the old object & journal file
-cache->deletedJournal(bf::file_size(journalName));
+cache->deletedJournal(prefix, bf::file_size(journalName));
replicator->remove(journalName);
-cs->deleteObject(key);
+cs->deleteObject(cloudKey);
}
void Synchronizer::rename(const string &oldKey, const string &newKey)
@@ -639,4 +692,12 @@ void Synchronizer::PendingOps::wait(boost::mutex *m)
}
}
+Synchronizer::Job::Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i)
+{ }
+void Synchronizer::Job::operator()()
+{
+sync->process(it);
+}
}
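Per the XXXPAT note above, every key Synchronizer now tracks is a composite "prefix/cloudKey" string; the composition in _newJournalEntry() and the split in synchronize()/synchronizeWithJournal() are inverses of each other:

#include <boost/filesystem/path.hpp>
#include <iostream>
#include <string>

int main()
{
    namespace bf = boost::filesystem;

    // composing, as in _newJournalEntry(): hypothetical key "12345" in prefix "p1"
    std::string key = (bf::path("p1") / "12345").string();   // "p1/12345"

    // splitting, as in synchronize() / synchronizeWithJournal():
    size_t pos = key.find_first_of('/');
    bf::path prefix = key.substr(0, pos);                    // "p1"
    std::string cloudKey = key.substr(pos + 1);              // "12345"

    std::cout << prefix << " " << cloudKey << std::endl;
    return 0;
}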


@@ -21,7 +21,6 @@ namespace storagemanager
class Cache;   // break circular dependency in header files
class IOCoordinator;

-/* TODO: Need to think about how errors are handled / propagated */
class Synchronizer : public boost::noncopyable
{
    public:
@@ -30,12 +29,16 @@ class Synchronizer : public boost::noncopyable
        // these take keys as parameters, not full path names, ex, pass in '12345' not
        // 'cache/12345'.
-       void newJournalEntry(const std::string &key, size_t len);
-       void newJournalEntries(const std::vector<std::pair<std::string, size_t> > &keys);
-       void newObjects(const std::vector<std::string> &keys);
-       void deletedObjects(const std::vector<std::string> &keys);
-       void flushObject(const std::string &key);
-       void forceFlush();
+       void newJournalEntry(const boost::filesystem::path &firstDir, const std::string &key, size_t len);
+       void newJournalEntries(const boost::filesystem::path &firstDir, const std::vector<std::pair<std::string, size_t> > &keys);
+       void newObjects(const boost::filesystem::path &firstDir, const std::vector<std::string> &keys);
+       void deletedObjects(const boost::filesystem::path &firstDir, const std::vector<std::string> &keys);
+       void flushObject(const boost::filesystem::path &firstDir, const std::string &key);
+       void forceFlush();   // ideally, make a version of this that takes a firstDir parameter
+       void newPrefix(const boost::filesystem::path &p);
+       void dropPrefix(const boost::filesystem::path &p);

        // for testing primarily
        boost::filesystem::path getJournalPath();
@@ -43,7 +46,7 @@ class Synchronizer : public boost::noncopyable
    private:
        Synchronizer();
-       void _newJournalEntry(const std::string &key, size_t len);
+       void _newJournalEntry(const boost::filesystem::path &firstDir, const std::string &key, size_t len);
        void process(std::list<std::string>::iterator key);
        void synchronize(const std::string &sourceFile, std::list<std::string>::iterator &it);
        void synchronizeDelete(const std::string &sourceFile, std::list<std::string>::iterator &it);
@@ -66,8 +69,8 @@ class Synchronizer : public boost::noncopyable
        struct Job : public ThreadPool::Job
        {
-           Job(Synchronizer *s, std::list<std::string>::iterator i) : sync(s), it(i) { }
-           void operator()() { sync->process(it); }
+           Job(Synchronizer *s, std::list<std::string>::iterator i);
+           void operator()();
            Synchronizer *sync;
            std::list<std::string>::iterator it;
        };
@@ -88,7 +91,11 @@ class Synchronizer : public boost::noncopyable
        boost::thread syncThread;
        const boost::chrono::seconds syncInterval = boost::chrono::seconds(10);
        void periodicSync();
-       size_t uncommittedJournalSize, journalSizeThreshold;
+       std::map<boost::filesystem::path, size_t> uncommittedJournalSize;
+       size_t journalSizeThreshold;
+       bool blockNewJobs;
+       void syncNow(const boost::filesystem::path &prefix);   // a synchronous version of forceFlush()

        SMLogging *logger;
        Cache *cache;
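The switch from a single uncommittedJournalSize counter to a std::map keyed by prefix is what lets flush decisions be made per prefix rather than globally. A sketch of that bookkeeping; the class, method names, and threshold value below are illustrative stand-ins, not SM's actual API:

    #include <boost/filesystem.hpp>
    #include <map>

    namespace bf = boost::filesystem;

    // Hypothetical stand-in for the Synchronizer's per-prefix accounting.
    class JournalAccounting
    {
    public:
        // Record len new journal bytes against firstDir; return true if that
        // prefix alone has crossed the flush threshold.
        bool add(const bf::path &firstDir, size_t len)
        {
            // operator[] default-initializes the counter to 0 on first use
            uncommittedJournalSize[firstDir] += len;
            return uncommittedJournalSize[firstDir] >= journalSizeThreshold;
        }

        // After flushing one prefix, only that prefix's counter resets;
        // other prefixes keep accumulating independently.
        void flushed(const bf::path &firstDir)
        {
            uncommittedJournalSize[firstDir] = 0;
        }

    private:
        std::map<bf::path, size_t> uncommittedJournalSize;
        size_t journalSizeThreshold = 100 << 20;   // illustrative: 100 MiB
    };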

View File

@@ -4,10 +4,19 @@
namespace storagemanager
{
-ScopedReadLock::ScopedReadLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+ScopedFileLock::ScopedFileLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+{
+}
+
+ScopedFileLock::~ScopedFileLock()
+{
+}
+
+ScopedReadLock::ScopedReadLock(IOCoordinator *i, const std::string &k) : ScopedFileLock(i, k)
{
    lock();
}

ScopedReadLock::~ScopedReadLock()
{
    unlock();
@@ -29,10 +38,11 @@ void ScopedReadLock::unlock()
    }
}

-ScopedWriteLock::ScopedWriteLock(IOCoordinator *i, const std::string &k) : ioc(i), locked(false), key(k)
+ScopedWriteLock::ScopedWriteLock(IOCoordinator *i, const std::string &k) : ScopedFileLock(i, k)
{
    lock();
}

ScopedWriteLock::~ScopedWriteLock()
{
    unlock();

View File

@@ -11,28 +11,34 @@ class IOCoordinator;
// a few utility classes we've coded here and there, now de-duped and centralized.
// modify as necessary.

-struct ScopedReadLock
+struct ScopedFileLock
{
-   ScopedReadLock(IOCoordinator *i, const std::string &k);
-   ~ScopedReadLock();
-   void lock();
-   void unlock();
+   ScopedFileLock(IOCoordinator *i, const std::string &k);
+   virtual ~ScopedFileLock();
+
+   virtual void lock() = 0;
+   virtual void unlock() = 0;

    IOCoordinator *ioc;
    bool locked;
    const std::string key;
};

-struct ScopedWriteLock
+struct ScopedReadLock : public ScopedFileLock
{
-   ScopedWriteLock(IOCoordinator *i, const std::string &k);
-   ~ScopedWriteLock();
+   ScopedReadLock(IOCoordinator *i, const std::string &k);
+   virtual ~ScopedReadLock();
    void lock();
    void unlock();
+};

-   IOCoordinator *ioc;
-   bool locked;
-   const std::string key;
+struct ScopedWriteLock : public ScopedFileLock
+{
+   ScopedWriteLock(IOCoordinator *i, const std::string &k);
+   virtual ~ScopedWriteLock();
+   void lock();
+   void unlock();
};

struct ScopedCloser
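With the common base in place, callers can pick the lock flavor at runtime and still rely on RAII release. A usage sketch, assuming the declarations above; the function, readOnly flag, and key value are illustrative, not code from this commit:

    #include <memory>
    #include <string>

    void withFileLock(IOCoordinator *ioc, const std::string &key, bool readOnly)
    {
        // Choose the lock type through the common base.
        std::unique_ptr<ScopedFileLock> flock;
        if (readOnly)
            flock.reset(new ScopedReadLock(ioc, key));
        else
            flock.reset(new ScopedWriteLock(ioc, key));

        // ... operate on the file while the lock is held ...

        // unique_ptr destroys through the virtual base destructor, so the
        // matching derived destructor (and its unlock()) runs whichever
        // type was constructed.
    }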

View File

@@ -68,11 +68,10 @@ int main(int argc, char** argv)
    SessionManager* sm = SessionManager::get();
    ret = sm->start();

    cache->shutdown();
    delete sync;
    delete cache;
    delete ioc;

View File

@@ -891,9 +891,9 @@ bool cacheTest1()
    // make sure nothing shows up in the cache path for files that don't exist
    v_bogus.push_back("does-not-exist");
-   cache->read(v_bogus);
+   cache->read("", v_bogus);
    assert(!bf::exists(cachePath / "does-not-exist"));
-   cache->exists(v_bogus, &exists);
+   cache->exists("", v_bogus, &exists);
    assert(exists.size() == 1);
    assert(!exists[0]);
@@ -901,21 +901,21 @@ bool cacheTest1()
    string realFile("storagemanager.cnf");
    bf::copy_file(realFile, storagePath / realFile, bf::copy_option::overwrite_if_exists);
    v_bogus[0] = realFile;
-   cache->read(v_bogus);
+   cache->read("", v_bogus);
    assert(bf::exists(cachePath / realFile));
    exists.clear();
-   cache->exists(v_bogus, &exists);
+   cache->exists("", v_bogus, &exists);
    assert(exists.size() == 1);
    assert(exists[0]);
    size_t currentSize = cache->getCurrentCacheSize();
    assert(currentSize == bf::file_size(cachePath / realFile));

    // lie about the file being deleted and then replaced
-   cache->deletedObject(realFile, currentSize);
+   cache->deletedObject("", realFile, currentSize);
    assert(cache->getCurrentCacheSize() == 0);
-   cache->newObject(realFile, currentSize);
+   cache->newObject("", realFile, currentSize);
    assert(cache->getCurrentCacheSize() == currentSize);
-   cache->exists(v_bogus, &exists);
+   cache->exists("", v_bogus, &exists);
    assert(exists.size() == 1);
    assert(exists[0]);
@@ -1025,13 +1025,13 @@ bool syncTest1()
    makeTestJournal((journalPath/journalName).string().c_str());
    makeTestMetadata((metaPath/"test-file.meta").string().c_str());
-   cache->newObject(key, bf::file_size(cachePath/key));
-   cache->newJournalEntry(bf::file_size(journalPath/journalName));
+   cache->newObject("", key, bf::file_size(cachePath/key));
+   cache->newJournalEntry("", bf::file_size(journalPath/journalName));
    vector<string> vObj;
    vObj.push_back(key);
-   sync->newObjects(vObj);
+   sync->newObjects("", vObj);
    sync->forceFlush();
    sleep(1);   // wait for the job to run
@@ -1041,12 +1041,12 @@ bool syncTest1()
    assert(!err);
    assert(exists);
-   sync->newJournalEntry(key, 0);
+   sync->newJournalEntry("", key, 0);
    sync->forceFlush();
    sleep(1);   // let it do what it does

    // check that the original objects no longer exist
-   assert(!cache->exists(key));
+   assert(!cache->exists("", key));
    assert(!bf::exists(journalPath/journalName));

    // Replicator doesn't implement all of its functionality yet, need to delete key from the cache manually for now
@@ -1063,24 +1063,24 @@ bool syncTest1()
        foundIt = (MetadataFile::getSourceFromKey(newKey) == "test-file");
        if (foundIt)
        {
-           assert(cache->exists(newKey));
+           assert(cache->exists("", newKey));
            cs->deleteObject(newKey);
            break;
        }
    }
    assert(foundIt);

-   cache->makeSpace(cache->getMaxCacheSize());   // clear the cache & make it call sync->flushObject()
+   cache->makeSpace("", cache->getMaxCacheSize());   // clear the cache & make it call sync->flushObject()

    // the key should now be back in cloud storage and deleted from the cache
-   assert(!cache->exists(newKey));
+   assert(!cache->exists("", newKey));
    err = cs->exists(newKey, &exists);
    assert(!err && exists);

    // make the journal again, call sync->newJournalObject()
    makeTestJournal((journalPath / (newKey + ".journal")).string().c_str());
-   cache->newJournalEntry(bf::file_size(journalPath / (newKey + ".journal")));
-   sync->newJournalEntry(newKey, 0);
+   cache->newJournalEntry("", bf::file_size(journalPath / (newKey + ".journal")));
+   sync->newJournalEntry("", newKey, 0);
    sync->forceFlush();
    sleep(1);
@@ -1102,7 +1102,7 @@ bool syncTest1()
    vector<string> keys;
    for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir)
        keys.push_back(dir->path().filename().string());
-   sync->deletedObjects(keys);
+   sync->deletedObjects("", keys);
    sync->forceFlush();
    sleep(1);
    ::unlink((metaPath/"test-file.meta").string().c_str());
@@ -1318,11 +1318,11 @@ void IOCUnlink()
    makeTestObject(cachedObjPath.string().c_str());
    makeTestJournal(cachedJournalPath.string().c_str());
-   cache->newObject(cachedObjPath.filename().string(), bf::file_size(cachedObjPath));
-   cache->newJournalEntry(bf::file_size(cachedJournalPath));
+   cache->newObject("", cachedObjPath.filename().string(), bf::file_size(cachedObjPath));
+   cache->newJournalEntry("", bf::file_size(cachedJournalPath));
    vector<string> keys;
    keys.push_back(cachedObjPath.filename().string());
-   sync->newObjects(keys);
+   sync->newObjects("", keys);
    //sync->newJournalEntry(keys[0]);   don't want to end up renaming it
    sync->forceFlush();
    sleep(1);
@@ -1385,7 +1385,7 @@ void IOCCopyFile1()
    makeTestMetadata(sourcePath.string().c_str());
    makeTestObject((csPath/testObjKey).string().c_str());
    makeTestJournal((journalPath/(string(testObjKey) + ".journal")).string().c_str());
-   cache->newJournalEntry(bf::file_size(journalPath/(string(testObjKey) + ".journal")));
+   cache->newJournalEntry("", bf::file_size(journalPath/(string(testObjKey) + ".journal")));

    int err = ioc->copyFile("copyfile1/source", "copyfile2/dest");
    assert(!err);
@@ -1445,8 +1445,8 @@ void IOCCopyFile3()
    makeTestMetadata(sourcePath.string().c_str());
    makeTestObject((cachePath/testObjKey).string().c_str());
    makeTestJournal((journalPath/(string(testObjKey) + ".journal")).string().c_str());
-   cache->newObject(testObjKey, bf::file_size(cachePath/testObjKey));
-   cache->newJournalEntry(bf::file_size(journalPath/(string(testObjKey) + ".journal")));
+   cache->newObject("", testObjKey, bf::file_size(cachePath/testObjKey));
+   cache->newJournalEntry("", bf::file_size(journalPath/(string(testObjKey) + ".journal")));

    int err = ioc->copyFile("copyfile3/source", "copyfile4/dest");
    assert(!err);

View File

@@ -1,11 +1,41 @@
[ObjectStorage]
service = LocalStorage

+# This is a tuneable value, but also implies a maximum capacity.
+# Each file managed by StorageManager is broken into chunks of
+# object_size bytes. Each chunk is stored in the cache as a file,
+# so the filesystem the cache is put on needs to have enough inodes to
+# support at least cache_size/object_size files.
+#
+# Regarding tuning, object stores do not support modifying stored data;
+# entire objects must be replaced on modification, and entire
+# objects are fetched on read. An access pattern that includes
+# frequently accessing small amounts of data will benefit from
+# a smaller object_size. An access pattern where data
+# is accessed in large chunks will benefit from a larger object_size.
+#
+# Another limitation to consider is the get/put rate imposed by the
+# cloud provider. If that is the limitation, increasing object_size
+# will result in higher transfer rates.
object_size = 5M
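As a worked example of the inode requirement described in the new comment block: a 500 GB cache with the default 5 MB object_size can hold up to 500,000 MB / 5 MB = 100,000 cached objects, so the cache filesystem needs at least 100,000 free inodes, plus headroom for journal and metadata files. The cache size used here is illustrative.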
metadata_path = ${HOME}/storagemanager/metadata
journal_path = ${HOME}/storagemanager/journal

max_concurrent_downloads = 20
max_concurrent_uploads = 20
+
+# This is the depth of the common prefix that all files managed by SM have.
+# Ex: /usr/local/mariadb/columnstore/data1 and
+# /usr/local/mariadb/columnstore/data2 differ at the 5th directory element,
+# so they have a common prefix depth of 4.
+#
+# This value is used to manage the ownership of prefixes between
+# StorageManager instances that share a filesystem.
+#
+# -1 is a special value indicating that there is no filesystem shared
+# between SM instances.
+common_prefix_depth = 4
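To make the depth counting concrete, here is a sketch of pulling the owned prefix out of a managed path. It assumes common_prefix_depth counts the directory elements after the root; the diff does not show how SM handles the root element, so treat that detail and the sample path as illustrative:

    #include <boost/filesystem.hpp>
    #include <cassert>

    namespace bf = boost::filesystem;

    int main()
    {
        bf::path p("/usr/local/mariadb/columnstore/data1/000.dir/000.dir");
        int depth = 4;                     // common_prefix_depth from the config

        auto it = p.begin();
        ++it;                              // skip the root element "/"
        for (int i = 0; i < depth; i++)
            ++it;                          // skip usr, local, mariadb, columnstore

        assert(it->string() == "data1");   // the prefix an SM instance would own
        return 0;
    }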
[S3]
region = us-east-2
bucket = s3-cs-test2