You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-15 12:09:09 +03:00
Simple unit test for Cache::read(), and corresponding fixes.
This commit is contained in:
@@ -52,6 +52,7 @@ Cache::Cache()
|
||||
cout << "Cache got prefix " << prefix << endl;
|
||||
|
||||
downloader.setDownloadPath(prefix.string());
|
||||
/* todo: populate structures with existing files in the cache path */
|
||||
}
|
||||
|
||||
Cache::~Cache()
|
||||
@@ -84,23 +85,36 @@ void Cache::read(const vector<string> &keys)
|
||||
s.unlock();
|
||||
|
||||
// start downloading the keys to fetch
|
||||
int dl_err;
|
||||
vector<int> dl_errnos;
|
||||
if (!keysToFetch.empty())
|
||||
downloader.download(keysToFetch);
|
||||
dl_err = downloader.download(keysToFetch, &dl_errnos);
|
||||
|
||||
s.lock();
|
||||
|
||||
// move all keys to the back of the LRU
|
||||
for (const string &key : keys)
|
||||
for (i = 0; i < keys.size(); i++)
|
||||
{
|
||||
mit = m_lru.find(key);
|
||||
mit = m_lru.find(keys[i]);
|
||||
if (mit != m_lru.end())
|
||||
lru.splice(lru.end(), lru, mit->lit);
|
||||
else
|
||||
{
|
||||
lru.push_back(key);
|
||||
lru.splice(lru.end(), lru, mit->lit);
|
||||
removeFromDNE(lru.end());
|
||||
}
|
||||
else if (dl_errnos[i] == 0) // successful download
|
||||
{
|
||||
lru.push_back(keys[i]);
|
||||
m_lru.insert(M_LRU_element_t(&(lru.back()), lru.end()--));
|
||||
}
|
||||
removeFromDNE(lru.end());
|
||||
else
|
||||
{
|
||||
// Downloader already logged it, anything to do here?
|
||||
/* brainstorming options for handling it.
|
||||
1) Should it be handled? The caller will log a file-not-found error, and there will be a download
|
||||
failure in the log already.
|
||||
2) Can't really DO anything can it?
|
||||
*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,7 +146,10 @@ void Cache::removeFromDNE(const LRU_t::iterator &key)
|
||||
doNotEvict.erase(it);
|
||||
}
|
||||
|
||||
|
||||
const boost::filesystem::path & Cache::getCachePath()
|
||||
{
|
||||
return prefix;
|
||||
}
|
||||
|
||||
|
||||
void Cache::exists(const vector<string> &keys, vector<bool> *out)
|
||||
|
||||
@@ -28,11 +28,14 @@ class Cache : public boost::noncopyable
|
||||
void setCacheSize(size_t size);
|
||||
void makeSpace(size_t size);
|
||||
|
||||
// test helpers
|
||||
const boost::filesystem::path &getCachePath();
|
||||
private:
|
||||
boost::filesystem::path prefix;
|
||||
size_t maxCacheSize;
|
||||
|
||||
/* The main cache structures */
|
||||
// lru owns the string memory for the filenames it manages. m_lru and DNE point to those strings.
|
||||
typedef std::list<std::string> LRU_t;
|
||||
LRU_t lru;
|
||||
|
||||
|
||||
@@ -83,6 +83,7 @@ void Config::reloadThreadFcn()
|
||||
try
|
||||
{
|
||||
reload();
|
||||
// TODO: add a listener interface to inform upstream of config changes
|
||||
boost::this_thread::sleep(reloadInterval);
|
||||
}
|
||||
catch (boost::property_tree::ini_parser_error &e)
|
||||
@@ -133,8 +134,14 @@ string expand_numbers(const boost::smatch &match)
|
||||
string Config::getValue(const string §ion, const string &key) const
|
||||
{
|
||||
// if we care, move this envvar substition stuff to where the file is loaded
|
||||
string ret;
|
||||
boost::unique_lock<boost::mutex> s(mutex);
|
||||
string ret = contents.get<string>(section + "." + key);
|
||||
try {
|
||||
ret = contents.get<string>(section + "." + key);
|
||||
}
|
||||
catch (...) {
|
||||
return ""; // debating whether it's necessary to tell the caller there was no entry.
|
||||
}
|
||||
s.unlock();
|
||||
|
||||
boost::regex re("\\$\\{(.+)\\}");
|
||||
|
||||
@@ -7,20 +7,28 @@ using namespace std;
|
||||
namespace storagemanager
|
||||
{
|
||||
|
||||
Downloader::Downloader()
|
||||
Downloader::Downloader() : maxDownloads(0)
|
||||
{
|
||||
storage = CloudStorage::get();
|
||||
string sMaxDownloads = Config::get()->getValue("ObjectStorage", "max_concurrent_downloads");
|
||||
maxDownloads = stoi(sMaxDownloads);
|
||||
try
|
||||
{
|
||||
maxDownloads = stoul(sMaxDownloads);
|
||||
}
|
||||
catch(invalid_argument)
|
||||
{
|
||||
// log something
|
||||
}
|
||||
if (maxDownloads == 0)
|
||||
maxDownloads = 20;
|
||||
workers.reset(new ThreadPool(maxDownloads));
|
||||
}
|
||||
|
||||
Downloader::~Downloader()
|
||||
{
|
||||
}
|
||||
|
||||
void Downloader::download(const vector<const string *> &keys)
|
||||
int Downloader::download(const vector<const string *> &keys, vector<int> *errnos)
|
||||
{
|
||||
uint counter = keys.size();
|
||||
boost::condition condvar;
|
||||
@@ -33,8 +41,8 @@ void Downloader::download(const vector<const string *> &keys)
|
||||
|
||||
uint i;
|
||||
|
||||
for (const string *key : keys)
|
||||
dls[i].reset(new Download(key, this));
|
||||
for (i = 0; i < keys.size(); i++)
|
||||
dls[i].reset(new Download(keys[i], this));
|
||||
|
||||
boost::unique_lock<boost::mutex> s(download_mutex);
|
||||
for (i = 0; i < keys.size(); i++)
|
||||
@@ -47,11 +55,14 @@ void Downloader::download(const vector<const string *> &keys)
|
||||
if (inserted[i])
|
||||
{
|
||||
dl->listeners.push_back(&listener);
|
||||
downloaders->addJob(dl);
|
||||
workers->addJob(dl);
|
||||
}
|
||||
else
|
||||
{
|
||||
dl = *(ret.first); // point to the existing download. Redundant with the iterators array. Don't care yet.
|
||||
(*iterators[i])->listeners.push_back(&listener);
|
||||
}
|
||||
}
|
||||
s.unlock();
|
||||
|
||||
// wait for the downloads to finish
|
||||
@@ -66,7 +77,16 @@ void Downloader::download(const vector<const string *> &keys)
|
||||
if (inserted[i])
|
||||
downloads.erase(iterators[i]);
|
||||
|
||||
// TODO: check for errors & propagate
|
||||
// check for errors & propagate
|
||||
int ret = 0;
|
||||
errnos->resize(keys.size());
|
||||
for (i = 0; i < keys.size(); i++)
|
||||
{
|
||||
auto &dl = dls[i];
|
||||
(*errnos)[i] = dl->dl_errno;
|
||||
if (dl->dl_errno != 0)
|
||||
ret = -1;
|
||||
}
|
||||
}
|
||||
|
||||
void Downloader::setDownloadPath(const string &path)
|
||||
@@ -87,7 +107,7 @@ Downloader::Download::Download(const string *source, Downloader *dl) : key(sourc
|
||||
void Downloader::Download::operator()()
|
||||
{
|
||||
CloudStorage *storage = CloudStorage::get();
|
||||
int err = storage->getObject(*key, dler->getDownloadPath() + *key);
|
||||
int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key);
|
||||
if (err != 0)
|
||||
dl_errno = errno;
|
||||
|
||||
|
||||
@@ -21,7 +21,9 @@ class Downloader
|
||||
Downloader();
|
||||
virtual ~Downloader();
|
||||
|
||||
void download(const std::vector<const std::string *> &keys);
|
||||
// returns 0 on success. If != 0, errnos will contains the errno associated with the failure
|
||||
// caller owns the memory for the strings.
|
||||
int download(const std::vector<const std::string *> &keys, std::vector<int> *errnos);
|
||||
void setDownloadPath(const std::string &path);
|
||||
const std::string & getDownloadPath() const;
|
||||
|
||||
@@ -64,7 +66,7 @@ class Downloader
|
||||
Downloads_t downloads;
|
||||
boost::mutex download_mutex;
|
||||
|
||||
boost::scoped_ptr<ThreadPool> downloaders;
|
||||
boost::scoped_ptr<ThreadPool> workers;
|
||||
CloudStorage *storage;
|
||||
};
|
||||
|
||||
|
||||
@@ -34,6 +34,11 @@ LocalStorage::~LocalStorage()
|
||||
{
|
||||
}
|
||||
|
||||
const boost::filesystem::path & LocalStorage::getPrefix() const
|
||||
{
|
||||
return prefix;
|
||||
}
|
||||
|
||||
int LocalStorage::copy(const path &source, const path &dest)
|
||||
{
|
||||
boost::system::error_code err;
|
||||
@@ -49,7 +54,7 @@ int LocalStorage::copy(const path &source, const path &dest)
|
||||
path operator+(const path &p1, const path &p2)
|
||||
{
|
||||
path ret(p1);
|
||||
ret+=p2;
|
||||
ret /= p2;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,8 @@ class LocalStorage : public CloudStorage
|
||||
void deleteObject(const std::string &key);
|
||||
int copyObject(const std::string &sourceKey, const std::string &destKey);
|
||||
|
||||
const boost::filesystem::path & getPrefix() const;
|
||||
|
||||
private:
|
||||
boost::filesystem::path prefix;
|
||||
|
||||
|
||||
@@ -459,6 +459,47 @@ bool copytask()
|
||||
return true;
|
||||
}
|
||||
|
||||
bool localstorageTest1()
|
||||
{
|
||||
LocalStorage ls;
|
||||
|
||||
/* TODO: Some stuff */
|
||||
cout << "local storage test 1 OK" << endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool cacheTest1()
|
||||
{
|
||||
namespace bf = boost::filesystem;
|
||||
Cache cache;
|
||||
CloudStorage *cs = CloudStorage::get();
|
||||
LocalStorage *ls = dynamic_cast<LocalStorage *>(cs);
|
||||
if (ls == NULL) {
|
||||
cout << "Cache test 1 requires using local storage" << endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
bf::path storagePath = ls->getPrefix();
|
||||
bf::path cachePath = cache.getCachePath();
|
||||
vector<string> v_bogus;
|
||||
|
||||
// make sure nothing shows up in the cache path for files that don't exist
|
||||
v_bogus.push_back("does-not-exist");
|
||||
cache.read(v_bogus);
|
||||
assert(!bf::exists(cachePath / "does-not-exist"));
|
||||
|
||||
// make sure a file that does exist does show up in the cache path
|
||||
bf::copy_file("storagemanager.cnf", storagePath / "storagemanager.cnf", bf::copy_option::overwrite_if_exists);
|
||||
v_bogus[0] = "storagemanager.cnf";
|
||||
cache.read(v_bogus);
|
||||
assert(bf::exists(cachePath / "storagemanager.cnf"));
|
||||
|
||||
// cleanup
|
||||
bf::remove(cachePath / "storagemanager.cnf");
|
||||
cout << "cache test 1 OK" << endl;
|
||||
}
|
||||
|
||||
|
||||
int main()
|
||||
{
|
||||
cout << "connecting" << endl;
|
||||
@@ -475,12 +516,8 @@ int main()
|
||||
pingtask();
|
||||
copytask();
|
||||
|
||||
Config *conf = Config::get();
|
||||
LocalStorage ls;
|
||||
Cache cache;
|
||||
|
||||
|
||||
|
||||
localstorageTest1();
|
||||
cacheTest1();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user