You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-513 fix a couple bugs in threadpool join() Add a test program
This commit is contained in:
@ -20,7 +20,7 @@
|
||||
*
|
||||
*
|
||||
***********************************************************************/
|
||||
|
||||
#define NOLOGGING
|
||||
#include <stdexcept>
|
||||
using namespace std;
|
||||
|
||||
@ -127,6 +127,7 @@ void ThreadPool::join(uint64_t thrHandle)
|
||||
bool foundit = false;
|
||||
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
|
||||
{
|
||||
foundit = false;
|
||||
if (iter->first == thrHandle)
|
||||
{
|
||||
foundit = true;
|
||||
@ -141,6 +142,42 @@ void ThreadPool::join(uint64_t thrHandle)
|
||||
}
|
||||
}
|
||||
|
||||
void ThreadPool::join(std::vector<uint64_t> thrHandle)
|
||||
{
|
||||
boost::mutex::scoped_lock lock1(fMutex);
|
||||
|
||||
while (waitingFunctorsSize > 0)
|
||||
{
|
||||
Container_T::iterator iter;
|
||||
Container_T::iterator end = fWaitingFunctors.end();
|
||||
bool foundit = false;
|
||||
for (iter = fWaitingFunctors.begin(); iter != end; ++iter)
|
||||
{
|
||||
foundit = false;
|
||||
std::vector<uint64_t>::iterator thrIter;
|
||||
std::vector<uint64_t>::iterator thrEnd = thrHandle.end();
|
||||
for (thrIter = thrHandle.begin(); thrIter != thrEnd; ++thrIter)
|
||||
{
|
||||
if (iter->first == *thrIter)
|
||||
{
|
||||
foundit = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (foundit == true)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
// If we didn't find any of the handles, then all are complete
|
||||
if (!foundit)
|
||||
{
|
||||
break;
|
||||
}
|
||||
fThreadAvailable.wait(lock1);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ThreadPool::invoke(const Functor_T &threadfunc)
|
||||
{
|
||||
boost::mutex::scoped_lock lock1(fMutex);
|
||||
@ -276,6 +313,7 @@ void ThreadPool::beginThread() throw()
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(5);
|
||||
args.add("beginThread: Caught exception: ");
|
||||
@ -287,7 +325,7 @@ void ThreadPool::beginThread() throw()
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage( message );
|
||||
|
||||
#endif
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
@ -302,6 +340,7 @@ void ThreadPool::beginThread() throw()
|
||||
// Log the exception and exit this thread
|
||||
try
|
||||
{
|
||||
#ifndef NOLOGGING
|
||||
logging::Message::Args args;
|
||||
logging::Message message(6);
|
||||
args.add("beginThread: Caught unknown exception!");
|
||||
@ -312,7 +351,7 @@ void ThreadPool::beginThread() throw()
|
||||
logging::MessageLog ml(lid);
|
||||
|
||||
ml.logErrorMessage( message );
|
||||
|
||||
#endif
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
|
@ -147,7 +147,11 @@ public:
|
||||
*/
|
||||
EXPORT void join(uint64_t thrHandle);
|
||||
|
||||
/** @brief for use in debugging
|
||||
/** @brief Wait for a specific thread
|
||||
*/
|
||||
EXPORT void join(std::vector<uint64_t> thrHandle);
|
||||
|
||||
/** @brief for use in debugging
|
||||
*/
|
||||
EXPORT void dump();
|
||||
|
||||
|
121
utils/threadpool/tp.cpp
Normal file
121
utils/threadpool/tp.cpp
Normal file
@ -0,0 +1,121 @@
|
||||
/* Copyright (C) 2014 InfiniDB, Inc.
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; version 2 of
|
||||
the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
|
||||
#include <string>
|
||||
#include <stdexcept>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
using namespace std;
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/scoped_ptr.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include "threadpool.h"
|
||||
|
||||
int64_t thecount = 0;
|
||||
boost::mutex mutex;
|
||||
|
||||
const string timeNow()
|
||||
{
|
||||
time_t outputTime = time(0);
|
||||
struct tm ltm;
|
||||
char buf[32]; //ctime(3) says at least 26
|
||||
size_t len = 0;
|
||||
#ifdef _MSC_VER
|
||||
asctime_s(buf, 32, localtime_r(&outputTime, <m));
|
||||
#else
|
||||
asctime_r(localtime_r(&outputTime, <m), buf);
|
||||
#endif
|
||||
len = strlen(buf);
|
||||
if (len > 0) --len;
|
||||
if (buf[len] == '\n') buf[len] = 0;
|
||||
return buf;
|
||||
}
|
||||
|
||||
// Functor class
|
||||
struct foo
|
||||
{
|
||||
int64_t fData;
|
||||
int64_t fThd;
|
||||
string start;
|
||||
|
||||
void operator ()()
|
||||
{
|
||||
start = timeNow();
|
||||
|
||||
std::cout << "foo thd = " << fThd << " start " << start << std::endl;
|
||||
for (int64_t i = 0; i < 1024*1024*fThd*128; i++)
|
||||
// simulate some work
|
||||
fData++;
|
||||
|
||||
boost::mutex::scoped_lock lock(mutex);
|
||||
std::cout << "foo thd = " << fThd << " start " << start << " fin " << timeNow() << std::endl;
|
||||
}
|
||||
|
||||
foo(int64_t i) : fThd(i), fData(i) {start=timeNow();}
|
||||
|
||||
foo(const foo& copy) : fData(copy.fData), fThd(copy.fThd), start(copy.start) {std::cout << "new foo" << endl;}
|
||||
|
||||
~foo() {}
|
||||
};
|
||||
|
||||
|
||||
|
||||
int main( int argc, char **argv)
|
||||
{
|
||||
threadpool::ThreadPool pool( 20, 10 );
|
||||
std::vector<uint64_t> hndl;
|
||||
hndl.reserve(10);
|
||||
int t1 = hndl.capacity();
|
||||
uint64_t testHndl;
|
||||
uint64_t thdhndl=999;
|
||||
for (int64_t y = 0; y < 20; y++)
|
||||
{
|
||||
foo bar(y);
|
||||
// for (int64_t i = 0; i < 10; ++i)
|
||||
{
|
||||
thdhndl = pool.invoke(bar);
|
||||
if (y<10)
|
||||
{
|
||||
hndl.push_back(thdhndl);
|
||||
}
|
||||
if (y == 0)
|
||||
{
|
||||
testHndl = thdhndl;
|
||||
}
|
||||
}
|
||||
|
||||
boost::mutex::scoped_lock lock(mutex);
|
||||
}
|
||||
// Wait until all of the queued up and in-progress work has finished
|
||||
std::cout << "Threads for join " << hndl.size() << std::endl;
|
||||
pool.dump();
|
||||
std::cout << "*** JOIN 1 ***" << std::endl;
|
||||
pool.join(testHndl);
|
||||
pool.dump();
|
||||
std::cout << "*** JOIN 10 ***" << std::endl;
|
||||
pool.join(hndl);
|
||||
pool.dump();
|
||||
std::cout << "*** WAIT ***" << std::endl;
|
||||
pool.wait();
|
||||
pool.dump();
|
||||
return 0;
|
||||
}
|
238
utils/threadpool/tp.vpj
Normal file
238
utils/threadpool/tp.vpj
Normal file
@ -0,0 +1,238 @@
|
||||
<!DOCTYPE Project SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpj.dtd">
|
||||
<Project
|
||||
Version="10.0"
|
||||
VendorName="SlickEdit"
|
||||
TemplateName="GNU C/C++"
|
||||
WorkingDir="."
|
||||
BuildSystem="vsbuild">
|
||||
<Config
|
||||
Name="Debug"
|
||||
Type="gnuc"
|
||||
DebugCallbackName="gdb"
|
||||
Version="1"
|
||||
OutputFile="%bdtp"
|
||||
CompilerConfigName="Latest Version">
|
||||
<Menu>
|
||||
<Target
|
||||
Name="Compile"
|
||||
MenuCaption="&Compile"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
OutputExts="*.o"
|
||||
SaveOption="SaveCurrent"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='g++ -c %xup %defd -g -o "%bd%n%oe" %i "%f"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Link"
|
||||
MenuCaption="&Link"
|
||||
ShowOnMenu="Never"
|
||||
Dialog="_gnuc_options_form Link"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveCurrent"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='g++ %xup -g -o "%o" %f %libs'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Build"
|
||||
MenuCaption="&Build"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t build'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Rebuild"
|
||||
MenuCaption="&Rebuild"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t rebuild'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Debug"
|
||||
MenuCaption="&Debug"
|
||||
Dialog="_gnuc_options_form Run/Debug"
|
||||
BuildFirst="1"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveNone"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='vsdebugio -prog "%o"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Execute"
|
||||
MenuCaption="E&xecute"
|
||||
Dialog="_gnuc_options_form Run/Debug"
|
||||
BuildFirst="1"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='"%o"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="dash"
|
||||
MenuCaption="-"
|
||||
Deletable="0">
|
||||
<Exec/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="GNU C Options"
|
||||
MenuCaption="GNU C &Options..."
|
||||
ShowOnMenu="HideIfNoCmdLine"
|
||||
Deletable="0"
|
||||
SaveOption="SaveNone">
|
||||
<Exec
|
||||
CmdLine="gnucoptions"
|
||||
Type="Slick-C"/>
|
||||
</Target>
|
||||
</Menu>
|
||||
<List Name="GNUC Options">
|
||||
<Item
|
||||
Name="LinkerOutputType"
|
||||
Value="Executable"/>
|
||||
</List>
|
||||
<Includes>
|
||||
<Include Dir="/usr/local/include"/>
|
||||
</Includes>
|
||||
<Libs PreObjects="0">
|
||||
<Lib File="/usr/lib/libboost_thread.so"/>
|
||||
</Libs>
|
||||
</Config>
|
||||
<Config
|
||||
Name="Release"
|
||||
Type="gnuc"
|
||||
DebugCallbackName="gdb"
|
||||
Version="1"
|
||||
OutputFile="%bdtp"
|
||||
CompilerConfigName="Latest Version">
|
||||
<Menu>
|
||||
<Target
|
||||
Name="Compile"
|
||||
MenuCaption="&Compile"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
OutputExts="*.o"
|
||||
SaveOption="SaveCurrent"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='g++ -c %xup %defd -o "%bd%n%oe" %i "%f"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Link"
|
||||
MenuCaption="&Link"
|
||||
ShowOnMenu="Never"
|
||||
Dialog="_gnuc_options_form Link"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveCurrent"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='g++ %xup -o "%o" %f %libs'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Build"
|
||||
MenuCaption="&Build"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t build'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Rebuild"
|
||||
MenuCaption="&Rebuild"
|
||||
Dialog="_gnuc_options_form Compile"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='"%(VSLICKBIN1)vsbuild" "%w" "%r" -t rebuild'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Debug"
|
||||
MenuCaption="&Debug"
|
||||
Dialog="_gnuc_options_form Run/Debug"
|
||||
BuildFirst="1"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveNone"
|
||||
RunFromDir="%rw"
|
||||
ClearProcessBuffer="1">
|
||||
<Exec CmdLine='vsdebugio -prog "%o"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="Execute"
|
||||
MenuCaption="E&xecute"
|
||||
Dialog="_gnuc_options_form Run/Debug"
|
||||
BuildFirst="1"
|
||||
CaptureOutputWith="ProcessBuffer"
|
||||
Deletable="0"
|
||||
SaveOption="SaveWorkspaceFiles"
|
||||
RunFromDir="%rw">
|
||||
<Exec CmdLine='"%o"'/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="dash"
|
||||
MenuCaption="-"
|
||||
Deletable="0">
|
||||
<Exec/>
|
||||
</Target>
|
||||
<Target
|
||||
Name="GNU C Options"
|
||||
MenuCaption="GNU C &Options..."
|
||||
ShowOnMenu="HideIfNoCmdLine"
|
||||
Deletable="0"
|
||||
SaveOption="SaveNone">
|
||||
<Exec
|
||||
CmdLine="gnucoptions"
|
||||
Type="Slick-C"/>
|
||||
</Target>
|
||||
</Menu>
|
||||
<List Name="GNUC Options">
|
||||
<Item
|
||||
Name="LinkerOutputType"
|
||||
Value="Executable"/>
|
||||
</List>
|
||||
<Includes>
|
||||
<Include Dir="/usr/local/include"/>
|
||||
</Includes>
|
||||
<Libs PreObjects="0">
|
||||
<Lib File="/usr/lib/libboost_thread.so"/>
|
||||
</Libs>
|
||||
</Config>
|
||||
<Files>
|
||||
<Folder
|
||||
Name="Source Files"
|
||||
Filters="*.c;*.C;*.cc;*.cpp;*.cp;*.cxx;*.c++;*.prg;*.pas;*.dpr;*.asm;*.s;*.bas;*.java;*.cs;*.sc;*.e;*.cob;*.html;*.rc;*.tcl;*.py;*.pl;*.d">
|
||||
<F N="threadpool.cpp"/>
|
||||
<F N="tp.cpp"/>
|
||||
</Folder>
|
||||
<Folder
|
||||
Name="Header Files"
|
||||
Filters="*.h;*.H;*.hh;*.hpp;*.hxx;*.inc;*.sh;*.cpy;*.if">
|
||||
<F N="threadpool.h"/>
|
||||
</Folder>
|
||||
<Folder
|
||||
Name="Resource Files"
|
||||
Filters="*.ico;*.cur;*.dlg"/>
|
||||
<Folder
|
||||
Name="Bitmaps"
|
||||
Filters="*.bmp"/>
|
||||
<Folder
|
||||
Name="Other Files"
|
||||
Filters=""/>
|
||||
</Files>
|
||||
</Project>
|
6
utils/threadpool/tp.vpw
Normal file
6
utils/threadpool/tp.vpw
Normal file
@ -0,0 +1,6 @@
|
||||
<!DOCTYPE Workspace SYSTEM "http://www.slickedit.com/dtd/vse/10.0/vpw.dtd">
|
||||
<Workspace Version="10.0" VendorName="SlickEdit">
|
||||
<Projects>
|
||||
<Project File="tp.vpj"/>
|
||||
</Projects>
|
||||
</Workspace>
|
Reference in New Issue
Block a user