
fix(join, threadpool): MCOL-5565: MCOL-5636: MCOL-5645: port from develop-23.02 to [develop] (#3128)

* fix(threadpool): MCOL-5565 queries stuck in FairThreadScheduler. (#3100)

Meta primitive jobs, e.g. ADD_JOINER, LAST_JOINER, got stuck in the
Fair scheduler when there was no out-of-band scheduler. Add the OOB
scheduler back to remedy the issue.

* fix(messageqcpp): MCOL-5636 same-node communication crashed when transmitting PP errors to EM because error messaging leveraged a socket that was a nullptr. (#3106)

* fix(threadpool): MCOL-5645 erroneous threadpool Job ctor implicitly set the socket shared_ptr to nullptr, causing SIGABRT when the threadpool returned an error (#3125)
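
For context on the third fix: a threadpool Job created through the socket-less constructor carried a default-constructed (null) socket shared_ptr, and the pool's error path dereferenced it. A minimal, self-contained sketch of that failure mode; the types are illustrative, not the engine's actual classes:

#include <cassert>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

struct Socket
{
  void write(const std::string& msg) { std::cout << msg << "\n"; }
};
using SP_Socket = std::shared_ptr<Socket>;

struct Job
{
  // The removed overload: sock_ is left default-constructed, i.e. nullptr.
  explicit Job(uint32_t id) : id_(id) {}
  // The surviving overload: every Job carries the socket its error path needs.
  Job(uint32_t id, SP_Socket sock) : id_(id), sock_(std::move(sock)) {}
  uint32_t id_;
  SP_Socket sock_;
};

void sendError(const Job& job)
{
  // Before the fix, job.sock_ could be nullptr here and the dereference
  // aborted the process (the SIGABRT of MCOL-5645).
  assert(job.sock_ && "error path needs a socket");
  job.sock_->write("error for job " + std::to_string(job.id_));
}

int main()
{
  Job ok{1, std::make_shared<Socket>()};
  sendError(ok);
  // Job bad{2}; sendError(bad);  // the old crash path; the fix removes this ctor
}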

---------

Co-authored-by: drrtuy <roman.nozdrin@mariadb.com>
Leonid Fedorov 2024-02-13 19:01:16 +03:00 committed by GitHub
parent fcd46ab00a
commit 83c2408f8d
15 changed files with 410 additions and 317 deletions

View File

@ -441,9 +441,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
}
}
#ifdef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
}
bs >> filterCount;
@ -593,9 +591,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2));
buildVSSCache(count);
#ifdef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
}
// This version of addToJoiner() is multithreaded. Values are first
@ -834,28 +830,11 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
idbassert(bs.length() == 0);
}
void BatchPrimitiveProcessor::doneSendingJoinerData()
{
/* to get wall-time of hash table construction
if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
{
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
Logger logger;
ostringstream os;
os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
logger.logMessage(os.str());
cout << os.str() << endl;
}
*/
}
int BatchPrimitiveProcessor::endOfJoiner()
{
/* Wait for all joiner elements to be added */
uint32_t i;
size_t currentSize;
// it should be safe to run this without grabbing this lock
// boost::mutex::scoped_lock scoped(addToJoinerLock);
if (endOfJoinerRan)
return 0;
@ -876,34 +855,38 @@ int BatchPrimitiveProcessor::endOfJoiner()
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tJoiners[i] || !tJoiners[i][j])
{
return -1;
}
else
currentSize += tJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
{
return -1;
// if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
}
else
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
{
if (!tlJoiners[i] || !tlJoiners[i][j])
{
return -1;
}
else
currentSize += tlJoiners[i][j]->size();
}
if (currentSize != tJoinerSizes[i])
{
return -1;
// if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
}
}
endOfJoinerRan = true;
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
return 0;
}
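
endOfJoiner() relies on a simple contract with the thread pool: returning -1 asks the pool to reschedule the job until every per-thread joiner hash table has been allocated and holds the expected row count. A simplified, self-contained sketch of that check, with hypothetical types rather than the real BPP interfaces:

#include <cstddef>
#include <vector>

struct JoinerShard
{
  std::size_t size() const { return rows_; }
  std::size_t rows_ = 0;
};

// Returns -1 while any per-thread shard is missing or short of rows; the
// thread pool re-queues the job on -1, and the join starts only after 0.
int endOfJoinerSketch(const std::vector<JoinerShard*>& shards, std::size_t expectedRows)
{
  std::size_t current = 0;
  for (const auto* s : shards)
  {
    if (!s)
      return -1;  // shard not allocated yet: reschedule
    current += s->size();
  }
  if (current != expectedRows)
    return -1;  // joiner messages still in flight: reschedule
  return 0;     // all join data received
}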
@ -1076,7 +1059,6 @@ void BatchPrimitiveProcessor::initProcessor()
{
for (i = 0; i < (uint32_t)filterCount - 1; ++i)
{
// cout << "prepping filter " << i << endl;
filterSteps[i]->setBatchPrimitiveProcessor(this);
if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
@ -1087,14 +1069,12 @@ void BatchPrimitiveProcessor::initProcessor()
filterSteps[i]->prep(OT_RID, false);
}
// cout << "prepping filter " << i << endl;
filterSteps[i]->setBatchPrimitiveProcessor(this);
filterSteps[i]->prep(OT_BOTH, false);
}
for (i = 0; i < projectCount; ++i)
{
// cout << "prepping projection " << i << endl;
projectSteps[i]->setBatchPrimitiveProcessor(this);
if (noVB)
@ -1120,7 +1100,6 @@ void BatchPrimitiveProcessor::initProcessor()
if (fAggregator.get() != NULL)
{
// fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
fAggRowGroupData.reinit(fAggregateRG);
fAggregateRG.setData(&fAggRowGroupData);
@ -1944,64 +1923,6 @@ void BatchPrimitiveProcessor::execute()
}
catch (NeedToRestartJob& n)
{
#if 0
/* This block of code will flush the problematic OIDs from the
* cache. It seems to have no effect on the problem, so it's commented
* for now.
*
* This is currently thrown only on syscat queries. If we find the problem
* in user tables also, we should avoid dropping entire OIDs if possible.
*
* In local testing there was no need for flushing, because DDL flushes
* the syscat constantly. However, it can take a long time (>10 s) before
* that happens. Doing it locally should make it much more likely only
* one restart is necessary.
*/
try
{
vector<uint32_t> oids;
uint32_t oid;
for (uint32_t i = 0; i < filterCount; i++)
{
oid = filterSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
for (uint32_t i = 0; i < projectCount; i++)
{
oid = projectSteps[i]->getOID();
if (oid > 0)
oids.push_back(oid);
}
#if 0
Logger logger;
ostringstream os;
os << "dropping OIDs: ";
for (int i = 0; i < oids.size(); i++)
os << oids[i] << " ";
logger.logMessage(os.str());
#endif
for (int i = 0; i < fCacheCount; i++)
{
dbbc::blockCacheClient bc(*BRPp[i]);
// bc.flushCache();
bc.flushOIDs(&oids[0], oids.size());
}
}
catch (...) { } // doesn't matter if this fails, just avoid crashing
#endif
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
@ -2132,21 +2053,20 @@ void BatchPrimitiveProcessor::serializeStrings()
void BatchPrimitiveProcessor::sendResponse()
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
// Here is the fast path for local EM to PM interaction. PM puts data into
// the input EM DEC queue directly.
// !sock has 'same host connection' semantics here.
if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
// !writelock has 'same host connection' semantics here.
if (initiatedByEM_ && !writelock)
{
// Flow Control now handles same-node connections so the receiving DEC queue
// is limited.
if (sendThread->flowControlEnabled())
{
sendThread->sendResult({serialized, nullptr, nullptr, 0}, false);
sendThread->sendResult({serialized, sock, writelock, 0}, false);
}
else
{
exeMgrDecPtr->addDataToOutput(serialized);
sock->write(serialized);
serialized.reset();
}
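
Taken together with the BPPSendThread hunk below, this is the new same-host result path: an empty writelock encodes "same host", and sock is now the SameNodePseudoSocket, so the result reaches the local DEC queue without a TCP write whether or not flow control is on. A compact sketch of the control flow, with illustrative types rather than the real messageqcpp/joblist interfaces:

#include <memory>

struct ByteStream {};
using SBS = std::shared_ptr<ByteStream>;
struct Sock { void write(SBS) {} };  // stands in for SameNodePseudoSocket

struct SendThread
{
  bool flowControlEnabled() const { return throttle_; }
  void sendResult(SBS) {}  // queues the result so the DEC queue stays bounded
  bool throttle_ = false;
};

void sendResponse(bool initiatedByEM, std::shared_ptr<Sock> sock,
                  std::shared_ptr<int> writelock, SBS serialized, SendThread& st)
{
  // An empty writelock now encodes "same host": no mutex, no TCP write.
  if (initiatedByEM && !writelock)
  {
    if (st.flowControlEnabled())
      st.sendResult(std::move(serialized));  // throttled same-node path
    else
      sock->write(std::move(serialized));    // straight into the DEC queue
  }
  // else: networked path via IOSocket under *writelock (omitted here)
}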

View File

@ -96,7 +96,6 @@ class BatchPrimitiveProcessor
void resetBPP(messageqcpp::ByteStream&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& outputSock);
void addToJoiner(messageqcpp::ByteStream&);
int endOfJoiner();
void doneSendingJoinerData();
int operator()();
void setLBIDForScan(uint64_t rid);

View File

@ -153,12 +153,10 @@ int BPPSeeder::operator()()
if (0 < status)
{
sendErrorMsg(uniqueID, status, stepID);
error_handling::sendErrorMsg(status, uniqueID, stepID, sock);
return ret;
}
// if (!(sessionID & 0x80000000))
// cout << "got request for <" << sessionID <<", " << stepID << ">\n";
scoped.lock();
if (!bppv)
@ -172,26 +170,12 @@ int BPPSeeder::operator()()
if (boost::posix_time::second_clock::universal_time() > dieTime)
{
#if 0 // for debugging
boost::posix_time::ptime pt = boost::posix_time::microsec_clock::local_time();
if (sessionID & 0x80000000)
cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID="
<< (int) (sessionID ^ 0x80000000) << " stepID=" << stepID << " (syscat)" << pt << endl;
else
cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID="
<< sessionID << " stepID=" << stepID << pt << endl;
throw logic_error("BPPSeeder couldn't find the sessionID/stepID pair");
#endif
cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID
<< " has been killed." << endl;
return 0;
}
// if (!isSysCat())
return -1;
// else { // syscat queries aren't run by a threadpool, can't reschedule those
//jobs usleep(1000); goto retry;
// }
}
bppv = it->second;
@ -205,10 +189,6 @@ int BPPSeeder::operator()()
if (!bpp)
{
// if (isSysCat()) {
// usleep(1000);
// goto retry;
// }
return -1; // all BPP instances are busy, make threadpool reschedule
}
@ -355,23 +335,8 @@ void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step)
{
Logger log;
log.logMessage(ex);
sendErrorMsg(id, logging::bppSeederErr, step);
}
void BPPSeeder::sendErrorMsg(uint32_t id, uint16_t status, uint32_t step)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = status;
ph.UniqueID = id;
ph.StepID = step;
ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
boost::mutex::scoped_lock lk(*writelock);
sock->write(msg);
error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock);
}
bool BPPSeeder::isSysCat()

View File

@ -76,7 +76,6 @@ class BPPSeeder : public threadpool::FairThreadPool::Functor
private:
BPPSeeder();
void catchHandler(const std::string& s, uint32_t uniqueID, uint32_t step);
void sendErrorMsg(uint32_t id, uint16_t status, uint32_t step);
void flushSyscatOIDs();
messageqcpp::SBS bs;

View File

@ -234,11 +234,9 @@ void BPPSendThread::mainLoop()
bsSize = msg[msgsSent].msg->lengthWithHdrOverhead();
// Same node processing path
if (!sock)
if (!lock)
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
assert(exeMgrDecPtr);
exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg);
msg[msgsSent].sock->write(msg[msgsSent].msg);
}
else
{

View File

@ -59,6 +59,7 @@ using namespace BRM;
#include "writeengine.h"
#include "messagequeue.h"
#include "samenodepseudosocket.h"
using namespace messageqcpp;
#include "blockrequestprocessor.h"
@ -106,8 +107,6 @@ using namespace threadpool;
#define O_NOATIME 0
#endif
typedef tr1::unordered_set<BRM::OID_t> USOID;
// make global for blockcache
//
static const char* statsName = {"pm"};
@ -1019,7 +1018,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
virtual ~DictScanJob();
void write(const SBS&);
void write(const SBS);
int operator()();
void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
void sendErrorMsg(uint32_t id, uint16_t code);
@ -1041,15 +1040,14 @@ DictScanJob::~DictScanJob()
{
}
void DictScanJob::write(const SBS& sbs)
void DictScanJob::write(const SBS sbs)
{
// Here is the fast path for local EM to PM interaction. PM puts data into
// the input EM DEC queue directly.
// !sock has 'same host connection' semantics here.
if (!fIos)
// !fWriteLock has 'same host connection' semantics here.
if (!fWriteLock)
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
exeMgrDecPtr->addDataToOutput(sbs);
fIos->write(sbs);
return;
}
boost::mutex::scoped_lock lk(*fWriteLock);
@ -1209,6 +1207,7 @@ struct BPPHandler
}
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
}
scoped.unlock();
@ -1322,16 +1321,21 @@ struct BPPHandler
}
else
{
bs.rewind();
if (posix_time::second_clock::universal_time() > dieTime)
{
std::cout << "doAbort: job for key " << key << " has been killed." << std::endl;
return 0;
}
else
{
bs.rewind();
return -1;
}
}
scoped.unlock();
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
return 0;
}
@ -1354,8 +1358,11 @@ struct BPPHandler
return 0;
}
else
{
bs.rewind();
return -1;
}
}
void createBPP(ByteStream& bs)
{
@ -1402,7 +1409,6 @@ struct BPPHandler
bppKeys.push_back(key);
bool newInsert;
newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
// cout << "creating BPP # " << key << endl;
scoped.unlock();
if (!newInsert)
@ -1420,10 +1426,7 @@ struct BPPHandler
inline SBPPV grabBPPs(uint32_t uniqueID)
{
BPPMap::iterator it;
/*
uint32_t failCount = 0;
uint32_t maxFailCount = (fatal ? 500 : 5000);
*/
SBPPV ret;
boost::mutex::scoped_lock scoped(bppLock);
@ -1433,24 +1436,6 @@ struct BPPHandler
return it->second;
else
return SBPPV();
/*
do
{
if (++failCount == maxFailCount) {
//cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl;
return ret;
//throw logic_error("grabBPPs couldn't find the unique ID");
}
scoped.unlock();
usleep(5000);
scoped.lock();
it = bppMap.find(uniqueID);
} while (it == bppMap.end());
ret = it->second;
return ret;
*/
}
inline shared_mutex& getDJLock(uint32_t uniqueID)
@ -1488,6 +1473,7 @@ struct BPPHandler
buf = bs.buf();
/* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
bppv = grabBPPs(uniqueID);
if (bppv)
@ -1499,7 +1485,10 @@ struct BPPHandler
else
{
if (posix_time::second_clock::universal_time() > dieTime)
{
cout << "addJoinerToBPP: job for id " << uniqueID << " has been killed." << endl;
return 0;
}
else
return -1;
}
@ -1517,20 +1506,22 @@ struct BPPHandler
buf = bs.buf();
/* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
bppv = grabBPPs(uniqueID);
if (!bppv)
{
// cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl;
if (posix_time::second_clock::universal_time() > dieTime)
{
cout << "LastJoiner: job for id " << uniqueID << " has been killed." << endl;
return 0;
}
else
{
return -1;
}
}
boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
for (i = 0; i < bppv->get().size(); i++)
{
err = bppv->get()[i]->endOfJoiner();
@ -1538,32 +1529,26 @@ struct BPPHandler
if (err == -1)
{
if (posix_time::second_clock::universal_time() > dieTime)
{
cout << "LastJoiner: job for id " << uniqueID
<< " has been killed waiting for joiner messages for too long." << endl;
return 0;
}
else
return -1;
}
}
bppv->get()[0]->doneSendingJoinerData();
/* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
more intelligent scheduling. Once the join data is received, BPPV will
start letting jobs run and create more BPP instances on demand. */
atomicops::atomicMb(); // make sure the joinDataReceived assignment doesn't migrate upward...
bppv->joinDataReceived = true;
return 0;
}
int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
{
// This is a corner case that damages bs so its length becomes less than the header length.
// The damaged bs doesn't pass the check below that it holds at least a header + 3x int32_t.
// The if block below works around the issue.
if (posix_time::second_clock::universal_time() > dieTime)
{
return 0;
}
uint32_t uniqueID, sessionID, stepID;
BPPMap::iterator it;
if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID))
@ -1603,39 +1588,33 @@ struct BPPHandler
{
// MCOL-5. On ubuntu, a crash was happening. Checking
// joinDataReceived here fixes it.
// We're not ready for a destroy. Reschedule.
// We're not ready for a destroy. Reschedule to wait
// for all joiners to arrive.
// TODO there might be no joiners if the query is canceled.
// The memory will leak.
// Rewind to the beginning of ByteStream buf b/c of the advance above.
bs.rewind();
return -1;
}
}
else
{
// cout << "got a destroy for an unknown obj " << uniqueID << endl;
bs.rewind();
if (posix_time::second_clock::universal_time() > dieTime)
{
// XXXPAT: going to let this fall through and delete jobs for
// uniqueID if there are any. Not clear what the downside is.
/*
lk.unlock();
deleteDJLock(uniqueID);
return 0;
*/
cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed."
<< endl;
// If for some reason jobs for this uniqueID arrived later, remove them so
// they don't stay in the PP thread pool forever.
}
else
{
bs.rewind();
return -1;
}
}
// cout << " destroy: new size is " << bppMap.size() << endl;
/*
if (sessionID & 0x80000000)
cerr << "destroyed BPP instances for sessionID " << (int)
(sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n";
else
cerr << "destroyed BPP instances for sessionID " << sessionID <<
" stepID "<< stepID << endl;
*/
fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID);
lk.unlock();
deleteDJLock(uniqueID);
return 0;
@ -1704,8 +1683,11 @@ class DictionaryOp : public FairThreadPool::Functor
bs->rewind();
if (posix_time::second_clock::universal_time() > dieTime)
{
cout << "DictionaryOp::operator(): job has been killed." << endl;
return 0;
}
}
return ret;
}
@ -1782,8 +1764,11 @@ class DestroyEqualityFilter : public DictionaryOp
return 0;
}
else
{
bs->rewind();
return -1;
}
}
};
struct ReadThread
@ -1920,7 +1905,8 @@ struct ReadThread
}
static void dispatchPrimitive(SBS sbs, boost::shared_ptr<BPPHandler>& fBPPHandler,
boost::shared_ptr<threadpool::FairThreadPool>& procPoolPtr,
boost::shared_ptr<threadpool::FairThreadPool> procPool,
std::shared_ptr<threadpool::PriorityThreadPool> OOBProcPool,
SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads,
const bool ptTrace)
{
@ -1942,6 +1928,7 @@ struct ReadThread
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
const uint32_t weight = threadpool::MetaJobsInitialWeight;
const uint32_t priority = 0;
uint32_t id = 0;
boost::shared_ptr<FairThreadPool::Functor> functor;
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
@ -1975,8 +1962,8 @@ struct ReadThread
id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
functor.reset(new BPPHandler::Abort(fBPPHandler, sbs));
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id);
procPoolPtr->addJob(job);
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
OOBProcPool->addJob(job);
break;
}
@ -2017,10 +2004,18 @@ struct ReadThread
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]) + 100000;
}
if (hdr && hdr->flags & IS_SYSCAT)
{
PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
OOBProcPool->addJob(job);
}
else
{
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
procPoolPtr->addJob(job);
procPool->addJob(job);
}
break;
}
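
This dispatch change is the core of the MCOL-5565 fix: meta jobs (create/destroy/abort, joiner traffic) and system catalog scans are routed to the out-of-band PriorityThreadPool so they can never be starved behind heavy scan jobs, while ordinary scans stay in the FairThreadPool with a constant added to their weight. A sketch of the routing policy, with hypothetical pool interfaces rather than the actual dispatchPrimitive() signature:

struct Job {};
struct FairPool { void addJob(const Job&) {} };  // weight-based fairness
struct OOBPool  { void addJob(const Job&) {} };  // unbounded, non-blocking queue

enum class Kind { Meta, SysCatScan, Scan };

void route(Kind k, const Job& j, FairPool& fair, OOBPool& oob)
{
  // Meta jobs (ADD_JOINER, LAST_JOINER, create/destroy/abort) and syscat
  // scans bypass the fair scheduler entirely (the MCOL-5565 remedy).
  if (k == Kind::Meta || k == Kind::SysCatScan)
    oob.addJob(j);
  else
    fair.addJob(j);
}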
@ -2044,7 +2039,8 @@ struct ReadThread
void operator()()
{
utils::setThreadName("PPReadThread");
boost::shared_ptr<threadpool::FairThreadPool> procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
auto procPool = fPrimitiveServerPtr->getProcessorThreadPool();
auto OOBProcPool = fPrimitiveServerPtr->getOOBProcessorThreadPool();
SBS bs;
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
@ -2135,7 +2131,7 @@ struct ReadThread
default: break;
}
dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock,
dispatchPrimitive(bs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace());
}
else // bs.length() == 0
@ -2277,6 +2273,9 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
medPriorityThreads, lowPriorityThreads, 0));
// We're not using either the priority or the job-clustering features, just need a threadpool
// that can reschedule jobs, and an unlimited non-blocking queue
fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
asyncCounter = 0;
@ -2330,15 +2329,18 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
sleep(1);
exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr;
}
// These empty SPs have "same-host" messaging semantics.
SP_UM_IOSOCK outIos(nullptr);
// This is a pseudo socket that puts data into the DEC queue directly.
// It can be used for PP to EM communication only.
SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr)));
// This empty SP carries "same-host" messaging semantics.
SP_UM_MUTEX writeLock(nullptr);
auto procPoolPtr = this->getProcessorThreadPool();
auto procPool = this->getProcessorThreadPool();
auto OOBProcPool = this->getOOBProcessorThreadPool();
boost::shared_ptr<BPPHandler> fBPPHandler(new BPPHandler(this));
for (;;)
{
joblist::DistributedEngineComm::SBSVector primitiveMsgs;
for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
for (auto sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
{
if (sbs->length() == 0)
{
@ -2347,7 +2349,7 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
}
idbassert(sbs->length() >= sizeof(ISMPacketHeader));
ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock,
ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
this->ProcessorThreads(), this->PTTrace());
}
}
@ -2364,7 +2366,6 @@ BPPV::BPPV(PrimitiveServer* ps)
sendThread->setProcessorPool(ps->getProcessorThreadPool());
v.reserve(BPPCount);
pos = 0;
joinDataReceived = false;
}
BPPV::~BPPV()
@ -2404,27 +2405,6 @@ boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
uint32_t size = v.size();
uint32_t i = 0;
#if 0
// This block of code scans for the first available BPP instance,
// makes BPPSeeder reschedule it if none are available. Relies on BPP instance
// being preallocated.
for (i = 0; i < size; i++)
{
uint32_t index = (i + pos) % size;
if (!(v[index]->busy()))
{
pos = (index + 1) % size;
v[index]->busy(true);
return v[index];
}
}
// They're all busy, make threadpool reschedule the job
return boost::shared_ptr<BatchPrimitiveProcessor>();
#endif
// This block of code creates BPP instances if/when they are needed
// don't use a processor thread when it will just block, reschedule it instead

View File

@ -66,7 +66,7 @@ class BPPV
}
void abort();
bool aborted();
volatile bool joinDataReceived;
std::atomic<bool> joinDataReceived{false};
private:
std::vector<boost::shared_ptr<BatchPrimitiveProcessor> > v;
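
volatile provides no inter-thread ordering guarantees, which is why the old code paired the flag with an explicit atomicops::atomicMb() barrier (see lastJoiner above); std::atomic<bool> builds that ordering in. A minimal sketch of the publish/observe pattern, with illustrative types rather than the real BPPV:

#include <atomic>
#include <vector>

struct BPPVSketch
{
  std::vector<int> joinData;  // filled completely before the flag flips
  std::atomic<bool> joinDataReceived{false};
};

void publish(BPPVSketch& b)
{
  b.joinData.assign(1000, 42);
  // The (seq_cst) store publishes joinData; it replaces a plain write to a
  // volatile bool preceded by an explicit memory barrier.
  b.joinDataReceived.store(true);
}

bool ready(const BPPVSketch& b)
{
  // A true load here guarantees joinData above is fully visible.
  return b.joinDataReceived.load();
}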
@ -129,6 +129,11 @@ class PrimitiveServer
return fProcessorPool;
}
inline std::shared_ptr<threadpool::PriorityThreadPool> getOOBProcessorThreadPool() const
{
return fOOBPool;
}
int ReadAheadBlocks() const
{
return fReadAheadBlocks;
@ -161,6 +166,7 @@ class PrimitiveServer
* primitive commands
*/
boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
std::shared_ptr<threadpool::PriorityThreadPool> fOOBPool;
int fServerThreads;
int fServerQueueSize;

View File

@ -9,6 +9,7 @@ set(messageqcpp_LIB_SRCS
bytestream.cpp
socketparms.cpp
inetstreamsocket.cpp
samenodepseudosocket.cpp
iosocket.cpp
compressed_iss.cpp
bytestreampool.cpp

View File

@ -39,8 +39,6 @@
class MessageQTestSuite;
#define EXPORT
namespace messageqcpp
{
class ServerSocket;
@ -54,22 +52,22 @@ class IOSocket
/** ctor
*
*/
EXPORT explicit IOSocket(Socket* socket = 0);
explicit IOSocket(Socket* socket = 0);
/** copy ctor
*
*/
EXPORT IOSocket(const IOSocket& rhs);
IOSocket(const IOSocket& rhs);
/** assign op
*
*/
EXPORT IOSocket& operator=(const IOSocket& rhs);
IOSocket& operator=(const IOSocket& rhs);
/** dtor
*
*/
EXPORT virtual ~IOSocket();
virtual ~IOSocket();
/** read a ByteStream from this socket
*
@ -84,9 +82,9 @@ class IOSocket
* This socket needs to be connected first. Will throw runtime_error on I/O error. Caller should
* call close() method if exception is thrown.
*/
EXPORT virtual void write(const ByteStream& msg, Stats* stats = NULL) const;
EXPORT virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const;
EXPORT virtual void write(SBS msg, Stats* stats = NULL) const;
virtual void write(const ByteStream& msg, Stats* stats = NULL) const;
virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const;
virtual void write(SBS msg, Stats* stats = NULL) const;
/** access the sockaddr member
*/
@ -125,29 +123,29 @@ class IOSocket
*
* Install a socket implementation that meets the Socket interface
*/
EXPORT virtual void setSocketImpl(Socket* socket);
virtual void setSocketImpl(Socket* socket);
/** get a string rep of the IOSocket
*
*/
EXPORT virtual const std::string toString() const;
virtual const std::string toString() const;
/** syncProto() forwarder for inherited classes
*
*/
EXPORT virtual void syncProto(bool use)
virtual void syncProto(bool use)
{
fSocket->syncProto(use);
}
EXPORT virtual int getConnectionNum() const;
virtual int getConnectionNum() const;
// Debug
EXPORT void setSockID(uint32_t id)
void setSockID(uint32_t id)
{
sockID = id;
}
EXPORT uint32_t getSockID()
uint32_t getSockID()
{
return sockID;
}
@ -174,7 +172,6 @@ class IOSocket
return fSocket->isSameAddr(ipv4Addr);
}
/** connect() forwarder for inherited classes
*
*/
@ -298,5 +295,3 @@ inline std::ostream& operator<<(std::ostream& os, const IOSocket& rhs)
}
} // namespace messageqcpp
#undef EXPORT

View File

@ -0,0 +1,127 @@
/* Copyright (C) 2024 MariaDB Corp.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <string>
#include "samenodepseudosocket.h"
#include "iosocket.h"
namespace messageqcpp
{
SameNodePseudoSocket::SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr) : dec_(exeMgrDecPtr)
{
assert(dec_);
}
SameNodePseudoSocket::~SameNodePseudoSocket()
{
}
void SameNodePseudoSocket::open()
{
}
void SameNodePseudoSocket::close()
{
}
Socket* SameNodePseudoSocket::clone() const
{
return nullptr;
}
SameNodePseudoSocket::SameNodePseudoSocket(const SameNodePseudoSocket& rhs)
{
}
SameNodePseudoSocket& SameNodePseudoSocket::operator=(const SameNodePseudoSocket& rhs)
{
return *this;
}
const SBS SameNodePseudoSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const
{
return nullptr;
}
// This is the only working method of this class. It puts the SBS directly into the DEC queue.
void SameNodePseudoSocket::write(SBS msg, Stats* stats)
{
dec_->addDataToOutput(msg);
}
void SameNodePseudoSocket::write(const ByteStream& msg, Stats* stats)
{
}
void SameNodePseudoSocket::write_raw(const ByteStream& msg, Stats* stats) const
{
}
void SameNodePseudoSocket::connect(const sockaddr* serv_addr)
{
}
void SameNodePseudoSocket::bind(const sockaddr* serv_addr)
{
}
const IOSocket SameNodePseudoSocket::accept(const struct timespec* timeout)
{
return IOSocket();
}
void SameNodePseudoSocket::listen(int backlog)
{
}
const std::string SameNodePseudoSocket::toString() const
{
return "";
}
const std::string SameNodePseudoSocket::addr2String() const
{
return "";
}
bool SameNodePseudoSocket::isSameAddr(const Socket* rhs) const
{
return false;
}
bool SameNodePseudoSocket::isSameAddr(const struct in_addr& ipv4Addr) const
{
return false;
}
int SameNodePseudoSocket::ping(const std::string& ipaddr, const struct timespec* timeout)
{
return 0;
}
bool SameNodePseudoSocket::isConnected() const
{
return true;
}
bool SameNodePseudoSocket::hasData() const
{
return false;
}
} // namespace messageqcpp
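
SameNodePseudoSocket is essentially a null-object Socket: every interface method is a stub except write(SBS), which short-circuits into the in-process DEC queue. A self-contained sketch of the pattern, with illustrative types rather than the real messageqcpp interfaces:

#include <iostream>
#include <memory>
#include <string>

struct Msg { std::string payload; };
using SMsg = std::shared_ptr<Msg>;

struct Queue { void push(SMsg m) { std::cout << "queued: " << m->payload << "\n"; } };

struct Socket
{
  virtual ~Socket() = default;
  virtual void write(SMsg m) = 0;
};

// The pseudo socket: write() bypasses the network and feeds the local
// queue directly; all other Socket methods would be inert stubs.
struct PseudoSocket : Socket
{
  explicit PseudoSocket(Queue* q) : q_(q) {}
  void write(SMsg m) override { q_->push(std::move(m)); }
  Queue* q_;
};

int main()
{
  Queue dec;  // stands in for DistributedEngineComm's input queue
  std::unique_ptr<Socket> sock = std::make_unique<PseudoSocket>(&dec);
  sock->write(std::make_shared<Msg>(Msg{"primitive result"}));
}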

View File

@ -0,0 +1,99 @@
/* Copyright (C) 2024 MariaDB Corp.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "../../dbcon/joblist/distributedenginecomm.h"
#include "socket.h"
#include "socketparms.h"
#include "bytestream.h"
namespace messageqcpp
{
class IOSocket;
// This class is a dummy replacement for a TCP socket wrapper,
// used to communicate within the same node.
class SameNodePseudoSocket : public Socket
{
public:
explicit SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr);
virtual ~SameNodePseudoSocket();
virtual void write(SBS msg, Stats* stats = NULL);
private:
virtual void bind(const sockaddr* serv_addr);
SameNodePseudoSocket(const SameNodePseudoSocket& rhs);
virtual SameNodePseudoSocket& operator=(const SameNodePseudoSocket& rhs);
virtual void connectionTimeout(const struct ::timespec* timeout)
{
}
virtual void syncProto(bool use)
{
}
int getConnectionNum() const
{
return 1;
}
inline virtual void socketParms(const SocketParms& socket)
{
}
inline virtual const SocketParms socketParms() const
{
return SocketParms();
}
// All these virtual methods are to stay inaccessible.
inline virtual void sa(const sockaddr* sa);
virtual void open();
virtual void close();
inline virtual bool isOpen() const;
virtual const SBS read(const struct timespec* timeout = 0, bool* isTimeOut = NULL,
Stats* stats = NULL) const;
virtual void write(const ByteStream& msg, Stats* stats = NULL);
virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const;
virtual void listen(int backlog = 5);
virtual const IOSocket accept(const struct timespec* timeout = 0);
virtual void connect(const sockaddr* serv_addr);
virtual Socket* clone() const;
virtual const std::string toString() const;
virtual const std::string addr2String() const;
virtual bool isSameAddr(const Socket* rhs) const;
virtual bool isSameAddr(const struct in_addr& ipv4Addr) const;
static int ping(const std::string& ipaddr, const struct timespec* timeout = 0);
virtual bool isConnected() const;
virtual bool hasData() const;
joblist::DistributedEngineComm* dec_ = nullptr;
};
inline bool SameNodePseudoSocket::isOpen() const
{
return true;
}
inline void SameNodePseudoSocket::sa(const sockaddr* sa)
{
}
} // namespace messageqcpp

View File

@ -228,7 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
{
// to avoid excessive CPU usage waiting for data from storage
usleep(500);
runList[0].weight_ += RescheduleWeightIncrement;
runList[0].weight_ += (runList[0].weight_) ? runList[0].weight_ : RescheduleWeightIncrement;
addJob(runList[0]);
}
}
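
The reschedule penalty changes from a fixed increment to weight doubling: a job repeatedly re-queued while waiting on storage backs off exponentially in the fair queue instead of linearly. A tiny sketch of the new formula; the constant and types are illustrative:

#include <cstdint>

constexpr uint32_t RescheduleWeightIncrement = 100;  // illustrative value

uint32_t nextWeight(uint32_t w)
{
  // A zero weight would otherwise stay zero forever, so seed it with the
  // fixed increment; any non-zero weight doubles on each reschedule.
  return w + (w ? w : RescheduleWeightIncrement);
}
// After n reschedules the weight is roughly increment * 2^(n-1), so a job
// stuck on slow storage quickly yields the pool to runnable jobs.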
@ -259,7 +259,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
if (running)
{
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_,
runList[0].sock_);
}
}
catch (...)
@ -291,7 +292,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
#endif
if (running)
sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_);
error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_,
runList[0].sock_);
}
catch (...)
{
@ -301,21 +303,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
}
}
void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
sock->write(msg);
}
void FairThreadPool::stop()
{
stop_.store(true, std::memory_order_relaxed);

View File

@ -76,19 +76,7 @@ class FairThreadPool
, id_(id)
{
}
// sock_ is nullptr here. This is kinda dangerous.
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
const boost::shared_ptr<Functor>& functor, const uint32_t weight = 1, const uint32_t priority = 0,
const uint32_t id = 0)
: uniqueID_(uniqueID)
, stepID_(stepID)
, txnIdx_(txnIdx)
, functor_(functor)
, weight_(weight)
, priority_(priority)
, id_(id)
{
}
uint32_t uniqueID_;
uint32_t stepID_;
TransactionIdxT txnIdx_;

View File

@ -21,7 +21,6 @@
*
***********************************************************************/
#include <stdexcept>
#include <unistd.h>
#include <exception>
using namespace std;
@ -36,6 +35,32 @@ using namespace boost;
#include "dbcon/joblist/primitivemsg.h"
namespace error_handling
{
messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step)
{
ISMPacketHeader ism;
ism.Status = status;
PrimitiveHeader ph = {0, 0, 0, step, id, 0};
messageqcpp::SBS errorMsg(new messageqcpp::ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)));
errorMsg->append((uint8_t*)&ism, sizeof(ism));
errorMsg->append((uint8_t*)&ph, sizeof(ph));
return errorMsg;
}
void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step,
primitiveprocessor::SP_UM_IOSOCK sock)
{
auto errorMsg = error_handling::makePrimitiveErrorMsg(status, id, step);
sock->write(errorMsg);
}
} // namespace error_handling
namespace threadpool
{
PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads,
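
With makePrimitiveErrorMsg() and sendErrorMsg() shared in error_handling, the private per-pool sendErrorMsg() copies (removed below, and in fair_threadpool.cpp and bppseeder.cpp above) collapse into one implementation. A typical call site, as the catch blocks in this file now read:

error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID,
                             runList[i].sock);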
@ -267,7 +292,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID,
runList[i].sock);
}
catch (...)
{
@ -293,7 +319,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
#endif
if (running)
sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock);
error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID,
runList[i].sock);
}
catch (...)
{
@ -301,21 +328,6 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw()
}
}
void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock)
{
ISMPacketHeader ism;
PrimitiveHeader ph = {0, 0, 0, 0, 0, 0};
ism.Status = logging::primitiveServerErr;
ph.UniqueID = id;
ph.StepID = step;
messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
sock->write(msg);
}
void PriorityThreadPool::stop()
{
_stop = true;

View File

@ -24,11 +24,6 @@
#pragma once
#include <string>
#include <iostream>
#include <cstdlib>
#include <sstream>
#include <stdexcept>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
@ -36,10 +31,20 @@
#include <boost/function.hpp>
#include <atomic>
#include "primitives/primproc/umsocketselector.h"
#include "atomicops.h"
namespace error_handling
{
messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step);
void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step,
primitiveprocessor::SP_UM_IOSOCK sock);
} // namespace error_handling
namespace threadpool
{
using TransactionIdxT = uint32_t;
class PriorityThreadPool
{
public:
@ -57,12 +62,25 @@ class PriorityThreadPool
Job() : weight(1), priority(0), id(0)
{
}
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
: functor(functor)
, weight(weight)
, priority(priority)
, id(id)
, stepID(stepID)
, uniqueID(uniqueID)
, sock(sock)
{
}
boost::shared_ptr<Functor> functor;
uint32_t weight;
uint32_t priority;
uint32_t id;
uint32_t uniqueID;
uint32_t stepID;
uint32_t uniqueID;
primitiveprocessor::SP_UM_IOSOCK sock;
};
@ -135,7 +153,6 @@ class PriorityThreadPool
Priority pickAQueue(Priority preference);
void threadFcn(const Priority preferredQueue) throw();
void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock);
std::list<Job> jobQueues[3]; // higher indexes = higher priority
uint32_t threadCounts[3];