From a3c582d9fe7a1991abac44502b54df42e0665554 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Mon, 27 Jun 2022 19:05:46 +0000 Subject: [PATCH] WIP with map clean-up --- primitives/primproc/primitiveserver.cpp | 2 ++ primitives/primproc/primitiveserver.h | 1 - utils/threadpool/fair_threadpool.cpp | 16 +++++++++++++++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 7d15c359e..e17ee8378 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1998,6 +1998,8 @@ struct ReadThread idbassert(bs->length() >= sizeof(ISMPacketHeader)); const ISMPacketHeader* ismHdr = reinterpret_cast(bs->buf()); + // uint64_t someVal = ismHdr->Command; + // std::cout << " PP read thread Command " << someVal << std::endl; /* This switch is for the OOB commands */ switch (ismHdr->Command) diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index eb50d0791..eb5195eb5 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -36,7 +36,6 @@ #include #include "threadpool.h" -#include "../../utils/threadpool/fair_threadpool.h" #include "fair_threadpool.h" #include "messagequeue.h" #include "blockrequestprocessor.h" diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index f35781854..094d6681e 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -112,14 +112,23 @@ void FairThreadPool::addJob_(const Job& job, bool useLock) void FairThreadPool::removeJobs(uint32_t id) { + // std::cout << "FairThreadPool::removeJobs id " << id << std::endl; std::unique_lock lk(mutex); for (auto& txnJobsMapPair : txn2JobsListMap_) { ThreadPoolJobsList* txnJobsList = txnJobsMapPair.second; + if (txnJobsList->empty()) + { + txn2JobsListMap_.erase(txnJobsMapPair.first); + delete txnJobsList; + continue; + // There is no clean-up for PQ. It will happen later in threadFcn + } auto job = txnJobsList->begin(); while (job != txnJobsList->end()) { + // std::cout << "removeJobs() job->id_ " << job->id_ << std::endl; if (job->id_ == id) { job = txnJobsList->erase(job); // update the job iter @@ -231,6 +240,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } catch (std::exception& ex) { + // std::cout << "FairThreadPool::threadFcn(): std::exception - no reschedule but send an error" << std::endl; if (running) { jobsRunning_.fetch_sub(1, std::memory_order_relaxed); @@ -254,14 +264,18 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue #endif if (running) + { sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + } } catch (...) { + std::cout << "FairThreadPool::threadFcn(): std::exception - double exception: failed to send an error" << std::endl; } } catch (...) { + // std::cout << "FairThreadPool::threadFcn(): ... exception - no reschedule but send an error" << std::endl; // Log the exception and exit this thread try { @@ -270,7 +284,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue jobsRunning_.fetch_sub(1, std::memory_order_relaxed); } threadCounts_.fetch_sub(1, std::memory_order_relaxed); - ; #ifndef NOLOGGING logging::Message::Args args; logging::Message message(6); @@ -289,6 +302,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } catch (...) { + std::cout << "FairThreadPool::threadFcn(): ... exception - double exception: failed to send an error" << std::endl; } } }