diff --git a/utils/threadpool/threadpool.cpp b/utils/threadpool/threadpool.cpp index 14aa074f6..77a3c00e0 100644 --- a/utils/threadpool/threadpool.cpp +++ b/utils/threadpool/threadpool.cpp @@ -20,7 +20,7 @@ * * ***********************************************************************/ - +#define NOLOGGING #include using namespace std; @@ -127,6 +127,7 @@ void ThreadPool::join(uint64_t thrHandle) bool foundit = false; for (iter = fWaitingFunctors.begin(); iter != end; ++iter) { + foundit = false; if (iter->first == thrHandle) { foundit = true; @@ -141,6 +142,42 @@ void ThreadPool::join(uint64_t thrHandle) } } +void ThreadPool::join(std::vector thrHandle) +{ + boost::mutex::scoped_lock lock1(fMutex); + + while (waitingFunctorsSize > 0) + { + Container_T::iterator iter; + Container_T::iterator end = fWaitingFunctors.end(); + bool foundit = false; + for (iter = fWaitingFunctors.begin(); iter != end; ++iter) + { + foundit = false; + std::vector::iterator thrIter; + std::vector::iterator thrEnd = thrHandle.end(); + for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter) + { + if (iter->first == *thrIter) + { + foundit = true; + break; + } + } + if (foundit == true) + { + break; + } + } + // If we didn't find any of the handles, then all are complete + if (!foundit) + { + break; + } + fThreadAvailable.wait(lock1); + } +} + int64_t ThreadPool::invoke(const Functor_T &threadfunc) { boost::mutex::scoped_lock lock1(fMutex); @@ -276,6 +313,7 @@ void ThreadPool::beginThread() throw() // Log the exception and exit this thread try { +#ifndef NOLOGGING logging::Message::Args args; logging::Message message(5); args.add("beginThread: Caught exception: "); @@ -287,7 +325,7 @@ void ThreadPool::beginThread() throw() logging::MessageLog ml(lid); ml.logErrorMessage( message ); - +#endif } catch(...) { @@ -302,6 +340,7 @@ void ThreadPool::beginThread() throw() // Log the exception and exit this thread try { +#ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); args.add("beginThread: Caught unknown exception!"); @@ -312,7 +351,7 @@ void ThreadPool::beginThread() throw() logging::MessageLog ml(lid); ml.logErrorMessage( message ); - +#endif } catch(...) { diff --git a/utils/threadpool/threadpool.h b/utils/threadpool/threadpool.h index a21e19c0e..a14a6e4fb 100644 --- a/utils/threadpool/threadpool.h +++ b/utils/threadpool/threadpool.h @@ -147,7 +147,11 @@ public: */ EXPORT void join(uint64_t thrHandle); - /** @brief for use in debugging + /** @brief Wait for a specific thread + */ + EXPORT void join(std::vector thrHandle); + + /** @brief for use in debugging */ EXPORT void dump(); diff --git a/utils/threadpool/tp.cpp b/utils/threadpool/tp.cpp new file mode 100644 index 000000000..92eaeeae7 --- /dev/null +++ b/utils/threadpool/tp.cpp @@ -0,0 +1,121 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + + +#include +#include +#include +#include +using namespace std; +#include +#include +#include +#include +#include +#include +#include +#include "threadpool.h" + +int64_t thecount = 0; +boost::mutex mutex; + +const string timeNow() +{ + time_t outputTime = time(0); + struct tm ltm; + char buf[32]; //ctime(3) says at least 26 + size_t len = 0; +#ifdef _MSC_VER + asctime_s(buf, 32, localtime_r(&outputTime, <m)); +#else + asctime_r(localtime_r(&outputTime, <m), buf); +#endif + len = strlen(buf); + if (len > 0) --len; + if (buf[len] == '\n') buf[len] = 0; + return buf; +} + +// Functor class +struct foo +{ + int64_t fData; + int64_t fThd; + string start; + + void operator ()() + { + start = timeNow(); + + std::cout << "foo thd = " << fThd << " start " << start << std::endl; + for (int64_t i = 0; i < 1024*1024*fThd*128; i++) + // simulate some work + fData++; + + boost::mutex::scoped_lock lock(mutex); + std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl; + } + + foo(int64_t i) : fThd(i), fData(i) {start=timeNow();} + + foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start) {std::cout << "new foo" << endl;} + + ~foo() {} +}; + + + +int main( int argc, char **argv) +{ + threadpool::ThreadPool pool( 20, 10 ); + std::vector hndl; + hndl.reserve(10); + int t1 = hndl.capacity(); + uint64_t testHndl; + uint64_t thdhndl=999; + for (int64_t y = 0; y < 20; y++) + { + foo bar(y); +// for (int64_t i = 0; i < 10; ++i) + { + thdhndl = pool.invoke(bar); + if (y<10) + { + hndl.push_back(thdhndl); + } + if (y == 0) + { + testHndl = thdhndl; + } + } + + boost::mutex::scoped_lock lock(mutex); + } + // Wait until all of the queued up and in-progress work has finished + std::cout << "Threads for join " << hndl.size() << std::endl; + pool.dump(); + std::cout << "*** JOIN 1 ***" << std::endl; + pool.join(testHndl); + pool.dump(); + std::cout << "*** JOIN 10 ***" << std::endl; + pool.join(hndl); + pool.dump(); + std::cout << "*** WAIT ***" << std::endl; + pool.wait(); + pool.dump(); + return 0; +} diff --git a/utils/threadpool/tp.vpj b/utils/threadpool/tp.vpj new file mode 100644 index 000000000..bffc57e50 --- /dev/null +++ b/utils/threadpool/tp.vpj @@ -0,0 +1,238 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/utils/threadpool/tp.vpw b/utils/threadpool/tp.vpw new file mode 100644 index 000000000..c15af49c7 --- /dev/null +++ b/utils/threadpool/tp.vpw @@ -0,0 +1,6 @@ + + + + + +