1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-17 01:02:23 +03:00

Fixed up some sync stuff.

This commit is contained in:
Patrick LeBlanc
2019-03-07 08:47:39 -06:00
parent c386881f40
commit 9bd6ddac1b
5 changed files with 85 additions and 31 deletions

View File

@@ -2,6 +2,7 @@
#include "Config.h" #include "Config.h"
#include <string> #include <string>
#include <errno.h> #include <errno.h>
#include <iostream>
using namespace std; using namespace std;
namespace storagemanager namespace storagemanager
@@ -28,9 +29,14 @@ Downloader::~Downloader()
{ {
} }
inline boost::mutex & Downloader::getDownloadMutex()
{
return download_mutex;
}
int Downloader::download(const vector<const string *> &keys, vector<int> *errnos) int Downloader::download(const vector<const string *> &keys, vector<int> *errnos)
{ {
uint counter = keys.size(); volatile uint counter = keys.size();
boost::condition condvar; boost::condition condvar;
boost::mutex m; boost::mutex m;
DownloadListener listener(&counter, &condvar, &m); DownloadListener listener(&counter, &condvar, &m);
@@ -76,7 +82,8 @@ int Downloader::download(const vector<const string *> &keys, vector<int> *errnos
for (i = 0; i < keys.size(); i++) for (i = 0; i < keys.size(); i++)
if (inserted[i]) if (inserted[i])
downloads.erase(iterators[i]); downloads.erase(iterators[i]);
s.unlock();
// check for errors & propagate // check for errors & propagate
int ret = 0; int ret = 0;
errnos->resize(keys.size()); errnos->resize(keys.size());
@@ -110,12 +117,13 @@ void Downloader::Download::operator()()
int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key); int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key);
if (err != 0) if (err != 0)
dl_errno = errno; dl_errno = errno;
boost::unique_lock<boost::mutex> s(dler->getDownloadMutex());
for (auto &listener : listeners) for (auto &listener : listeners)
listener->downloadFinished(); listener->downloadFinished();
} }
Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) Downloader::DownloadListener::DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m)
{ {
} }

View File

@@ -34,10 +34,10 @@ class Downloader
class DownloadListener class DownloadListener
{ {
public: public:
DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m); DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m);
void downloadFinished(); void downloadFinished();
private: private:
uint *count; volatile uint *count;
boost::condition *cond; boost::condition *cond;
boost::mutex *mutex; boost::mutex *mutex;
}; };
@@ -65,7 +65,7 @@ class Downloader
typedef std::unordered_set<boost::shared_ptr<Download>, DLHasher, DLEquals> Downloads_t; typedef std::unordered_set<boost::shared_ptr<Download>, DLHasher, DLEquals> Downloads_t;
Downloads_t downloads; Downloads_t downloads;
boost::mutex download_mutex; boost::mutex download_mutex;
boost::mutex &getDownloadMutex();
boost::scoped_ptr<ThreadPool> workers; boost::scoped_ptr<ThreadPool> workers;
CloudStorage *storage; CloudStorage *storage;
}; };

View File

