From 4a2c73780a71b0ac9cd8dbf8c95157861283d3f1 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Wed, 17 Jan 2024 19:18:23 +0200 Subject: [PATCH] fix(messageqcpp): MCOL-5636 same node communication crashes transmiting PP errors to EM b/c error messaging leveraged socket that was a nullptr. (#3106) --- .../primproc/batchprimitiveprocessor.cpp | 67 +-------- primitives/primproc/bppseeder.cpp | 19 +-- primitives/primproc/bppseeder.h | 1 - primitives/primproc/bppsendthread.cpp | 6 +- primitives/primproc/primitiveserver.cpp | 20 +-- utils/messageqcpp/CMakeLists.txt | 1 + utils/messageqcpp/iosocket.h | 31 ++--- utils/messageqcpp/samenodepseudosocket.cpp | 127 ++++++++++++++++++ utils/messageqcpp/samenodepseudosocket.h | 99 ++++++++++++++ utils/threadpool/fair_threadpool.cpp | 21 +-- utils/threadpool/prioritythreadpool.cpp | 48 ++++--- utils/threadpool/prioritythreadpool.h | 15 ++- 12 files changed, 300 insertions(+), 155 deletions(-) create mode 100644 utils/messageqcpp/samenodepseudosocket.cpp create mode 100644 utils/messageqcpp/samenodepseudosocket.h diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 8151c8926..e42b6a198 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -1921,64 +1921,6 @@ void BatchPrimitiveProcessor::execute() } catch (NeedToRestartJob& n) { -#if 0 - - /* This block of code will flush the problematic OIDs from the - * cache. It seems to have no effect on the problem, so it's commented - * for now. - * - * This is currently thrown only on syscat queries. If we find the problem - * in user tables also, we should avoid dropping entire OIDs if possible. - * - * In local testing there was no need for flushing, because DDL flushes - * the syscat constantly. However, it can take a long time (>10 s) before - * that happens. Doing it locally should make it much more likely only - * one restart is necessary. - */ - - try - { - vector oids; - uint32_t oid; - - for (uint32_t i = 0; i < filterCount; i++) - { - oid = filterSteps[i]->getOID(); - - if (oid > 0) - oids.push_back(oid); - } - - for (uint32_t i = 0; i < projectCount; i++) - { - oid = projectSteps[i]->getOID(); - - if (oid > 0) - oids.push_back(oid); - } - -#if 0 - Logger logger; - ostringstream os; - os << "dropping OIDs: "; - - for (int i = 0; i < oids.size(); i++) - os << oids[i] << " "; - - logger.logMessage(os.str()); -#endif - - for (int i = 0; i < fCacheCount; i++) - { - dbbc::blockCacheClient bc(*BRPp[i]); -// bc.flushCache(); - bc.flushOIDs(&oids[0], oids.size()); - } - } - catch (...) { } // doesn't matter if this fails, just avoid crashing - -#endif - #ifndef __FreeBSD__ pthread_mutex_unlock(&objLock); #endif @@ -2109,21 +2051,20 @@ void BatchPrimitiveProcessor::serializeStrings() void BatchPrimitiveProcessor::sendResponse() { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); // Here is the fast path for local EM to PM interaction. PM puts into the // input EM DEC queue directly. - // !sock has a 'same host connection' semantics here. - if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock))) + // !writelock has a 'same host connection' semantics here. + if (initiatedByEM_ && !writelock) { // Flow Control now handles same node connections so the recieving DEC queue // is limited. if (sendThread->flowControlEnabled()) { - sendThread->sendResult({serialized, nullptr, nullptr, 0}, false); + sendThread->sendResult({serialized, sock, writelock, 0}, false); } else { - exeMgrDecPtr->addDataToOutput(serialized); + sock->write(serialized); serialized.reset(); } diff --git a/primitives/primproc/bppseeder.cpp b/primitives/primproc/bppseeder.cpp index de1798c50..22d866b59 100644 --- a/primitives/primproc/bppseeder.cpp +++ b/primitives/primproc/bppseeder.cpp @@ -153,7 +153,7 @@ int BPPSeeder::operator()() if (0 < status) { - sendErrorMsg(uniqueID, status, stepID); + error_handling::sendErrorMsg(status, uniqueID, stepID, sock); return ret; } @@ -335,23 +335,8 @@ void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step) { Logger log; log.logMessage(ex); - sendErrorMsg(id, logging::bppSeederErr, step); -} -void BPPSeeder::sendErrorMsg(uint32_t id, uint16_t status, uint32_t step) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = status; - ph.UniqueID = id; - ph.StepID = step; - ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - boost::mutex::scoped_lock lk(*writelock); - sock->write(msg); + error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock); } bool BPPSeeder::isSysCat() diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index f4c35b2c8..c8896e8bc 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -76,7 +76,6 @@ class BPPSeeder : public threadpool::FairThreadPool::Functor private: BPPSeeder(); void catchHandler(const std::string& s, uint32_t uniqueID, uint32_t step); - void sendErrorMsg(uint32_t id, uint16_t status, uint32_t step); void flushSyscatOIDs(); messageqcpp::SBS bs; diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index d444e48b5..831754e79 100644 --- a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -234,11 +234,9 @@ void BPPSendThread::mainLoop() bsSize = msg[msgsSent].msg->lengthWithHdrOverhead(); // Same node processing path - if (!sock) + if (!lock) { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); - assert(exeMgrDecPtr); - exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg); + msg[msgsSent].sock->write(msg[msgsSent].msg); } else { diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 60ce2145d..60212baae 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -59,6 +59,7 @@ using namespace BRM; #include "writeengine.h" #include "messagequeue.h" +#include "samenodepseudosocket.h" using namespace messageqcpp; #include "blockrequestprocessor.h" @@ -106,8 +107,6 @@ using namespace threadpool; #define O_NOATIME 0 #endif -typedef tr1::unordered_set USOID; - // make global for blockcache // static const char* statsName = {"pm"}; @@ -1021,7 +1020,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); virtual ~DictScanJob(); - void write(const SBS&); + void write(const SBS); int operator()(); void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr); void sendErrorMsg(uint32_t id, uint16_t code); @@ -1043,15 +1042,14 @@ DictScanJob::~DictScanJob() { } -void DictScanJob::write(const SBS& sbs) +void DictScanJob::write(const SBS sbs) { // Here is the fast path for local EM to PM interaction. PM puts into the // input EM DEC queue directly. - // !sock has a 'same host connection' semantics here. - if (!fIos) + // !fWriteLock has a 'same host connection' semantics here. + if (!fWriteLock) { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); - exeMgrDecPtr->addDataToOutput(sbs); + fIos->write(sbs); return; } boost::mutex::scoped_lock lk(*fWriteLock); @@ -2336,8 +2334,10 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace sleep(1); exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr; } - // These empty SPs have "same-host" messaging semantics. - SP_UM_IOSOCK outIos(nullptr); + // This is a pseudo socket that puts data into DEC queue directly. + // It can be used for PP to EM communication only. + SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr))); + // This empty SP transmits "same-host" messaging semantics. SP_UM_MUTEX writeLock(nullptr); auto procPool = this->getProcessorThreadPool(); auto OOBProcPool = this->getOOBProcessorThreadPool(); diff --git a/utils/messageqcpp/CMakeLists.txt b/utils/messageqcpp/CMakeLists.txt index fcc2577de..dbb5ed70a 100644 --- a/utils/messageqcpp/CMakeLists.txt +++ b/utils/messageqcpp/CMakeLists.txt @@ -9,6 +9,7 @@ set(messageqcpp_LIB_SRCS bytestream.cpp socketparms.cpp inetstreamsocket.cpp + samenodepseudosocket.cpp iosocket.cpp compressed_iss.cpp bytestreampool.cpp diff --git a/utils/messageqcpp/iosocket.h b/utils/messageqcpp/iosocket.h index d1cc3ef49..4dd742a23 100644 --- a/utils/messageqcpp/iosocket.h +++ b/utils/messageqcpp/iosocket.h @@ -39,8 +39,6 @@ class MessageQTestSuite; -#define EXPORT - namespace messageqcpp { class ServerSocket; @@ -54,22 +52,22 @@ class IOSocket /** ctor * */ - EXPORT explicit IOSocket(Socket* socket = 0); + explicit IOSocket(Socket* socket = 0); /** copy ctor * */ - EXPORT IOSocket(const IOSocket& rhs); + IOSocket(const IOSocket& rhs); /** assign op * */ - EXPORT IOSocket& operator=(const IOSocket& rhs); + IOSocket& operator=(const IOSocket& rhs); /** dtor * */ - EXPORT virtual ~IOSocket(); + virtual ~IOSocket(); /** read a ByteStream from this socket * @@ -84,9 +82,9 @@ class IOSocket * This socket needs to be connected first. Will throw runtime_error on I/O error. Caller should * call close() method if exception is thrown. */ - EXPORT virtual void write(const ByteStream& msg, Stats* stats = NULL) const; - EXPORT virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; - EXPORT virtual void write(SBS msg, Stats* stats = NULL) const; + virtual void write(const ByteStream& msg, Stats* stats = NULL) const; + virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; + virtual void write(SBS msg, Stats* stats = NULL) const; /** access the sockaddr member */ @@ -125,29 +123,29 @@ class IOSocket * * Install a socket implementation that meets the Socket interface */ - EXPORT virtual void setSocketImpl(Socket* socket); + virtual void setSocketImpl(Socket* socket); /** get a string rep of the IOSocket * */ - EXPORT virtual const std::string toString() const; + virtual const std::string toString() const; /** syncProto() forwarder for inherited classes * */ - EXPORT virtual void syncProto(bool use) + virtual void syncProto(bool use) { fSocket->syncProto(use); } - EXPORT virtual int getConnectionNum() const; + virtual int getConnectionNum() const; // Debug - EXPORT void setSockID(uint32_t id) + void setSockID(uint32_t id) { sockID = id; } - EXPORT uint32_t getSockID() + uint32_t getSockID() { return sockID; } @@ -174,7 +172,6 @@ class IOSocket return fSocket->isSameAddr(ipv4Addr); } - /** connect() forwarder for inherited classes * */ @@ -298,5 +295,3 @@ inline std::ostream& operator<<(std::ostream& os, const IOSocket& rhs) } } // namespace messageqcpp - -#undef EXPORT diff --git a/utils/messageqcpp/samenodepseudosocket.cpp b/utils/messageqcpp/samenodepseudosocket.cpp new file mode 100644 index 000000000..2e7ac9c27 --- /dev/null +++ b/utils/messageqcpp/samenodepseudosocket.cpp @@ -0,0 +1,127 @@ +/* Copyright (C) 2024 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include + +#include "samenodepseudosocket.h" +#include "iosocket.h" + +namespace messageqcpp +{ +SameNodePseudoSocket::SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr) : dec_(exeMgrDecPtr) +{ + assert(dec_); +} + +SameNodePseudoSocket::~SameNodePseudoSocket() +{ +} + +void SameNodePseudoSocket::open() +{ +} + +void SameNodePseudoSocket::close() +{ +} + +Socket* SameNodePseudoSocket::clone() const +{ + return nullptr; +} + +SameNodePseudoSocket::SameNodePseudoSocket(const SameNodePseudoSocket& rhs) +{ +} + +SameNodePseudoSocket& SameNodePseudoSocket::operator=(const SameNodePseudoSocket& rhs) +{ + return *this; +} + +const SBS SameNodePseudoSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const +{ + return nullptr; +} + +// This is the only working method of this class. It puts SBS directly into DEC queue. +void SameNodePseudoSocket::write(SBS msg, Stats* stats) +{ + dec_->addDataToOutput(msg); +} + +void SameNodePseudoSocket::write(const ByteStream& msg, Stats* stats) +{ +} + +void SameNodePseudoSocket::write_raw(const ByteStream& msg, Stats* stats) const +{ +} + +void SameNodePseudoSocket::connect(const sockaddr* serv_addr) +{ +} + +void SameNodePseudoSocket::bind(const sockaddr* serv_addr) +{ +} + +const IOSocket SameNodePseudoSocket::accept(const struct timespec* timeout) +{ + return IOSocket(); +} + +void SameNodePseudoSocket::listen(int backlog) +{ +} + +const std::string SameNodePseudoSocket::toString() const +{ + return ""; +} + +const std::string SameNodePseudoSocket::addr2String() const +{ + return ""; +} + +bool SameNodePseudoSocket::isSameAddr(const Socket* rhs) const +{ + return false; +} + +bool SameNodePseudoSocket::isSameAddr(const struct in_addr& ipv4Addr) const +{ + return false; +} + +int SameNodePseudoSocket::ping(const std::string& ipaddr, const struct timespec* timeout) +{ + return 0; +} + +bool SameNodePseudoSocket::isConnected() const +{ + return true; +} + +bool SameNodePseudoSocket::hasData() const +{ + return false; +} + +} // namespace messageqcpp diff --git a/utils/messageqcpp/samenodepseudosocket.h b/utils/messageqcpp/samenodepseudosocket.h new file mode 100644 index 000000000..f977f02a2 --- /dev/null +++ b/utils/messageqcpp/samenodepseudosocket.h @@ -0,0 +1,99 @@ +/* Copyright (C) 2024 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "../../dbcon/joblist/distributedenginecomm.h" + +#include "socket.h" +#include "socketparms.h" +#include "bytestream.h" + +namespace messageqcpp +{ +class IOSocket; + +// This class is a dummy replacement for a TCP socket +// wrapper to communicate with the same node. +class SameNodePseudoSocket : public Socket +{ + public: + explicit SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr); + virtual ~SameNodePseudoSocket(); + virtual void write(SBS msg, Stats* stats = NULL); + + private: + virtual void bind(const sockaddr* serv_addr); + SameNodePseudoSocket(const SameNodePseudoSocket& rhs); + virtual SameNodePseudoSocket& operator=(const SameNodePseudoSocket& rhs); + + virtual void connectionTimeout(const struct ::timespec* timeout) + { + } + + virtual void syncProto(bool use) + { + } + + int getConnectionNum() const + { + return 1; + } + + inline virtual void socketParms(const SocketParms& socket) + { + } + + inline virtual const SocketParms socketParms() const + { + return SocketParms(); + } + + // all these virtual methods are to stay inaccessable. + inline virtual void sa(const sockaddr* sa); + virtual void open(); + virtual void close(); + inline virtual bool isOpen() const; + virtual const SBS read(const struct timespec* timeout = 0, bool* isTimeOut = NULL, + Stats* stats = NULL) const; + virtual void write(const ByteStream& msg, Stats* stats = NULL); + virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; + virtual void listen(int backlog = 5); + virtual const IOSocket accept(const struct timespec* timeout = 0); + virtual void connect(const sockaddr* serv_addr); + virtual Socket* clone() const; + virtual const std::string toString() const; + virtual const std::string addr2String() const; + virtual bool isSameAddr(const Socket* rhs) const; + virtual bool isSameAddr(const struct in_addr& ipv4Addr) const; + static int ping(const std::string& ipaddr, const struct timespec* timeout = 0); + virtual bool isConnected() const; + virtual bool hasData() const; + + joblist::DistributedEngineComm* dec_ = nullptr; +}; + +inline bool SameNodePseudoSocket::isOpen() const +{ + return true; +} + +inline void SameNodePseudoSocket::sa(const sockaddr* sa) +{ +} + +} // namespace messageqcpp diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index 9876eb57c..75c4b7e42 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -259,7 +259,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue if (running) { - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_, + runList[0].sock_); } } catch (...) @@ -291,7 +292,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue #endif if (running) - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_, + runList[0].sock_); } catch (...) { @@ -301,21 +303,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } } -void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = logging::primitiveServerErr; - ph.UniqueID = id; - ph.StepID = step; - messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - sock->write(msg); -} - void FairThreadPool::stop() { stop_.store(true, std::memory_order_relaxed); diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 908c0ba97..946d7b08d 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -21,7 +21,6 @@ * ***********************************************************************/ -#include #include #include using namespace std; @@ -36,6 +35,32 @@ using namespace boost; #include "dbcon/joblist/primitivemsg.h" +namespace error_handling +{ +messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step) +{ + ISMPacketHeader ism; + ism.Status = status; + + PrimitiveHeader ph = {0, 0, 0, step, id, 0}; + + messageqcpp::SBS errorMsg(new messageqcpp::ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader))); + errorMsg->append((uint8_t*)&ism, sizeof(ism)); + errorMsg->append((uint8_t*)&ph, sizeof(ph)); + + return errorMsg; +} + +void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step, + primitiveprocessor::SP_UM_IOSOCK sock) +{ + auto errorMsg = error_handling::makePrimitiveErrorMsg(status, id, step); + + sock->write(errorMsg); +} + +} // namespace error_handling + namespace threadpool { PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, @@ -267,7 +292,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() #endif if (running) - sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID, + runList[i].sock); } catch (...) { @@ -293,7 +319,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() #endif if (running) - sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID, + runList[i].sock); } catch (...) { @@ -301,21 +328,6 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() } } -void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = logging::primitiveServerErr; - ph.UniqueID = id; - ph.StepID = step; - messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - sock->write(msg); -} - void PriorityThreadPool::stop() { _stop = true; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index c9597b8df..575c996c0 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -24,11 +24,6 @@ #pragma once -#include -#include -#include -#include -#include #include #include #include @@ -36,7 +31,14 @@ #include #include #include "primitives/primproc/umsocketselector.h" -#include "atomicops.h" + +namespace error_handling +{ + +messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step); +void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step, + primitiveprocessor::SP_UM_IOSOCK sock); +} // namespace error_handling namespace threadpool { @@ -164,7 +166,6 @@ class PriorityThreadPool Priority pickAQueue(Priority preference); void threadFcn(const Priority preferredQueue) throw(); - void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); std::list jobQueues[3]; // higher indexes = higher priority uint32_t threadCounts[3];