You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
fix(messageqcpp): MCOL-5636 same node communication crashes transmiting PP errors to EM b/c error messaging leveraged socket that was a nullptr. (#3106)
This commit is contained in:
@ -59,6 +59,7 @@ using namespace BRM;
|
||||
#include "writeengine.h"
|
||||
|
||||
#include "messagequeue.h"
|
||||
#include "samenodepseudosocket.h"
|
||||
using namespace messageqcpp;
|
||||
|
||||
#include "blockrequestprocessor.h"
|
||||
@ -106,8 +107,6 @@ using namespace threadpool;
|
||||
#define O_NOATIME 0
|
||||
#endif
|
||||
|
||||
typedef tr1::unordered_set<BRM::OID_t> USOID;
|
||||
|
||||
// make global for blockcache
|
||||
//
|
||||
static const char* statsName = {"pm"};
|
||||
@ -1021,7 +1020,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor
|
||||
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
|
||||
virtual ~DictScanJob();
|
||||
|
||||
void write(const SBS&);
|
||||
void write(const SBS);
|
||||
int operator()();
|
||||
void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
|
||||
void sendErrorMsg(uint32_t id, uint16_t code);
|
||||
@ -1043,15 +1042,14 @@ DictScanJob::~DictScanJob()
|
||||
{
|
||||
}
|
||||
|
||||
void DictScanJob::write(const SBS& sbs)
|
||||
void DictScanJob::write(const SBS sbs)
|
||||
{
|
||||
// Here is the fast path for local EM to PM interaction. PM puts into the
|
||||
// input EM DEC queue directly.
|
||||
// !sock has a 'same host connection' semantics here.
|
||||
if (!fIos)
|
||||
// !fWriteLock has a 'same host connection' semantics here.
|
||||
if (!fWriteLock)
|
||||
{
|
||||
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
|
||||
exeMgrDecPtr->addDataToOutput(sbs);
|
||||
fIos->write(sbs);
|
||||
return;
|
||||
}
|
||||
boost::mutex::scoped_lock lk(*fWriteLock);
|
||||
@ -2336,8 +2334,10 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
|
||||
sleep(1);
|
||||
exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr;
|
||||
}
|
||||
// These empty SPs have "same-host" messaging semantics.
|
||||
SP_UM_IOSOCK outIos(nullptr);
|
||||
// This is a pseudo socket that puts data into DEC queue directly.
|
||||
// It can be used for PP to EM communication only.
|
||||
SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr)));
|
||||
// This empty SP transmits "same-host" messaging semantics.
|
||||
SP_UM_MUTEX writeLock(nullptr);
|
||||
auto procPool = this->getProcessorThreadPool();
|
||||
auto OOBProcPool = this->getOOBProcessorThreadPool();
|
||||
|
Reference in New Issue
Block a user