mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

fix(threadpool): MCOL-5565 queries stuck in FairThreadScheduler. (#3100)

Meta primitive jobs, e.g. ADD_JOINER and LAST_JOINER, get stuck
	in the Fair scheduler when there is no out-of-band scheduler.
	Add the OOB scheduler back to remedy the issue.
drrtuy committed 2024-01-13 10:54:08 +02:00 (committed by GitHub)
parent eb7f1bb2b4
commit 79ad78f91f
7 changed files with 122 additions and 149 deletions
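
For orientation before the diff: the change splits primitive-message dispatch across two pools, sending meta messages (joiner setup and teardown, aborts, dictionary filter ops) to an out-of-band PriorityThreadPool so they can no longer starve behind heavy scan jobs queued in the FairThreadPool. The sketch below is a simplified illustration of that routing only; the pool and job types are stand-ins, not the real ColumnStore classes, whose actual signatures appear in the diff that follows.

#include <cstdint>
#include <functional>
#include <iostream>

enum class Command { ADD_JOINER, LAST_JOINER, ABORT, BATCH_PRIMITIVE_RUN };

struct Job { std::function<int()> functor; uint32_t weight; };

// Stand-in for threadpool::FairThreadPool: weight-based fair scheduling of scan work.
struct FairPool { void addJob(const Job&) { std::cout << "queued on fair pool\n"; } };

// Stand-in for threadpool::PriorityThreadPool: the plain reschedulable pool used out-of-band.
struct OOBPool { void addJob(const Job&) { std::cout << "queued on OOB pool\n"; } };

// Before this fix, meta jobs waited in the fair queue behind scan jobs and could
// starve indefinitely; routing them to a dedicated OOB pool unsticks them.
void dispatchPrimitive(Command cmd, FairPool& procPool, OOBPool& oobPool)
{
  Job job{[] { return 0; }, /*weight=*/1};
  switch (cmd)
  {
    case Command::ADD_JOINER:
    case Command::LAST_JOINER:
    case Command::ABORT: oobPool.addJob(job); break;  // meta work
    default: procPool.addJob(job); break;             // scan/projection work
  }
}

int main()
{
  FairPool fair;
  OOBPool oob;
  dispatchPrimitive(Command::LAST_JOINER, fair, oob);          // -> OOB pool
  dispatchPrimitive(Command::BATCH_PRIMITIVE_RUN, fair, oob);  // -> fair pool
}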

View File

@@ -441,9 +441,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
       }
     }
-#ifdef __FreeBSD__
     pthread_mutex_unlock(&objLock);
-#endif
   }
 
   bs >> filterCount;
@@ -593,9 +591,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
   memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2));
 
   buildVSSCache(count);
-#ifdef __FreeBSD__
   pthread_mutex_unlock(&objLock);
-#endif
 }
 
 // This version of addToJoiner() is multithreaded.  Values are first
@@ -834,28 +830,11 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
   idbassert(bs.length() == 0);
 }
 
-void BatchPrimitiveProcessor::doneSendingJoinerData()
-{
-  /* to get wall-time of hash table construction
-  if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
-  {
-    boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
-    Logger logger;
-    ostringstream os;
-    os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
-    logger.logMessage(os.str());
-    cout << os.str() << endl;
-  }
-  */
-}
-
 int BatchPrimitiveProcessor::endOfJoiner()
 {
   /* Wait for all joiner elements to be added */
   uint32_t i;
   size_t currentSize;
-  // it should be safe to run this without grabbing this lock
-  // boost::mutex::scoped_lock scoped(addToJoinerLock);
 
   if (endOfJoinerRan)
     return 0;
@@ -876,34 +855,38 @@ int BatchPrimitiveProcessor::endOfJoiner()
       currentSize = 0;
 
       for (uint j = 0; j < processorThreads; ++j)
         if (!tJoiners[i] || !tJoiners[i][j])
+        {
           return -1;
+        }
         else
           currentSize += tJoiners[i][j]->size();
 
       if (currentSize != tJoinerSizes[i])
+      {
         return -1;
-      // if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
-      //   return -1;
+      }
     }
     else
     {
       currentSize = 0;
 
       for (uint j = 0; j < processorThreads; ++j)
+      {
         if (!tlJoiners[i] || !tlJoiners[i][j])
+        {
           return -1;
+        }
         else
           currentSize += tlJoiners[i][j]->size();
+      }
 
       if (currentSize != tJoinerSizes[i])
+      {
         return -1;
-      // if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
-      //   return -1;
+      }
     }
   }
 
   endOfJoinerRan = true;
-#ifndef __FreeBSD__
   pthread_mutex_unlock(&objLock);
-#endif
   return 0;
 }
@@ -1076,7 +1059,6 @@ void BatchPrimitiveProcessor::initProcessor()
   {
     for (i = 0; i < (uint32_t)filterCount - 1; ++i)
     {
-      // cout << "prepping filter " << i << endl;
       filterSteps[i]->setBatchPrimitiveProcessor(this);
 
       if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP)
@@ -1087,14 +1069,12 @@ void BatchPrimitiveProcessor::initProcessor()
         filterSteps[i]->prep(OT_RID, false);
     }
 
-    // cout << "prepping filter " << i << endl;
     filterSteps[i]->setBatchPrimitiveProcessor(this);
     filterSteps[i]->prep(OT_BOTH, false);
   }
 
   for (i = 0; i < projectCount; ++i)
   {
-    // cout << "prepping projection " << i << endl;
     projectSteps[i]->setBatchPrimitiveProcessor(this);
 
     if (noVB)
@@ -1120,7 +1100,6 @@ void BatchPrimitiveProcessor::initProcessor()
   if (fAggregator.get() != NULL)
   {
-    // fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]);
     fAggRowGroupData.reinit(fAggregateRG);
     fAggregateRG.setData(&fAggRowGroupData);

View File

@@ -96,7 +96,6 @@ class BatchPrimitiveProcessor
   void resetBPP(messageqcpp::ByteStream&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& outputSock);
   void addToJoiner(messageqcpp::ByteStream&);
   int endOfJoiner();
-  void doneSendingJoinerData();
   int operator()();
   void setLBIDForScan(uint64_t rid);

View File

@@ -157,8 +157,6 @@ int BPPSeeder::operator()()
     return ret;
   }
 
-  // if (!(sessionID & 0x80000000))
-  //   cout << "got request for <" << sessionID <<", " << stepID << ">\n";
   scoped.lock();
 
   if (!bppv)
@@ -172,26 +170,12 @@ int BPPSeeder::operator()()
     if (boost::posix_time::second_clock::universal_time() > dieTime)
     {
-#if 0  // for debugging
-      boost::posix_time::ptime pt = boost::posix_time::microsec_clock::local_time();
-      if (sessionID & 0x80000000)
-        cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID="
-             << (int) (sessionID ^ 0x80000000) << " stepID=" << stepID << " (syscat)" << pt << endl;
-      else
-        cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID="
-             << sessionID << " stepID=" << stepID << pt << endl;
-      throw logic_error("BPPSeeder couldn't find the sessionID/stepID pair");
-#endif
+      cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID
+           << " has been killed." << endl;
       return 0;
     }
 
-    // if (!isSysCat())
     return -1;
-    // else {  // syscat queries aren't run by a threadpool, can't reschedule those
-    //   jobs usleep(1000); goto retry;
-    // }
   }
 
   bppv = it->second;
@@ -205,10 +189,6 @@ int BPPSeeder::operator()()
   if (!bpp)
   {
-    // if (isSysCat()) {
-    //   usleep(1000);
-    //   goto retry;
-    // }
     return -1;  // all BPP instances are busy, make threadpool reschedule
   }

View File

@@ -1211,6 +1211,7 @@ struct BPPHandler
       }
 
       fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
+      fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
     }
 
     scoped.unlock();
@@ -1324,16 +1325,21 @@ struct BPPHandler
     }
     else
     {
-      bs.rewind();
       if (posix_time::second_clock::universal_time() > dieTime)
+      {
+        std::cout << "doAbort: job for key " << key << " has been killed." << std::endl;
         return 0;
+      }
       else
+      {
+        bs.rewind();
         return -1;
+      }
     }
 
     scoped.unlock();
     fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
+    fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
     return 0;
   }
@@ -1356,7 +1362,10 @@ struct BPPHandler
       return 0;
     }
     else
+    {
+      bs.rewind();
       return -1;
+    }
   }
 
   void createBPP(ByteStream& bs)
@@ -1404,7 +1413,6 @@ struct BPPHandler
     bppKeys.push_back(key);
     bool newInsert;
     newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
-    // cout << "creating BPP # " << key << endl;
     scoped.unlock();
 
     if (!newInsert)
@@ -1422,10 +1430,7 @@ struct BPPHandler
   inline SBPPV grabBPPs(uint32_t uniqueID)
   {
     BPPMap::iterator it;
-    /*
-      uint32_t failCount = 0;
-      uint32_t maxFailCount = (fatal ? 500 : 5000);
-    */
     SBPPV ret;
 
     boost::mutex::scoped_lock scoped(bppLock);
@@ -1435,24 +1440,6 @@ struct BPPHandler
       return it->second;
     else
       return SBPPV();
-    /*
-      do
-      {
-        if (++failCount == maxFailCount) {
-          //cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl;
-          return ret;
-          //throw logic_error("grabBPPs couldn't find the unique ID");
-        }
-        scoped.unlock();
-        usleep(5000);
-        scoped.lock();
-        it = bppMap.find(uniqueID);
-      } while (it == bppMap.end());
-      ret = it->second;
-      return ret;
-    */
   }
 
   inline shared_mutex& getDJLock(uint32_t uniqueID)
@@ -1490,6 +1477,7 @@ struct BPPHandler
     buf = bs.buf();
     /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
     uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
+
     bppv = grabBPPs(uniqueID);
 
     if (bppv)
@@ -1501,7 +1489,10 @@ struct BPPHandler
     else
     {
       if (posix_time::second_clock::universal_time() > dieTime)
+      {
+        cout << "addJoinerToBPP: job for id " << uniqueID << " has been killed." << endl;
         return 0;
+      }
       else
         return -1;
     }
@@ -1519,20 +1510,22 @@ struct BPPHandler
     buf = bs.buf();
     /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
     uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
+
     bppv = grabBPPs(uniqueID);
 
     if (!bppv)
     {
-      // cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl;
       if (posix_time::second_clock::universal_time() > dieTime)
+      {
+        cout << "LastJoiner: job for id " << uniqueID << " has been killed." << endl;
         return 0;
+      }
       else
+      {
         return -1;
+      }
     }
 
     boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
 
     for (i = 0; i < bppv->get().size(); i++)
     {
       err = bppv->get()[i]->endOfJoiner();
@@ -1540,32 +1533,26 @@ struct BPPHandler
       if (err == -1)
       {
         if (posix_time::second_clock::universal_time() > dieTime)
+        {
+          cout << "LastJoiner: job for id " << uniqueID
+               << " has been killed waiting for joiner messages for too long." << endl;
           return 0;
+        }
         else
           return -1;
       }
     }
-    bppv->get()[0]->doneSendingJoinerData();
 
     /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
        more intelligent scheduling.  Once the join data is received, BPPV will
       start letting jobs run and create more BPP instances on demand. */
-    atomicops::atomicMb();  // make sure the joinDataReceived assignment doesn't migrate upward...
     bppv->joinDataReceived = true;
     return 0;
   }
 
   int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
   {
+    // This is a corner case that damages bs so its length becomes less than a header length.
+    // The damaged bs doesn't pass the if that checks bs at least has header + 3x int32_t.
+    // The if block below works around the issue.
+    if (posix_time::second_clock::universal_time() > dieTime)
+    {
+      return 0;
+    }
     uint32_t uniqueID, sessionID, stepID;
     BPPMap::iterator it;
@@ -1608,39 +1595,33 @@ struct BPPHandler
       {
         // MCOL-5.  On ubuntu, a crash was happening.  Checking
         // joinDataReceived here fixes it.
-        // We're not ready for a destroy.  Reschedule.
+        // We're not ready for a destroy.  Reschedule to wait
+        // for all joiners to arrive.
+        // TODO there might be no joiners if the query is canceled.
+        // The memory will leak.
+        // Rewind to the beginning of ByteStream buf b/c of the advance above.
+        bs.rewind();
         return -1;
       }
     }
     else
     {
-      // cout << "got a destroy for an unknown obj " << uniqueID << endl;
-      bs.rewind();
       if (posix_time::second_clock::universal_time() > dieTime)
       {
-        // XXXPAT: going to let this fall through and delete jobs for
-        // uniqueID if there are any.  Not clear what the downside is.
-        /*
-        lk.unlock();
-        deleteDJLock(uniqueID);
-        return 0;
-        */
+        cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed."
+             << endl;
+        // If for some reason there are jobs for this uniqueID that arrived later
+        // they won't leave PP thread pool staying there forever.
       }
       else
+      {
+        bs.rewind();
         return -1;
+      }
     }
-    // cout << " destroy: new size is " << bppMap.size() << endl;
-    /*
-      if (sessionID & 0x80000000)
-        cerr << "destroyed BPP instances for sessionID " << (int)
-          (sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n";
-      else
-        cerr << "destroyed BPP instances for sessionID " << sessionID <<
-          " stepID "<< stepID << endl;
-    */
 
     fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
+    fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID);
     lk.unlock();
     deleteDJLock(uniqueID);
     return 0;
@@ -1709,7 +1690,10 @@ class DictionaryOp : public FairThreadPool::Functor
       bs->rewind();
 
       if (posix_time::second_clock::universal_time() > dieTime)
+      {
+        cout << "DictionaryOp::operator(): job has been killed." << endl;
         return 0;
+      }
     }
 
     return ret;
@@ -1787,7 +1771,10 @@ class DestroyEqualityFilter : public DictionaryOp
       return 0;
     }
     else
+    {
+      bs->rewind();
       return -1;
+    }
   }
 };
@@ -1925,7 +1912,8 @@ struct ReadThread
   }
 
   static void dispatchPrimitive(SBS sbs, boost::shared_ptr<BPPHandler>& fBPPHandler,
-                                boost::shared_ptr<threadpool::FairThreadPool>& procPoolPtr,
+                                boost::shared_ptr<threadpool::FairThreadPool> procPool,
+                                std::shared_ptr<threadpool::PriorityThreadPool> OOBProcPool,
                                 SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads,
                                 const bool ptTrace)
   {
@@ -1947,6 +1935,7 @@ struct ReadThread
         const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
         const uint32_t weight = threadpool::MetaJobsInitialWeight;
         const uint32_t priority = 0;
+
         uint32_t id = 0;
         boost::shared_ptr<FairThreadPool::Functor> functor;
 
         if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
@@ -1980,8 +1969,8 @@ struct ReadThread
           id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
           functor.reset(new BPPHandler::Abort(fBPPHandler, sbs));
         }
 
-        FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id);
-        procPoolPtr->addJob(job);
+        PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id);
+        OOBProcPool->addJob(job);
         break;
       }
@@ -2022,10 +2011,18 @@ struct ReadThread
           txnId = *((uint32_t*)&buf[pos + 2]);
           stepID = *((uint32_t*)&buf[pos + 6]);
           uniqueID = *((uint32_t*)&buf[pos + 10]);
-          weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
+          weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]) + 100000;
         }
 
-        FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
-        procPoolPtr->addJob(job);
+        if (hdr && hdr->flags & IS_SYSCAT)
+        {
+          PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
+          OOBProcPool->addJob(job);
+        }
+        else
+        {
+          FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
+          procPool->addJob(job);
+        }
         break;
       }
@@ -2049,7 +2046,8 @@ struct ReadThread
   void operator()()
   {
     utils::setThreadName("PPReadThread");
-    boost::shared_ptr<threadpool::FairThreadPool> procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool();
+    auto procPool = fPrimitiveServerPtr->getProcessorThreadPool();
+    auto OOBProcPool = fPrimitiveServerPtr->getOOBProcessorThreadPool();
     SBS bs;
     UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
@@ -2140,7 +2138,7 @@ struct ReadThread
             default: break;
           }
 
-          dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock,
+          dispatchPrimitive(bs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
                             fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace());
         }
         else  // bs.length() == 0
@@ -2282,6 +2280,9 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro
   fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
                                                       medPriorityThreads, lowPriorityThreads, 0));
+  // We're not using either the priority or the job-clustering features, just need a threadpool
+  // that can reschedule jobs, and an unlimited non-blocking queue
+  fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));
 
   asyncCounter = 0;
@@ -2338,12 +2339,13 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
   // These empty SPs have "same-host" messaging semantics.
   SP_UM_IOSOCK outIos(nullptr);
   SP_UM_MUTEX writeLock(nullptr);
-  auto procPoolPtr = this->getProcessorThreadPool();
+  auto procPool = this->getProcessorThreadPool();
+  auto OOBProcPool = this->getOOBProcessorThreadPool();
   boost::shared_ptr<BPPHandler> fBPPHandler(new BPPHandler(this));
   for (;;)
   {
     joblist::DistributedEngineComm::SBSVector primitiveMsgs;
-    for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
+    for (auto sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
     {
       if (sbs->length() == 0)
       {
@@ -2352,7 +2354,7 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
       }
 
       idbassert(sbs->length() >= sizeof(ISMPacketHeader));
-      ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock,
+      ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
                                     this->ProcessorThreads(), this->PTTrace());
     }
   }
@@ -2369,7 +2371,6 @@ BPPV::BPPV(PrimitiveServer* ps)
   sendThread->setProcessorPool(ps->getProcessorThreadPool());
   v.reserve(BPPCount);
   pos = 0;
-  joinDataReceived = false;
 }
 
 BPPV::~BPPV()
@@ -2409,27 +2410,6 @@ boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
   uint32_t size = v.size();
   uint32_t i = 0;
 
-#if 0
-  // This block of code scans for the first available BPP instance,
-  // makes BPPSeeder reschedule it if none are available.  Relies on BPP instance
-  // being preallocated.
-  for (i = 0; i < size; i++)
-  {
-    uint32_t index = (i + pos) % size;
-    if (!(v[index]->busy()))
-    {
-      pos = (index + 1) % size;
-      v[index]->busy(true);
-      return v[index];
-    }
-  }
-  // They're all busy, make threadpool reschedule the job
-  return boost::shared_ptr<BatchPrimitiveProcessor>();
-#endif
   // This block of code creates BPP instances if/when they are needed
   // don't use a processor thread when it will just block, reschedule it instead
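
A pattern worth calling out in the handlers above (and in BPPSeeder earlier): a job functor returns 0 when it is finished or has outlived its dieTime (now with a log line so killed jobs are visible), and -1 to ask the pool to put it back on the queue because a resource is not ready, e.g. joiners are still arriving or all BPP instances are busy. Below is a minimal sketch of that contract using hypothetical Pool and MetaJob types, not the real thread pool classes:

#include <chrono>
#include <deque>
#include <functional>
#include <iostream>

struct MetaJob
{
  std::function<int()> work;  // returns 0 = done, -1 = reschedule
};

struct Pool
{
  std::deque<MetaJob> q;
  void addJob(MetaJob j) { q.push_back(std::move(j)); }
  void runOnce()
  {
    while (!q.empty())
    {
      MetaJob j = q.front();
      q.pop_front();
      if (j.work() == -1)
        addJob(j);  // resource not ready yet, e.g. joiners still arriving
    }
  }
};

int main()
{
  using clock = std::chrono::steady_clock;
  auto dieTime = clock::now() + std::chrono::seconds(1);
  int attempts = 0;
  Pool pool;
  pool.addJob({[&] {
    if (++attempts < 3)
      return -1;  // e.g. endOfJoiner() saw an incomplete hash table
    if (clock::now() > dieTime)
    {
      std::cout << "job has been killed.\n";  // mirrors the new log lines
      return 0;                               // give up, drop the job
    }
    std::cout << "done after " << attempts << " attempts\n";
    return 0;
  }});
  pool.runOnce();
}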

View File

@@ -66,7 +66,7 @@ class BPPV
   }
   void abort();
   bool aborted();
-  volatile bool joinDataReceived;
+  std::atomic<bool> joinDataReceived{false};
 
  private:
   std::vector<boost::shared_ptr<BatchPrimitiveProcessor> > v;
@@ -129,6 +129,11 @@ class PrimitiveServer
     return fProcessorPool;
   }
 
+  inline std::shared_ptr<threadpool::PriorityThreadPool> getOOBProcessorThreadPool() const
+  {
+    return fOOBPool;
+  }
+
   int ReadAheadBlocks() const
   {
     return fReadAheadBlocks;
@@ -161,6 +166,7 @@ class PrimitiveServer
    * primitive commands
    */
   boost::shared_ptr<threadpool::FairThreadPool> fProcessorPool;
+  std::shared_ptr<threadpool::PriorityThreadPool> fOOBPool;
   int fServerThreads;
   int fServerQueueSize;
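
The joinDataReceived change above pairs with the removal of the explicit atomicops::atomicMb() barrier in lastJoiner(): a volatile bool plus a hand-rolled fence becomes a std::atomic<bool>, whose default sequentially consistent store already orders the preceding writes. A minimal standalone illustration of the pattern, with stand-in names rather than the real BPPV members:

#include <atomic>
#include <thread>

// Stand-in for the BPPV flag: a writer publishes "join data received" and a
// reader polls it. std::atomic gives the inter-thread ordering that
// 'volatile' never guaranteed, so no explicit barrier is needed.
std::atomic<bool> joinDataReceived{false};
int joinData = 0;  // stands in for the hash tables built before the flag flips

int main()
{
  std::thread producer([] {
    joinData = 42;                 // build join data first
    joinDataReceived.store(true);  // then publish; replaces atomicMb() + volatile write
  });
  while (!joinDataReceived.load())
    std::this_thread::yield();     // reader spins until publication
  // joinData (42) is now safely visible to this thread
  producer.join();
}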

View File

@@ -228,7 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue
       {
         // to avoid excessive CPU usage waiting for data from storage
         usleep(500);
-        runList[0].weight_ += RescheduleWeightIncrement;
+        runList[0].weight_ += (runList[0].weight_) ? runList[0].weight_ : RescheduleWeightIncrement;
         addJob(runList[0]);
       }
     }
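
The changed line turns the reschedule penalty from a fixed increment into a doubling: a nonzero weight is added to itself each time the job is put back, so a repeatedly rescheduled job falls behind fresh work geometrically rather than linearly. A tiny worked example; the constant's real value lives in the thread pool headers, so 100 here is an assumption:

#include <cstdint>
#include <iostream>

constexpr uint32_t RescheduleWeightIncrement = 100;  // hypothetical value

int main()
{
  // Simulate five consecutive reschedules of one job.
  uint32_t weight = 0;
  for (int reschedules = 1; reschedules <= 5; ++reschedules)
  {
    weight += weight ? weight : RescheduleWeightIncrement;
    std::cout << "after reschedule " << reschedules << ": weight = " << weight << '\n';
  }
  // prints 100, 200, 400, 800, 1600: the weight doubles once it is nonzero
}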

View File

@@ -40,6 +40,9 @@
 namespace threadpool
 {
+using TransactionIdxT = uint32_t;
+
 class PriorityThreadPool
 {
  public:
@@ -57,12 +60,38 @@ class PriorityThreadPool
     Job() : weight(1), priority(0), id(0)
     {
     }
+    Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
+        const boost::shared_ptr<Functor>& functor, const primitiveprocessor::SP_UM_IOSOCK& sock,
+        const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0)
+     : functor(functor)
+     , weight(weight)
+     , priority(priority)
+     , id(id)
+     , stepID(stepID)
+     , uniqueID(uniqueID)
+     , sock(sock)
+    {
+    }
+    // sock_ is nullptr here. This is kinda dangerous.
+    Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
+        const boost::shared_ptr<Functor>& functor, const uint32_t weight = 1, const uint32_t priority = 0,
+        const uint32_t id = 0)
+     : functor(functor)
+     , weight(weight)
+     , priority(priority)
+     , id(id)
+     , stepID(stepID)
+     , uniqueID(uniqueID)
+     , sock(nullptr)
+    {
+    }
     boost::shared_ptr<Functor> functor;
     uint32_t weight;
     uint32_t priority;
     uint32_t id;
+    uint32_t uniqueID;
     uint32_t stepID;
-    uint32_t uniqueID;
     primitiveprocessor::SP_UM_IOSOCK sock;
   };
@@ -113,7 +142,7 @@ class PriorityThreadPool
   {
     return blockedThreads;
   }
 
  protected:
  private:
   struct ThreadHelper
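
Hypothetical usage of the two restored Job constructors; the real call sites are in dispatchPrimitive() in primitiveserver.cpp, and this sketch assumes the Functor interface is an int operator()() returning 0 for done and -1 for reschedule. Jobs that must answer on a socket use the first constructor; meta jobs (ADD_JOINER, LAST_JOINER, aborts) use the second, which leaves sock as nullptr, hence the "kinda dangerous" comment: nothing on the OOB path may try to write to that socket.

struct NoopFunctor : public threadpool::PriorityThreadPool::Functor
{
  int operator()() override { return 0; }  // 0 = done, -1 = reschedule
};

void enqueueExamples(threadpool::PriorityThreadPool& pool, primitiveprocessor::SP_UM_IOSOCK& outIos)
{
  boost::shared_ptr<threadpool::PriorityThreadPool::Functor> f(new NoopFunctor);

  // uniqueID, stepID, txnIdx, functor, sock, weight, priority, id
  threadpool::PriorityThreadPool::Job withSock(1, 2, 3, f, outIos);

  // sock-less meta job, weighted like the dispatch code in primitiveserver.cpp
  threadpool::PriorityThreadPool::Job metaJob(1, 2, 3, f, threadpool::MetaJobsInitialWeight);

  pool.addJob(withSock);
  pool.addJob(metaJob);
}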