diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 7989a17d4..f56f1ef6c 100755 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1818,12 +1818,22 @@ struct ReadThread switch(ismHdr->Command) { case DICT_CREATE_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new CreateEqualityFilter(bs)); OOBPool->addJob(job); break; } case DICT_DESTROY_EQUALITY_FILTER: { PriorityThreadPool::Job job; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; job.functor = boost::shared_ptr(new DestroyEqualityFilter(bs)); OOBPool->addJob(job); break; @@ -1851,6 +1861,11 @@ struct ReadThread job.id = hdr->Hdr.UniqueID; job.weight = LOGICAL_BLOCK_RIDS; job.priority = hdr->Hdr.Priority; + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; if (hdr->flags & IS_SYSCAT) { //boost::thread t(DictScanJob(outIos, bs, writeLock)); // using already-existing threads may cut latency @@ -1889,6 +1904,12 @@ struct ReadThread job.id = bpps->getID(); job.weight = ismHdr->Size; job.priority = bpps->priority(); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; + if (bpps->isSysCat()) { //boost::thread t(*bpps); // using already-existing threads may cut latency @@ -1904,6 +1925,11 @@ struct ReadThread case BATCH_PRIMITIVE_CREATE: { PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Create(fBPPHandler, bs)); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->createBPP(*bs); break; @@ -1912,6 +1938,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::AddJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->addJoinerToBPP(*bs); break; @@ -1923,6 +1954,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::LastJoiner(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } @@ -1932,6 +1968,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Destroy(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); //fBPPHandler->destroyBPP(*bs); break; @@ -1946,6 +1987,11 @@ struct ReadThread PriorityThreadPool::Job job; job.functor = boost::shared_ptr(new BPPHandler::Abort(fBPPHandler, bs)); job.id = fBPPHandler->getUniqueID(bs, ismHdr->Command); + const uint8_t *buf = bs->buf(); + uint32_t pos = sizeof(ISMPacketHeader) - 2; + job.stepID = *((uint32_t *) &buf[pos+6]); + job.uniqueID = *((uint32_t *) &buf[pos+10]); + job.sock = outIos; OOBPool->addJob(job); break; } diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index a0fc6a347..4d19df91e 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -33,6 +33,8 @@ using namespace logging; #include "prioritythreadpool.h" using namespace boost; +#include "dbcon/joblist/primitivemsg.h" + namespace threadpool { @@ -48,9 +50,9 @@ PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads threads.create_thread(ThreadHelper(this, LOW)); cout << "started " << highThreads << " high, " << midThreads << " med, " << lowThreads << " low.\n"; - threadCounts[HIGH] = highThreads; - threadCounts[MEDIUM] = midThreads; - threadCounts[LOW] = lowThreads; + defaultThreadCounts[HIGH] = threadCounts[HIGH] = highThreads; + defaultThreadCounts[MEDIUM] = threadCounts[MEDIUM] = midThreads; + defaultThreadCounts[LOW] = threadCounts[LOW] = lowThreads; } PriorityThreadPool::~PriorityThreadPool() @@ -65,6 +67,23 @@ void PriorityThreadPool::addJob(const Job &job, bool useLock) if (useLock) lk.lock(); + // Create any missing threads + if (defaultThreadCounts[HIGH] != threadCounts[HIGH]) + { + threads.create_thread(ThreadHelper(this, HIGH)); + threadCounts[HIGH]++; + } + if (defaultThreadCounts[MEDIUM] != threadCounts[MEDIUM]) + { + threads.create_thread(ThreadHelper(this, MEDIUM)); + threadCounts[MEDIUM]++; + } + if (defaultThreadCounts[LOW] != threadCounts[LOW]) + { + threads.create_thread(ThreadHelper(this, LOW)); + threadCounts[LOW]++; + } + if (job.priority > 66) jobQueues[HIGH].push_back(job); else if (job.priority > 33) @@ -110,6 +129,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() vector reschedule; uint32_t rescheduleCount; uint32_t queueSize; + bool running = false; try { @@ -143,15 +163,12 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() reschedule.resize(runList.size()); rescheduleCount = 0; for (i = 0; i < runList.size() && !_stop; i++) { - try { reschedule[i] = false; + running = true; reschedule[i] = (*(runList[i].functor))(); + running = false; if (reschedule[i]) rescheduleCount++; - } - catch (std::exception &e) { - cerr << e.what() << endl; - } } // no real work was done, prevent intensive busy waiting @@ -177,6 +194,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() // Log the exception and exit this thread try { + threadCounts[queue]--; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(5); @@ -190,6 +208,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() ml.logErrorMessage( message ); #endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); } catch (...) { @@ -201,6 +221,7 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() // Log the exception and exit this thread try { + threadCounts[queue]--; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); @@ -213,6 +234,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() ml.logErrorMessage( message ); #endif + if (running) + sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); } catch (...) { @@ -220,6 +243,21 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() } } +void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) +{ + ISMPacketHeader ism; + PrimitiveHeader ph = {0}; + + ism.Status = logging::primitiveServerErr; + ph.UniqueID = id; + ph.StepID = step; + ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); + msg.append((uint8_t *) &ism, sizeof(ism)); + msg.append((uint8_t *) &ph, sizeof(ph)); + + sock->write(msg); +} + void PriorityThreadPool::stop() { _stop = true; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index 516c0df2f..649913d95 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -36,6 +36,7 @@ #include #include #include "../winport/winport.h" +#include "primitives/primproc/umsocketselector.h" namespace threadpool { @@ -60,6 +61,9 @@ public: uint32_t weight; uint32_t priority; uint32_t id; + uint32_t uniqueID; + uint32_t stepID; + primitiveprocessor::SP_UM_IOSOCK sock; }; enum Priority { @@ -105,9 +109,11 @@ private: Priority pickAQueue(Priority preference); void threadFcn(const Priority preferredQueue) throw(); + void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); std::list jobQueues[3]; // higher indexes = higher priority uint32_t threadCounts[3]; + uint32_t defaultThreadCounts[3]; boost::mutex mutex; boost::condition newJob; boost::thread_group threads;