
fix(messageqcpp): MCOL-5636 same-node communication crashes when transmitting PP errors to EM b/c error messaging leveraged a socket that was a nullptr.

Roman Nozdrin
2024-01-11 16:58:47 +00:00
parent 79ad78f91f
commit 22bbfa7239
12 changed files with 300 additions and 155 deletions
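For context, a minimal self-contained sketch of the failure mode the commit message describes (every name here is a placeholder, not one of the engine's actual types): on the same-node path the socket shared pointer used to be left empty, so the first time an error had to be reported the write went through a null pointer.

#include <iostream>
#include <memory>
#include <string>

// Placeholder for the transport object; only write() matters for this sketch.
struct Socket
{
  void write(const std::string& msg) { std::cout << "sent: " << msg << '\n'; }
};

// Pre-fix shape of the error path: dereferences the socket unconditionally,
// which is undefined behaviour (in practice a crash) when sock is a nullptr.
void sendErrorMsg(const std::shared_ptr<Socket>& sock, const std::string& err)
{
  sock->write(err);
}

int main()
{
  std::shared_ptr<Socket> sameNodeSock;          // nullptr: the old same-node convention
  // sendErrorMsg(sameNodeSock, "step failed");  // would dereference nullptr, as in MCOL-5636
  auto realSock = std::make_shared<Socket>();
  sendErrorMsg(realSock, "step failed");         // safe once the socket is always valid
}

The commit removes the nullptr convention entirely: same-node traffic now goes through a pseudo socket object, so the error path always has a valid object to write to.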


@@ -1921,64 +1921,6 @@ void BatchPrimitiveProcessor::execute()
}
catch (NeedToRestartJob& n)
{
#if 0
/* This block of code will flush the problematic OIDs from the
* cache. It seems to have no effect on the problem, so it's commented
* for now.
*
* This is currently thrown only on syscat queries. If we find the problem
* in user tables also, we should avoid dropping entire OIDs if possible.
*
* In local testing there was no need for flushing, because DDL flushes
* the syscat constantly. However, it can take a long time (>10 s) before
* that happens. Doing it locally should make it much more likely only
* one restart is necessary.
*/
try
{
vector<uint32_t> oids;
uint32_t oid;
for (uint32_t i = 0; i < filterCount; i++)
{
oid = filterSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
for (uint32_t i = 0; i < projectCount; i++)
{
oid = projectSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
#if 0
Logger logger;
ostringstream os;
os << "dropping OIDs: ";
for (int i = 0; i < oids.size(); i++)
os << oids[i] << " ";
logger.logMessage(os.str());
#endif
for (int i = 0; i < fCacheCount; i++)
{
dbbc::blockCacheClient bc(*BRPp[i]);
// bc.flushCache();
bc.flushOIDs(&oids[0], oids.size());
}
}
catch (...) { } // doesn't matter if this fails, just avoid crashing
#endif
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
@@ -2109,21 +2051,20 @@ void BatchPrimitiveProcessor::serializeStrings()
void BatchPrimitiveProcessor::sendResponse()
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
// Here is the fast path for local EM to PM interaction. PM puts into the
// input EM DEC queue directly.
// !sock has a 'same host connection' semantics here.
if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
// !writelock has a 'same host connection' semantics here.
if (initiatedByEM_ && !writelock)
{
// Flow Control now handles same node connections so the receiving DEC queue
// is limited.
if (sendThread->flowControlEnabled())
{
sendThread->sendResult({serialized, nullptr, nullptr, 0}, false);
sendThread->sendResult({serialized, sock, writelock, 0}, false);
}
else
{
exeMgrDecPtr->addDataToOutput(serialized);
sock->write(serialized);
serialized.reset();
}
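The convention shift in this hunk (and in the matching hunks for BPPSendThread::mainLoop and DictScanJob::write below) is that a null sock no longer marks the same-host case; sock is always valid, and a null writelock carries that meaning instead, so every branch may safely call sock->write(). A toy, self-contained illustration of that dispatch, using placeholder types rather than the engine's SP_UM_IOSOCK/SP_UM_MUTEX:

#include <iostream>
#include <memory>
#include <mutex>
#include <string>

// Placeholder socket: stands in for either a real connection or the
// same-node pseudo socket, both of which are non-null after this commit.
struct Socket
{
  void write(const std::string& msg) { std::cout << msg << '\n'; }
};

void respond(const std::shared_ptr<Socket>& sock,
             const std::shared_ptr<std::mutex>& writelock, const std::string& payload)
{
  if (!writelock)  // same-host: there is no per-connection write mutex
  {
    sock->write(payload);  // the pseudo socket lands this in the local queue
    return;
  }
  std::lock_guard<std::mutex> lk(*writelock);  // remote: serialize on the connection
  sock->write(payload);
}

int main()
{
  auto sock = std::make_shared<Socket>();
  respond(sock, nullptr, "same-host message");
  respond(sock, std::make_shared<std::mutex>(), "remote message");
}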


@@ -153,7 +153,7 @@ int BPPSeeder::operator()()
if (0 < status)
{
sendErrorMsg(uniqueID, status, stepID);
error_handling::sendErrorMsg(status, uniqueID, stepID, sock);
return ret;
}
@@ -335,23 +335,8 @@ void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step)
{
Logger log;
log.logMessage(ex);
sendErrorMsg(id, logging::bppSeederErr, step);
}
void BPPSeeder::sendErrorMsg(uint32_t id, uint16_t status, uint32_t step)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = status;
ph.UniqueID = id;
ph.StepID = step;
ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
boost::mutex::scoped_lock lk(*writelock);
sock->write(msg);
error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock);
}
bool BPPSeeder::isSysCat()
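The local BPPSeeder::sendErrorMsg() is dropped in favour of a shared error_handling::sendErrorMsg(); the hunk only shows its argument order (status code, unique ID, step ID, socket). Below is a plausible reconstruction that simply mirrors the removed body, with simplified stand-in types so it compiles on its own; it is not the engine's actual implementation.

#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Simplified stand-ins: the real headers and ByteStream live in the engine.
struct ISMPacketHeader { uint16_t Status = 0; };
struct PrimitiveHeader { uint32_t UniqueID = 0; uint32_t StepID = 0; };
using ByteStream = std::vector<uint8_t>;

struct Socket
{
  void write(const ByteStream&) { /* real connection or same-node pseudo socket */ }
};

namespace error_handling
{
// Builds the same ISM + primitive header pair the removed local helper built,
// then writes it through whatever socket the caller holds; since the socket is
// now always valid, the helper never has to special-case the same-node path.
inline void sendErrorMsg(uint16_t status, uint32_t uniqueID, uint32_t stepID,
                         const std::shared_ptr<Socket>& sock)
{
  ISMPacketHeader ism;
  PrimitiveHeader ph;
  ism.Status = status;
  ph.UniqueID = uniqueID;
  ph.StepID = stepID;

  ByteStream msg(sizeof(ism) + sizeof(ph));
  std::memcpy(msg.data(), &ism, sizeof(ism));
  std::memcpy(msg.data() + sizeof(ism), &ph, sizeof(ph));
  sock->write(msg);
}
}  // namespace error_handling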


@@ -76,7 +76,6 @@ class BPPSeeder : public threadpool::FairThreadPool::Functor
private:
BPPSeeder();
void catchHandler(const std::string& s, uint32_t uniqueID, uint32_t step);
void sendErrorMsg(uint32_t id, uint16_t status, uint32_t step);
void flushSyscatOIDs();
messageqcpp::SBS bs;


@@ -234,11 +234,9 @@ void BPPSendThread::mainLoop()
bsSize = msg[msgsSent].msg->lengthWithHdrOverhead();
// Same node processing path
if (!sock)
if (!lock)
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
assert(exeMgrDecPtr);
exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg);
msg[msgsSent].sock->write(msg[msgsSent].msg);
}
else
{


@@ -59,6 +59,7 @@ using namespace BRM;
#include "writeengine.h"
#include "messagequeue.h"
#include "samenodepseudosocket.h"
using namespace messageqcpp;
#include "blockrequestprocessor.h"
@@ -106,8 +107,6 @@ using namespace threadpool;
#define O_NOATIME 0
#endif
typedef tr1::unordered_set<BRM::OID_t> USOID;
// make global for blockcache
//
static const char* statsName = {"pm"};
@@ -1021,7 +1020,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
virtual ~DictScanJob();
void write(const SBS&);
void write(const SBS);
int operator()();
void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
void sendErrorMsg(uint32_t id, uint16_t code);
@@ -1043,15 +1042,14 @@ DictScanJob::~DictScanJob()
{
}
void DictScanJob::write(const SBS& sbs)
void DictScanJob::write(const SBS sbs)
{
// Here is the fast path for local EM to PM interaction. PM puts into the
// input EM DEC queue directly.
// !sock has a 'same host connection' semantics here.
if (!fIos)
// !fWriteLock has a 'same host connection' semantics here.
if (!fWriteLock)
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
exeMgrDecPtr->addDataToOutput(sbs);
fIos->write(sbs);
return;
}
boost::mutex::scoped_lock lk(*fWriteLock);
@@ -2336,8 +2334,10 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
sleep(1);
exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr;
}
// These empty SPs have "same-host" messaging semantics.
SP_UM_IOSOCK outIos(nullptr);
// This is a pseudo socket that puts data into DEC queue directly.
// It can be used for PP to EM communication only.
SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr)));
// This empty SP transmits "same-host" messaging semantics.
SP_UM_MUTEX writeLock(nullptr);
auto procPool = this->getProcessorThreadPool();
auto OOBProcPool = this->getOOBProcessorThreadPool();
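The added comment states the key mechanism: outIos now wraps a SameNodePseudoSocket whose write() feeds the DEC directly, so same-host PP-to-EM traffic keeps the ordinary socket interface without a real connection. A minimal sketch of that shape, under the assumption (suggested by the removed addDataToOutput() calls above) that the pseudo socket forwards messages into the DEC input queue; names other than those visible in the diff are placeholders:

#include <deque>
#include <memory>
#include <mutex>
#include <vector>

using SBS = std::shared_ptr<std::vector<uint8_t>>;  // stand-in for messageqcpp::SBS

// Stand-in for the EM-side DEC, reduced here to a thread-safe input queue.
struct DistributedEngineComm
{
  void addDataToOutput(const SBS& bs)
  {
    std::lock_guard<std::mutex> lk(mtx);
    queue.push_back(bs);
  }
  std::mutex mtx;
  std::deque<SBS> queue;
};

// Shared interface for the real connection and the pseudo socket.
struct SocketLike
{
  virtual ~SocketLike() = default;
  virtual void write(const SBS& bs) = 0;
};

// Same-node pseudo socket: write() bypasses the network entirely and pushes the
// message into the DEC queue, which is what the commit's comment describes.
struct SameNodePseudoSocketSketch : SocketLike
{
  explicit SameNodePseudoSocketSketch(DistributedEngineComm* d) : dec(d) {}
  void write(const SBS& bs) override { dec->addDataToOutput(bs); }
  DistributedEngineComm* dec;
};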