@@ -1,5 +1,6 @@
#include "ThreadPool.h" #include "ThreadPool.h"
#include <iostream>
using namespace std; using namespace std;
@@ -20,7 +21,7 @@ ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false),
ThreadPool::~ThreadPool() ThreadPool::~ThreadPool()
{ {
boost::unique_lock<boost::mutex> s(m); boost::unique_lock<boost::mutex> s(mutex);
die = true; die = true;
jobs.clear(); jobs.clear();
jobAvailable.notify_all(); jobAvailable.notify_all();
@@ -33,7 +34,7 @@ ThreadPool::~ThreadPool()
void ThreadPool::addJob(const boost::shared_ptr<Job> &j) void ThreadPool::addJob(const boost::shared_ptr<Job> &j)
{ {
boost::unique_lock<boost::mutex> s(m); boost::unique_lock<boost::mutex> s(mutex);
jobs.push_back(j); jobs.push_back(j);
// Start another thread if necessary // Start another thread if necessary
if (threadsWaiting == 0 && threads.size() < maxThreads) { if (threadsWaiting == 0 && threads.size() < maxThreads) {
@@ -59,20 +60,25 @@ void ThreadPool::pruner_fcn()
void ThreadPool::prune() void ThreadPool::prune()
{ {
boost::unique_lock<boost::mutex> s(m); set<ID_Thread>::iterator it;
set<boost::thread *>::iterator it, to_remove;
it = s_threads.begin(); boost::unique_lock<boost::mutex> s(mutex);
while (it != s_threads.end()) while (1)
{ {
if ((*it)->joinable()) while (pruneable.empty() && !die)
somethingToPrune.wait(s);
if (die)
return;
for (auto &id : pruneable)
{ {
(*it)->join(); it = s_threads.find(id);
threads.remove_thread(*it); assert(it != s_threads.end());
to_remove = it++; it->thrd->join();
s_threads.erase(to_remove); threads.remove_thread(it->thrd);
s_threads.erase(it);
} }
else pruneable.clear();
++it;
} }
} }
@@ -83,9 +89,22 @@ void ThreadPool::setMaxThreads(uint newMax)
void ThreadPool::processingLoop() void ThreadPool::processingLoop()
{ {
boost::unique_lock<boost::mutex> s(m, boost::defer_lock); try
{
_processingLoop();
}
catch (...)
{}
boost::unique_lock<boost::mutex> s(mutex);
pruneable.push_back(boost::this_thread::get_id());
somethingToPrune.notify_one();
}
void ThreadPool::_processingLoop()
{
boost::unique_lock<boost::mutex> s(mutex, boost::defer_lock);
while (!die) while (1)
{ {
s.lock(); s.lock();
while (jobs.empty() && !die) while (jobs.empty() && !die)
@@ -106,4 +125,17 @@ void ThreadPool::processingLoop()
} }
} }
inline bool ThreadPool::id_compare::operator()(const ID_Thread &t1, const ID_Thread &t2) const
{
return t1.id < t2.id;
}
ThreadPool::ID_Thread::ID_Thread(boost::thread::id &i): id(i)
{
}
ThreadPool::ID_Thread::ID_Thread(boost::thread *t): id(t->get_id()), thrd(t)
{
}
} }

View File

@@ -28,25 +28,38 @@ class ThreadPool : public boost::noncopyable
void setMaxThreads(uint newMax); void setMaxThreads(uint newMax);
private: private:
struct Runner {
Runner(ThreadPool *t) : tp(t) { }
void operator()() { tp->processingLoop(); }
ThreadPool *tp;
};
void processingLoop(); // the fcn run by each thread void processingLoop(); // the fcn run by each thread
void _processingLoop(); // processingLoop() wraps _processingLoop() with thread management stuff.
uint maxThreads; uint maxThreads;
bool die; volatile bool die;
int threadsWaiting; int threadsWaiting;
boost::thread_group threads; boost::thread_group threads;
std::set<boost::thread *> s_threads;
// the set s_threads below is intended to make pruning idle threads efficient.
// there should be a cleaner way to do it.
struct ID_Thread
{
ID_Thread(boost::thread::id &);
ID_Thread(boost::thread *);
boost::thread::id id;
boost::thread *thrd;
};
struct id_compare
{
bool operator()(const ID_Thread &, const ID_Thread &) const;
};
std::set<ID_Thread, id_compare> s_threads;
boost::condition jobAvailable; boost::condition jobAvailable;
std::deque<boost::shared_ptr<Job> > jobs; std::deque<boost::shared_ptr<Job> > jobs;
boost::mutex m; boost::mutex mutex;
const boost::posix_time::time_duration idleThreadTimeout = boost::posix_time::seconds(60); const boost::posix_time::time_duration idleThreadTimeout = boost::posix_time::seconds(60);
boost::thread pruner; boost::thread pruner;
boost::condition somethingToPrune;
std::vector<boost::thread::id> pruneable; // when a thread is about to return it puts its id here
void pruner_fcn(); void pruner_fcn();
void prune(); void prune();
}; };

View File

@@ -496,6 +496,7 @@ bool cacheTest1()
// cleanup // cleanup
bf::remove(cachePath / "storagemanager.cnf"); bf::remove(cachePath / "storagemanager.cnf");
bf::remove(storagePath / "storagemanager.cnf");
cout << "cache test 1 OK" << endl; cout << "cache test 1 OK" << endl;
} }