diff --git a/src/Downloader.cpp b/src/Downloader.cpp index 908f90557..b935aa641 100644 --- a/src/Downloader.cpp +++ b/src/Downloader.cpp @@ -2,6 +2,7 @@ #include "Config.h" #include #include +#include using namespace std; namespace storagemanager @@ -28,9 +29,14 @@ Downloader::~Downloader() { } +inline boost::mutex & Downloader::getDownloadMutex() +{ + return download_mutex; +} + int Downloader::download(const vector &keys, vector *errnos) { - uint counter = keys.size(); + volatile uint counter = keys.size(); boost::condition condvar; boost::mutex m; DownloadListener listener(&counter, &condvar, &m); @@ -76,7 +82,8 @@ int Downloader::download(const vector &keys, vector *errnos for (i = 0; i < keys.size(); i++) if (inserted[i]) downloads.erase(iterators[i]); - + s.unlock(); + // check for errors & propagate int ret = 0; errnos->resize(keys.size()); @@ -110,12 +117,13 @@ void Downloader::Download::operator()() int err = storage->getObject(*key, dler->getDownloadPath() + "/" + *key); if (err != 0) dl_errno = errno; - + + boost::unique_lock s(dler->getDownloadMutex()); for (auto &listener : listeners) listener->downloadFinished(); } -Downloader::DownloadListener::DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) +Downloader::DownloadListener::DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m) : count(counter), cond(condvar), mutex(m) { } diff --git a/src/Downloader.h b/src/Downloader.h index 6a0fbb875..8ddb0f004 100644 --- a/src/Downloader.h +++ b/src/Downloader.h @@ -34,10 +34,10 @@ class Downloader class DownloadListener { public: - DownloadListener(uint *counter, boost::condition *condvar, boost::mutex *m); + DownloadListener(volatile uint *counter, boost::condition *condvar, boost::mutex *m); void downloadFinished(); private: - uint *count; + volatile uint *count; boost::condition *cond; boost::mutex *mutex; }; @@ -65,7 +65,7 @@ class Downloader typedef std::unordered_set, DLHasher, DLEquals> Downloads_t; Downloads_t downloads; boost::mutex download_mutex; - + boost::mutex &getDownloadMutex(); boost::scoped_ptr workers; CloudStorage *storage; }; diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 7b7f7278b..e28754126 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -1,5 +1,6 @@ #include "ThreadPool.h" +#include using namespace std; @@ -20,7 +21,7 @@ ThreadPool::ThreadPool(uint num_threads) : maxThreads(num_threads), die(false), ThreadPool::~ThreadPool() { - boost::unique_lock s(m); + boost::unique_lock s(mutex); die = true; jobs.clear(); jobAvailable.notify_all(); @@ -33,7 +34,7 @@ ThreadPool::~ThreadPool() void ThreadPool::addJob(const boost::shared_ptr &j) { - boost::unique_lock s(m); + boost::unique_lock s(mutex); jobs.push_back(j); // Start another thread if necessary if (threadsWaiting == 0 && threads.size() < maxThreads) { @@ -59,20 +60,25 @@ void ThreadPool::pruner_fcn() void ThreadPool::prune() { - boost::unique_lock s(m); - set::iterator it, to_remove; - it = s_threads.begin(); - while (it != s_threads.end()) + set::iterator it; + + boost::unique_lock s(mutex); + while (1) { - if ((*it)->joinable()) + while (pruneable.empty() && !die) + somethingToPrune.wait(s); + if (die) + return; + + for (auto &id : pruneable) { - (*it)->join(); - threads.remove_thread(*it); - to_remove = it++; - s_threads.erase(to_remove); + it = s_threads.find(id); + assert(it != s_threads.end()); + it->thrd->join(); + threads.remove_thread(it->thrd); + s_threads.erase(it); } - else - ++it; + pruneable.clear(); } } @@ -83,9 +89,22 @@ void ThreadPool::setMaxThreads(uint newMax) void ThreadPool::processingLoop() { - boost::unique_lock s(m, boost::defer_lock); + try + { + _processingLoop(); + } + catch (...) + {} + boost::unique_lock s(mutex); + pruneable.push_back(boost::this_thread::get_id()); + somethingToPrune.notify_one(); +} + +void ThreadPool::_processingLoop() +{ + boost::unique_lock s(mutex, boost::defer_lock); - while (!die) + while (1) { s.lock(); while (jobs.empty() && !die) @@ -106,4 +125,17 @@ void ThreadPool::processingLoop() } } +inline bool ThreadPool::id_compare::operator()(const ID_Thread &t1, const ID_Thread &t2) const +{ + return t1.id < t2.id; +} + +ThreadPool::ID_Thread::ID_Thread(boost::thread::id &i): id(i) +{ +} + +ThreadPool::ID_Thread::ID_Thread(boost::thread *t): id(t->get_id()), thrd(t) +{ +} + } diff --git a/src/ThreadPool.h b/src/ThreadPool.h index 8669e7c1f..e111f3585 100644 --- a/src/ThreadPool.h +++ b/src/ThreadPool.h @@ -28,25 +28,38 @@ class ThreadPool : public boost::noncopyable void setMaxThreads(uint newMax); private: - struct Runner { - Runner(ThreadPool *t) : tp(t) { } - void operator()() { tp->processingLoop(); } - ThreadPool *tp; - }; - void processingLoop(); // the fcn run by each thread + void _processingLoop(); // processingLoop() wraps _processingLoop() with thread management stuff. uint maxThreads; - bool die; + volatile bool die; int threadsWaiting; boost::thread_group threads; - std::set s_threads; + + // the set s_threads below is intended to make pruning idle threads efficient. + // there should be a cleaner way to do it. + struct ID_Thread + { + ID_Thread(boost::thread::id &); + ID_Thread(boost::thread *); + boost::thread::id id; + boost::thread *thrd; + }; + + struct id_compare + { + bool operator()(const ID_Thread &, const ID_Thread &) const; + }; + std::set s_threads; + boost::condition jobAvailable; std::deque > jobs; - boost::mutex m; + boost::mutex mutex; const boost::posix_time::time_duration idleThreadTimeout = boost::posix_time::seconds(60); boost::thread pruner; + boost::condition somethingToPrune; + std::vector pruneable; // when a thread is about to return it puts its id here void pruner_fcn(); void prune(); }; diff --git a/src/unit_tests.cpp b/src/unit_tests.cpp index 53e32e0b8..f0e42a8b2 100644 --- a/src/unit_tests.cpp +++ b/src/unit_tests.cpp @@ -496,6 +496,7 @@ bool cacheTest1() // cleanup bf::remove(cachePath / "storagemanager.cnf"); + bf::remove(storagePath / "storagemanager.cnf"); cout << "cache test 1 OK" << endl; }