diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 461069e18..a0fc6a347 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -111,64 +111,113 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() uint32_t rescheduleCount; uint32_t queueSize; - while (!_stop) { + try + { + while (!_stop) { - mutex::scoped_lock lk(mutex); + mutex::scoped_lock lk(mutex); - queue = pickAQueue(preferredQueue); - if (jobQueues[queue].empty()) { - newJob.wait(lk); - continue; - } + queue = pickAQueue(preferredQueue); + if (jobQueues[queue].empty()) { + newJob.wait(lk); + continue; + } - queueSize = jobQueues[queue].size(); - weight = 0; - // 3 conditions stop this thread from grabbing all jobs in the queue - // - // 1: The weight limit has been exceeded - // 2: The queue is empty - // 3: It has grabbed more than half of the jobs available & - // should leave some to the other threads + queueSize = jobQueues[queue].size(); + weight = 0; + // 3 conditions stop this thread from grabbing all jobs in the queue + // + // 1: The weight limit has been exceeded + // 2: The queue is empty + // 3: It has grabbed more than half of the jobs available & + // should leave some to the other threads - while ((weight < weightPerRun) && (!jobQueues[queue].empty()) - && (runList.size() <= queueSize/2)) { - runList.push_back(jobQueues[queue].front()); - jobQueues[queue].pop_front(); - weight += runList.back().weight; - } - lk.unlock(); + while ((weight < weightPerRun) && (!jobQueues[queue].empty()) + && (runList.size() <= queueSize/2)) { + runList.push_back(jobQueues[queue].front()); + jobQueues[queue].pop_front(); + weight += runList.back().weight; + } + lk.unlock(); - reschedule.resize(runList.size()); - rescheduleCount = 0; - for (i = 0; i < runList.size() && !_stop; i++) { - try { - reschedule[i] = false; - reschedule[i] = (*(runList[i].functor))(); - if (reschedule[i]) - rescheduleCount++; - } - catch (std::exception &e) { - cerr << e.what() << endl; - } - } + reschedule.resize(runList.size()); + rescheduleCount = 0; + for (i = 0; i < runList.size() && !_stop; i++) { + try { + reschedule[i] = false; + reschedule[i] = (*(runList[i].functor))(); + if (reschedule[i]) + rescheduleCount++; + } + catch (std::exception &e) { + cerr << e.what() << endl; + } + } - // no real work was done, prevent intensive busy waiting - if (rescheduleCount == runList.size()) - usleep(1000); + // no real work was done, prevent intensive busy waiting + if (rescheduleCount == runList.size()) + usleep(1000); - if (rescheduleCount > 0) { - lk.lock(); - for (i = 0; i < runList.size(); i++) - if (reschedule[i]) - addJob(runList[i], false); - if (rescheduleCount > 1) - newJob.notify_all(); - else - newJob.notify_one(); - lk.unlock(); - } - runList.clear(); - } + if (rescheduleCount > 0) { + lk.lock(); + for (i = 0; i < runList.size(); i++) + if (reschedule[i]) + addJob(runList[i], false); + if (rescheduleCount > 1) + newJob.notify_all(); + else + newJob.notify_one(); + lk.unlock(); + } + runList.clear(); + } + } + catch (std::exception &ex) + { + // Log the exception and exit this thread + try + { +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(5); + args.add("threadFcn: Caught exception: "); + args.add(ex.what()); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + } + catch (...) + { + } + } + catch (...) + { + + // Log the exception and exit this thread + try + { +#ifndef NOLOGGING + logging::Message::Args args; + logging::Message message(6); + args.add("threadFcn: Caught unknown exception!"); + + message.format( args ); + + logging::LoggingID lid(22); + logging::MessageLog ml(lid); + + ml.logErrorMessage( message ); +#endif + } + catch (...) + { + } + } } void PriorityThreadPool::stop()