1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-02 17:22:27 +03:00

MCOL-1474 Catch errors in PriorityThreadPool

PriorityThreadPool errors cause crashes in PrimProc. This patch catches
the errors and causes the thread to end cleanly.
This commit is contained in:
Andrew Hutchings
2018-06-14 14:43:37 +01:00
parent 4a20da68ac
commit 250d90a9bc

View File

@ -111,64 +111,113 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
uint32_t rescheduleCount;
uint32_t queueSize;
while (!_stop) {
try
{
while (!_stop) {
mutex::scoped_lock lk(mutex);
mutex::scoped_lock lk(mutex);
queue = pickAQueue(preferredQueue);
if (jobQueues[queue].empty()) {
newJob.wait(lk);
continue;
}
queue = pickAQueue(preferredQueue);
if (jobQueues[queue].empty()) {
newJob.wait(lk);
continue;
}
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
queueSize = jobQueues[queue].size();
weight = 0;
// 3 conditions stop this thread from grabbing all jobs in the queue
//
// 1: The weight limit has been exceeded
// 2: The queue is empty
// 3: It has grabbed more than half of the jobs available &
// should leave some to the other threads
while ((weight < weightPerRun) && (!jobQueues[queue].empty())
&& (runList.size() <= queueSize/2)) {
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
lk.unlock();
while ((weight < weightPerRun) && (!jobQueues[queue].empty())
&& (runList.size() <= queueSize/2)) {
runList.push_back(jobQueues[queue].front());
jobQueues[queue].pop_front();
weight += runList.back().weight;
}
lk.unlock();
reschedule.resize(runList.size());
rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++) {
try {
reschedule[i] = false;
reschedule[i] = (*(runList[i].functor))();
if (reschedule[i])
rescheduleCount++;
}
catch (std::exception &e) {
cerr << e.what() << endl;
}
}
reschedule.resize(runList.size());
rescheduleCount = 0;
for (i = 0; i < runList.size() && !_stop; i++) {
try {
reschedule[i] = false;
reschedule[i] = (*(runList[i].functor))();
if (reschedule[i])
rescheduleCount++;
}
catch (std::exception &e) {
cerr << e.what() << endl;
}
}
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
// no real work was done, prevent intensive busy waiting
if (rescheduleCount == runList.size())
usleep(1000);
if (rescheduleCount > 0) {
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
if (rescheduleCount > 0) {
lk.lock();
for (i = 0; i < runList.size(); i++)
if (reschedule[i])
addJob(runList[i], false);
if (rescheduleCount > 1)
newJob.notify_all();
else
newJob.notify_one();
lk.unlock();
}
runList.clear();
}
}
catch (std::exception &ex)
{
// Log the exception and exit this thread
try
{
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(5);
args.add("threadFcn: Caught exception: ");
args.add(ex.what());
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
catch (...)
{
}
}
catch (...)
{
// Log the exception and exit this thread
try
{
#ifndef NOLOGGING
logging::Message::Args args;
logging::Message message(6);
args.add("threadFcn: Caught unknown exception!");
message.format( args );
logging::LoggingID lid(22);
logging::MessageLog ml(lid);
ml.logErrorMessage( message );
#endif
}
catch (...)
{
}
}
}
void PriorityThreadPool::stop()