diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 62134e3c5..8151c8926 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -441,9 +441,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) } } -#ifdef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif } bs >> filterCount; @@ -593,9 +591,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2)); buildVSSCache(count); -#ifdef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif } // This version of addToJoiner() is multithreaded. Values are first @@ -834,28 +830,11 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) idbassert(bs.length() == 0); } -void BatchPrimitiveProcessor::doneSendingJoinerData() -{ - /* to get wall-time of hash table construction -if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000)) -{ - boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - Logger logger; - ostringstream os; - os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime; - logger.logMessage(os.str()); - cout << os.str() << endl; -} - */ -} - int BatchPrimitiveProcessor::endOfJoiner() { /* Wait for all joiner elements to be added */ uint32_t i; size_t currentSize; - // it should be safe to run this without grabbing this lock - // boost::mutex::scoped_lock scoped(addToJoinerLock); if (endOfJoinerRan) return 0; @@ -876,34 +855,38 @@ int BatchPrimitiveProcessor::endOfJoiner() currentSize = 0; for (uint j = 0; j < processorThreads; ++j) if (!tJoiners[i] || !tJoiners[i][j]) + { return -1; + } else currentSize += tJoiners[i][j]->size(); if (currentSize != tJoinerSizes[i]) + { return -1; - // if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i])) - // return -1; + } } else { currentSize = 0; for (uint j = 0; j < processorThreads; ++j) + { if (!tlJoiners[i] || !tlJoiners[i][j]) + { return -1; + } else currentSize += tlJoiners[i][j]->size(); + } if (currentSize != tJoinerSizes[i]) + { return -1; - // if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i])) - // return -1; + } } } endOfJoinerRan = true; -#ifndef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif return 0; } @@ -1076,7 +1059,6 @@ void BatchPrimitiveProcessor::initProcessor() { for (i = 0; i < (uint32_t)filterCount - 1; ++i) { - // cout << "prepping filter " << i << endl; filterSteps[i]->setBatchPrimitiveProcessor(this); if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP) @@ -1087,14 +1069,12 @@ void BatchPrimitiveProcessor::initProcessor() filterSteps[i]->prep(OT_RID, false); } - // cout << "prepping filter " << i << endl; filterSteps[i]->setBatchPrimitiveProcessor(this); filterSteps[i]->prep(OT_BOTH, false); } for (i = 0; i < projectCount; ++i) { - // cout << "prepping projection " << i << endl; projectSteps[i]->setBatchPrimitiveProcessor(this); if (noVB) @@ -1120,7 +1100,6 @@ void BatchPrimitiveProcessor::initProcessor() if (fAggregator.get() != NULL) { - // fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]); fAggRowGroupData.reinit(fAggregateRG); fAggregateRG.setData(&fAggRowGroupData); diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 730ee2a6b..c7965f9e6 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -96,7 +96,6 @@ class BatchPrimitiveProcessor void resetBPP(messageqcpp::ByteStream&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& outputSock); void addToJoiner(messageqcpp::ByteStream&); int endOfJoiner(); - void doneSendingJoinerData(); int operator()(); void setLBIDForScan(uint64_t rid); diff --git a/primitives/primproc/bppseeder.cpp b/primitives/primproc/bppseeder.cpp index 48f821c07..de1798c50 100644 --- a/primitives/primproc/bppseeder.cpp +++ b/primitives/primproc/bppseeder.cpp @@ -157,8 +157,6 @@ int BPPSeeder::operator()() return ret; } - // if (!(sessionID & 0x80000000)) - // cout << "got request for <" << sessionID <<", " << stepID << ">\n"; scoped.lock(); if (!bppv) @@ -172,26 +170,12 @@ int BPPSeeder::operator()() if (boost::posix_time::second_clock::universal_time() > dieTime) { -#if 0 // for debugging - boost::posix_time::ptime pt = boost::posix_time::microsec_clock::local_time(); - - if (sessionID & 0x80000000) - cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID=" - << (int) (sessionID ^ 0x80000000) << " stepID=" << stepID << " (syscat)" << pt << endl; - else - cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID=" - << sessionID << " stepID=" << stepID << pt << endl; - - throw logic_error("BPPSeeder couldn't find the sessionID/stepID pair"); -#endif + cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID + << " has been killed." << endl; return 0; } - // if (!isSysCat()) return -1; - // else { // syscat queries aren't run by a threadpool, can't reschedule those - //jobs usleep(1000); goto retry; - // } } bppv = it->second; @@ -205,10 +189,6 @@ int BPPSeeder::operator()() if (!bpp) { - // if (isSysCat()) { - // usleep(1000); - // goto retry; - // } return -1; // all BPP instances are busy, make threadpool reschedule } diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 088b96b43..60ce2145d 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -1211,6 +1211,7 @@ struct BPPHandler } fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key); } scoped.unlock(); @@ -1324,16 +1325,21 @@ struct BPPHandler } else { - bs.rewind(); - if (posix_time::second_clock::universal_time() > dieTime) + { + std::cout << "doAbort: job for key " << key << " has been killed." << std::endl; return 0; + } else + { + bs.rewind(); return -1; + } } scoped.unlock(); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key); return 0; } @@ -1356,7 +1362,10 @@ struct BPPHandler return 0; } else + { + bs.rewind(); return -1; + } } void createBPP(ByteStream& bs) @@ -1404,7 +1413,6 @@ struct BPPHandler bppKeys.push_back(key); bool newInsert; newInsert = bppMap.insert(pair(key, bppv)).second; - // cout << "creating BPP # " << key << endl; scoped.unlock(); if (!newInsert) @@ -1422,10 +1430,7 @@ struct BPPHandler inline SBPPV grabBPPs(uint32_t uniqueID) { BPPMap::iterator it; - /* - uint32_t failCount = 0; - uint32_t maxFailCount = (fatal ? 500 : 5000); - */ + SBPPV ret; boost::mutex::scoped_lock scoped(bppLock); @@ -1435,24 +1440,6 @@ struct BPPHandler return it->second; else return SBPPV(); - - /* - do - { - if (++failCount == maxFailCount) { - //cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl; - return ret; - //throw logic_error("grabBPPs couldn't find the unique ID"); - } - scoped.unlock(); - usleep(5000); - scoped.lock(); - it = bppMap.find(uniqueID); - } while (it == bppMap.end()); - - ret = it->second; - return ret; - */ } inline shared_mutex& getDJLock(uint32_t uniqueID) @@ -1490,6 +1477,7 @@ struct BPPHandler buf = bs.buf(); /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */ uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]); + bppv = grabBPPs(uniqueID); if (bppv) @@ -1501,7 +1489,10 @@ struct BPPHandler else { if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "addJoinerToBPP: job for id " << uniqueID << " has been killed." << endl; return 0; + } else return -1; } @@ -1519,20 +1510,22 @@ struct BPPHandler buf = bs.buf(); /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */ uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]); - bppv = grabBPPs(uniqueID); if (!bppv) { - // cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl; if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "LastJoiner: job for id " << uniqueID << " has been killed." << endl; return 0; + } else + { return -1; + } } boost::unique_lock lk(getDJLock(uniqueID)); - for (i = 0; i < bppv->get().size(); i++) { err = bppv->get()[i]->endOfJoiner(); @@ -1540,32 +1533,26 @@ struct BPPHandler if (err == -1) { if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "LastJoiner: job for id " << uniqueID + << " has been killed waiting for joiner messages for too long." << endl; return 0; + } else return -1; } } - bppv->get()[0]->doneSendingJoinerData(); /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do more intelligent scheduling. Once the join data is received, BPPV will start letting jobs run and create more BPP instances on demand. */ - atomicops::atomicMb(); // make sure the joinDataReceived assignment doesn't migrate upward... bppv->joinDataReceived = true; return 0; } int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime) { - // This is a corner case that damages bs so its length becomes less than a header length. - // The damaged bs doesn't pass the if that checks bs at least has header + 3x int32_t. - // The if block below works around the issue. - if (posix_time::second_clock::universal_time() > dieTime) - { - return 0; - } - uint32_t uniqueID, sessionID, stepID; BPPMap::iterator it; @@ -1608,39 +1595,33 @@ struct BPPHandler { // MCOL-5. On ubuntu, a crash was happening. Checking // joinDataReceived here fixes it. - // We're not ready for a destroy. Reschedule. + // We're not ready for a destroy. Reschedule to wait + // for all joiners to arrive. + // TODO there might be no joiners if the query is canceled. + // The memory will leak. + // Rewind to the beginning of ByteStream buf b/c of the advance above. + bs.rewind(); return -1; } } else { - // cout << "got a destroy for an unknown obj " << uniqueID << endl; - bs.rewind(); - if (posix_time::second_clock::universal_time() > dieTime) { - // XXXPAT: going to let this fall through and delete jobs for - // uniqueID if there are any. Not clear what the downside is. - /* -lk.unlock(); -deleteDJLock(uniqueID); -return 0; - */ + cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed." + << endl; + // If for some reason there are jobs for this uniqueID that arrived later + // they won't leave PP thread pool staying there forever. } else + { + bs.rewind(); return -1; + } } - // cout << " destroy: new size is " << bppMap.size() << endl; - /* - if (sessionID & 0x80000000) - cerr << "destroyed BPP instances for sessionID " << (int) - (sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n"; - else - cerr << "destroyed BPP instances for sessionID " << sessionID << - " stepID "<< stepID << endl; - */ fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID); lk.unlock(); deleteDJLock(uniqueID); return 0; @@ -1709,7 +1690,10 @@ class DictionaryOp : public FairThreadPool::Functor bs->rewind(); if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "DictionaryOp::operator(): job has been killed." << endl; return 0; + } } return ret; @@ -1787,7 +1771,10 @@ class DestroyEqualityFilter : public DictionaryOp return 0; } else + { + bs->rewind(); return -1; + } } }; @@ -1925,7 +1912,8 @@ struct ReadThread } static void dispatchPrimitive(SBS sbs, boost::shared_ptr& fBPPHandler, - boost::shared_ptr& procPoolPtr, + boost::shared_ptr procPool, + std::shared_ptr OOBProcPool, SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads, const bool ptTrace) { @@ -1947,6 +1935,7 @@ struct ReadThread const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); const uint32_t weight = threadpool::MetaJobsInitialWeight; const uint32_t priority = 0; + uint32_t id = 0; boost::shared_ptr functor; if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) @@ -1980,8 +1969,8 @@ struct ReadThread id = fBPPHandler->getUniqueID(sbs, ismHdr->Command); functor.reset(new BPPHandler::Abort(fBPPHandler, sbs)); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id); - procPoolPtr->addJob(job); + PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id); + OOBProcPool->addJob(job); break; } @@ -2022,10 +2011,18 @@ struct ReadThread txnId = *((uint32_t*)&buf[pos + 2]); stepID = *((uint32_t*)&buf[pos + 6]); uniqueID = *((uint32_t*)&buf[pos + 10]); - weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); + weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]) + 100000; + } + if (hdr && hdr->flags & IS_SYSCAT) + { + PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + OOBProcPool->addJob(job); + } + else + { + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + procPool->addJob(job); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); - procPoolPtr->addJob(job); break; } @@ -2049,7 +2046,8 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + auto procPool = fPrimitiveServerPtr->getProcessorThreadPool(); + auto OOBProcPool = fPrimitiveServerPtr->getOOBProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2140,7 +2138,7 @@ struct ReadThread default: break; } - dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock, + dispatchPrimitive(bs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock, fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace()); } else // bs.length() == 0 @@ -2282,6 +2280,9 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, medPriorityThreads, lowPriorityThreads, 0)); + // We're not using either the priority or the job-clustering features, just need a threadpool + // that can reschedule jobs, and an unlimited non-blocking queue + fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); asyncCounter = 0; @@ -2338,12 +2339,13 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace // These empty SPs have "same-host" messaging semantics. SP_UM_IOSOCK outIos(nullptr); SP_UM_MUTEX writeLock(nullptr); - auto procPoolPtr = this->getProcessorThreadPool(); + auto procPool = this->getProcessorThreadPool(); + auto OOBProcPool = this->getOOBProcessorThreadPool(); boost::shared_ptr fBPPHandler(new BPPHandler(this)); for (;;) { joblist::DistributedEngineComm::SBSVector primitiveMsgs; - for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs)) + for (auto sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs)) { if (sbs->length() == 0) { @@ -2352,7 +2354,7 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace } idbassert(sbs->length() >= sizeof(ISMPacketHeader)); - ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock, + ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock, this->ProcessorThreads(), this->PTTrace()); } } @@ -2369,7 +2371,6 @@ BPPV::BPPV(PrimitiveServer* ps) sendThread->setProcessorPool(ps->getProcessorThreadPool()); v.reserve(BPPCount); pos = 0; - joinDataReceived = false; } BPPV::~BPPV() @@ -2409,27 +2410,6 @@ boost::shared_ptr BPPV::next() uint32_t size = v.size(); uint32_t i = 0; -#if 0 - - // This block of code scans for the first available BPP instance, - // makes BPPSeeder reschedule it if none are available. Relies on BPP instance - // being preallocated. - for (i = 0; i < size; i++) - { - uint32_t index = (i + pos) % size; - - if (!(v[index]->busy())) - { - pos = (index + 1) % size; - v[index]->busy(true); - return v[index]; - } - } - - // They're all busy, make threadpool reschedule the job - return boost::shared_ptr(); -#endif - // This block of code creates BPP instances if/when they are needed // don't use a processor thread when it will just block, reschedule it instead diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 1e9980a80..2d8531d25 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -66,7 +66,7 @@ class BPPV } void abort(); bool aborted(); - volatile bool joinDataReceived; + std::atomic joinDataReceived{false}; private: std::vector > v; @@ -129,6 +129,11 @@ class PrimitiveServer return fProcessorPool; } + inline std::shared_ptr getOOBProcessorThreadPool() const + { + return fOOBPool; + } + int ReadAheadBlocks() const { return fReadAheadBlocks; @@ -161,6 +166,7 @@ class PrimitiveServer * primitive commands */ boost::shared_ptr fProcessorPool; + std::shared_ptr fOOBPool; int fServerThreads; int fServerQueueSize; diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index cfd0350fd..9876eb57c 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -228,7 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue { // to avoid excessive CPU usage waiting for data from storage usleep(500); - runList[0].weight_ += RescheduleWeightIncrement; + runList[0].weight_ += (runList[0].weight_) ? runList[0].weight_ : RescheduleWeightIncrement; addJob(runList[0]); } } diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index ba1acc33d..c9597b8df 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -40,6 +40,9 @@ namespace threadpool { + +using TransactionIdxT = uint32_t; + class PriorityThreadPool { public: @@ -57,12 +60,38 @@ class PriorityThreadPool Job() : weight(1), priority(0), id(0) { } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : functor(functor) + , weight(weight) + , priority(priority) + , id(id) + , stepID(stepID) + , uniqueID(uniqueID) + , sock(sock) + { + } + // sock_ is nullptr here. This is kinda dangerous. + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const uint32_t weight = 1, const uint32_t priority = 0, + const uint32_t id = 0) + : functor(functor) + , weight(weight) + , priority(priority) + , id(id) + , stepID(stepID) + , uniqueID(uniqueID) + , sock(nullptr) + { + } + boost::shared_ptr functor; uint32_t weight; uint32_t priority; uint32_t id; - uint32_t uniqueID; uint32_t stepID; + uint32_t uniqueID; primitiveprocessor::SP_UM_IOSOCK sock; }; @@ -113,7 +142,7 @@ class PriorityThreadPool { return blockedThreads; } - + protected: private: struct ThreadHelper