diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 8a170d010..e917756dc 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -441,9 +441,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) } } -#ifdef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif } bs >> filterCount; @@ -593,9 +591,7 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con memset(asyncLoaded.get(), 0, sizeof(bool) * (projectCount + 2)); buildVSSCache(count); -#ifdef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif } // This version of addToJoiner() is multithreaded. Values are first @@ -834,28 +830,11 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) idbassert(bs.length() == 0); } -void BatchPrimitiveProcessor::doneSendingJoinerData() -{ - /* to get wall-time of hash table construction -if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000)) -{ - boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - Logger logger; - ostringstream os; - os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime; - logger.logMessage(os.str()); - cout << os.str() << endl; -} - */ -} - int BatchPrimitiveProcessor::endOfJoiner() { /* Wait for all joiner elements to be added */ uint32_t i; size_t currentSize; - // it should be safe to run this without grabbing this lock - // boost::mutex::scoped_lock scoped(addToJoinerLock); if (endOfJoinerRan) return 0; @@ -876,34 +855,38 @@ int BatchPrimitiveProcessor::endOfJoiner() currentSize = 0; for (uint j = 0; j < processorThreads; ++j) if (!tJoiners[i] || !tJoiners[i][j]) + { return -1; + } else currentSize += tJoiners[i][j]->size(); if (currentSize != tJoinerSizes[i]) + { return -1; - // if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i])) - // return -1; + } } else { currentSize = 0; for (uint j = 0; j < 
processorThreads; ++j) + { if (!tlJoiners[i] || !tlJoiners[i][j]) + { return -1; + } else currentSize += tlJoiners[i][j]->size(); + } if (currentSize != tJoinerSizes[i]) + { return -1; - // if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i])) - // return -1; + } } } endOfJoinerRan = true; -#ifndef __FreeBSD__ pthread_mutex_unlock(&objLock); -#endif return 0; } @@ -1076,7 +1059,6 @@ void BatchPrimitiveProcessor::initProcessor() { for (i = 0; i < (uint32_t)filterCount - 1; ++i) { - // cout << "prepping filter " << i << endl; filterSteps[i]->setBatchPrimitiveProcessor(this); if (filterSteps[i + 1]->getCommandType() == Command::DICT_STEP) @@ -1087,14 +1069,12 @@ void BatchPrimitiveProcessor::initProcessor() filterSteps[i]->prep(OT_RID, false); } - // cout << "prepping filter " << i << endl; filterSteps[i]->setBatchPrimitiveProcessor(this); filterSteps[i]->prep(OT_BOTH, false); } for (i = 0; i < projectCount; ++i) { - // cout << "prepping projection " << i << endl; projectSteps[i]->setBatchPrimitiveProcessor(this); if (noVB) @@ -1120,7 +1100,6 @@ void BatchPrimitiveProcessor::initProcessor() if (fAggregator.get() != NULL) { - // fAggRowGroupData.reset(new uint8_t[fAggregateRG.getMaxDataSize()]); fAggRowGroupData.reinit(fAggregateRG); fAggregateRG.setData(&fAggRowGroupData); @@ -1944,64 +1923,6 @@ void BatchPrimitiveProcessor::execute() } catch (NeedToRestartJob& n) { -#if 0 - - /* This block of code will flush the problematic OIDs from the - * cache. It seems to have no effect on the problem, so it's commented - * for now. - * - * This is currently thrown only on syscat queries. If we find the problem - * in user tables also, we should avoid dropping entire OIDs if possible. - * - * In local testing there was no need for flushing, because DDL flushes - * the syscat constantly. However, it can take a long time (>10 s) before - * that happens. Doing it locally should make it much more likely only - * one restart is necessary. 
- */ - - try - { - vector oids; - uint32_t oid; - - for (uint32_t i = 0; i < filterCount; i++) - { - oid = filterSteps[i]->getOID(); - - if (oid > 0) - oids.push_back(oid); - } - - for (uint32_t i = 0; i < projectCount; i++) - { - oid = projectSteps[i]->getOID(); - - if (oid > 0) - oids.push_back(oid); - } - -#if 0 - Logger logger; - ostringstream os; - os << "dropping OIDs: "; - - for (int i = 0; i < oids.size(); i++) - os << oids[i] << " "; - - logger.logMessage(os.str()); -#endif - - for (int i = 0; i < fCacheCount; i++) - { - dbbc::blockCacheClient bc(*BRPp[i]); -// bc.flushCache(); - bc.flushOIDs(&oids[0], oids.size()); - } - } - catch (...) { } // doesn't matter if this fails, just avoid crashing - -#endif - #ifndef __FreeBSD__ pthread_mutex_unlock(&objLock); #endif @@ -2132,21 +2053,20 @@ void BatchPrimitiveProcessor::serializeStrings() void BatchPrimitiveProcessor::sendResponse() { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); // Here is the fast path for local EM to PM interaction. PM puts into the // input EM DEC queue directly. - // !sock has a 'same host connection' semantics here. - if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock))) + // !writelock has a 'same host connection' semantics here. + if (initiatedByEM_ && !writelock) { // Flow Control now handles same node connections so the recieving DEC queue // is limited. 
if (sendThread->flowControlEnabled()) { - sendThread->sendResult({serialized, nullptr, nullptr, 0}, false); + sendThread->sendResult({serialized, sock, writelock, 0}, false); } else { - exeMgrDecPtr->addDataToOutput(serialized); + sock->write(serialized); serialized.reset(); } diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 87f843f3a..012f84467 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -96,7 +96,6 @@ class BatchPrimitiveProcessor void resetBPP(messageqcpp::ByteStream&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& outputSock); void addToJoiner(messageqcpp::ByteStream&); int endOfJoiner(); - void doneSendingJoinerData(); int operator()(); void setLBIDForScan(uint64_t rid); diff --git a/primitives/primproc/bppseeder.cpp b/primitives/primproc/bppseeder.cpp index 48f821c07..22d866b59 100644 --- a/primitives/primproc/bppseeder.cpp +++ b/primitives/primproc/bppseeder.cpp @@ -153,12 +153,10 @@ int BPPSeeder::operator()() if (0 < status) { - sendErrorMsg(uniqueID, status, stepID); + error_handling::sendErrorMsg(status, uniqueID, stepID, sock); return ret; } - // if (!(sessionID & 0x80000000)) - // cout << "got request for <" << sessionID <<", " << stepID << ">\n"; scoped.lock(); if (!bppv) @@ -172,26 +170,12 @@ int BPPSeeder::operator()() if (boost::posix_time::second_clock::universal_time() > dieTime) { -#if 0 // for debugging - boost::posix_time::ptime pt = boost::posix_time::microsec_clock::local_time(); - - if (sessionID & 0x80000000) - cout << "BPPSeeder couldn't find the sessionID/stepID pair. sessionID=" - << (int) (sessionID ^ 0x80000000) << " stepID=" << stepID << " (syscat)" << pt << endl; - else - cout << "BPPSeeder couldn't find the sessionID/stepID pair. 
sessionID=" - << sessionID << " stepID=" << stepID << pt << endl; - - throw logic_error("BPPSeeder couldn't find the sessionID/stepID pair"); -#endif + cout << "BPPSeeder::operator(): job for id " << uniqueID << "and session " << sessionID + << " has been killed." << endl; return 0; } - // if (!isSysCat()) return -1; - // else { // syscat queries aren't run by a threadpool, can't reschedule those - //jobs usleep(1000); goto retry; - // } } bppv = it->second; @@ -205,10 +189,6 @@ int BPPSeeder::operator()() if (!bpp) { - // if (isSysCat()) { - // usleep(1000); - // goto retry; - // } return -1; // all BPP instances are busy, make threadpool reschedule } @@ -355,23 +335,8 @@ void BPPSeeder::catchHandler(const string& ex, uint32_t id, uint32_t step) { Logger log; log.logMessage(ex); - sendErrorMsg(id, logging::bppSeederErr, step); -} -void BPPSeeder::sendErrorMsg(uint32_t id, uint16_t status, uint32_t step) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = status; - ph.UniqueID = id; - ph.StepID = step; - ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - boost::mutex::scoped_lock lk(*writelock); - sock->write(msg); + error_handling::sendErrorMsg(logging::bppSeederErr, id, step, sock); } bool BPPSeeder::isSysCat() diff --git a/primitives/primproc/bppseeder.h b/primitives/primproc/bppseeder.h index f4c35b2c8..c8896e8bc 100644 --- a/primitives/primproc/bppseeder.h +++ b/primitives/primproc/bppseeder.h @@ -76,7 +76,6 @@ class BPPSeeder : public threadpool::FairThreadPool::Functor private: BPPSeeder(); void catchHandler(const std::string& s, uint32_t uniqueID, uint32_t step); - void sendErrorMsg(uint32_t id, uint16_t status, uint32_t step); void flushSyscatOIDs(); messageqcpp::SBS bs; diff --git a/primitives/primproc/bppsendthread.cpp b/primitives/primproc/bppsendthread.cpp index d444e48b5..831754e79 100644 --- 
a/primitives/primproc/bppsendthread.cpp +++ b/primitives/primproc/bppsendthread.cpp @@ -234,11 +234,9 @@ void BPPSendThread::mainLoop() bsSize = msg[msgsSent].msg->lengthWithHdrOverhead(); // Same node processing path - if (!sock) + if (!lock) { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); - assert(exeMgrDecPtr); - exeMgrDecPtr->addDataToOutput(msg[msgsSent].msg); + msg[msgsSent].sock->write(msg[msgsSent].msg); } else { diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp index 4819200a0..ceac88fc5 100644 --- a/primitives/primproc/primitiveserver.cpp +++ b/primitives/primproc/primitiveserver.cpp @@ -59,6 +59,7 @@ using namespace BRM; #include "writeengine.h" #include "messagequeue.h" +#include "samenodepseudosocket.h" using namespace messageqcpp; #include "blockrequestprocessor.h" @@ -106,8 +107,6 @@ using namespace threadpool; #define O_NOATIME 0 #endif -typedef tr1::unordered_set USOID; - // make global for blockcache // static const char* statsName = {"pm"}; @@ -1019,7 +1018,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock); virtual ~DictScanJob(); - void write(const SBS&); + void write(const SBS); int operator()(); void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr); void sendErrorMsg(uint32_t id, uint16_t code); @@ -1041,15 +1040,14 @@ DictScanJob::~DictScanJob() { } -void DictScanJob::write(const SBS& sbs) +void DictScanJob::write(const SBS sbs) { // Here is the fast path for local EM to PM interaction. PM puts into the // input EM DEC queue directly. - // !sock has a 'same host connection' semantics here. - if (!fIos) + // !fWriteLock has a 'same host connection' semantics here. 
+ if (!fWriteLock) { - auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); - exeMgrDecPtr->addDataToOutput(sbs); + fIos->write(sbs); return; } boost::mutex::scoped_lock lk(*fWriteLock); @@ -1209,6 +1207,7 @@ struct BPPHandler } fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key); } scoped.unlock(); @@ -1322,16 +1321,21 @@ struct BPPHandler } else { - bs.rewind(); - if (posix_time::second_clock::universal_time() > dieTime) + { + std::cout << "doAbort: job for key " << key << " has been killed." << std::endl; return 0; + } else + { + bs.rewind(); return -1; + } } scoped.unlock(); fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key); return 0; } @@ -1354,7 +1358,10 @@ struct BPPHandler return 0; } else + { + bs.rewind(); return -1; + } } void createBPP(ByteStream& bs) @@ -1402,7 +1409,6 @@ struct BPPHandler bppKeys.push_back(key); bool newInsert; newInsert = bppMap.insert(pair(key, bppv)).second; - // cout << "creating BPP # " << key << endl; scoped.unlock(); if (!newInsert) @@ -1420,10 +1426,7 @@ struct BPPHandler inline SBPPV grabBPPs(uint32_t uniqueID) { BPPMap::iterator it; - /* - uint32_t failCount = 0; - uint32_t maxFailCount = (fatal ? 
500 : 5000); - */ + SBPPV ret; boost::mutex::scoped_lock scoped(bppLock); @@ -1433,24 +1436,6 @@ struct BPPHandler return it->second; else return SBPPV(); - - /* - do - { - if (++failCount == maxFailCount) { - //cout << "grabBPPs couldn't find the BPPs for " << uniqueID << endl; - return ret; - //throw logic_error("grabBPPs couldn't find the unique ID"); - } - scoped.unlock(); - usleep(5000); - scoped.lock(); - it = bppMap.find(uniqueID); - } while (it == bppMap.end()); - - ret = it->second; - return ret; - */ } inline shared_mutex& getDJLock(uint32_t uniqueID) @@ -1488,6 +1473,7 @@ struct BPPHandler buf = bs.buf(); /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */ uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]); + bppv = grabBPPs(uniqueID); if (bppv) @@ -1499,7 +1485,10 @@ struct BPPHandler else { if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "addJoinerToBPP: job for id " << uniqueID << " has been killed." << endl; return 0; + } else return -1; } @@ -1517,20 +1506,22 @@ struct BPPHandler buf = bs.buf(); /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */ uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]); - bppv = grabBPPs(uniqueID); if (!bppv) { - // cout << "got a lastJoiner msg for an unknown obj " << uniqueID << endl; if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "LastJoiner: job for id " << uniqueID << " has been killed." << endl; return 0; + } else + { return -1; + } } boost::unique_lock lk(getDJLock(uniqueID)); - for (i = 0; i < bppv->get().size(); i++) { err = bppv->get()[i]->endOfJoiner(); @@ -1538,32 +1529,26 @@ struct BPPHandler if (err == -1) { if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "LastJoiner: job for id " << uniqueID + << " has been killed waiting for joiner messages for too long." 
<< endl; return 0; + } else return -1; } } - bppv->get()[0]->doneSendingJoinerData(); /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do more intelligent scheduling. Once the join data is received, BPPV will start letting jobs run and create more BPP instances on demand. */ - atomicops::atomicMb(); // make sure the joinDataReceived assignment doesn't migrate upward... bppv->joinDataReceived = true; return 0; } int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime) { - // This is a corner case that damages bs so its length becomes less than a header length. - // The damaged bs doesn't pass the if that checks bs at least has header + 3x int32_t. - // The if block below works around the issue. - if (posix_time::second_clock::universal_time() > dieTime) - { - return 0; - } - uint32_t uniqueID, sessionID, stepID; BPPMap::iterator it; if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID)) @@ -1603,39 +1588,33 @@ struct BPPHandler { // MCOL-5. On ubuntu, a crash was happening. Checking // joinDataReceived here fixes it. - // We're not ready for a destroy. Reschedule. + // We're not ready for a destroy. Reschedule to wait + // for all joiners to arrive. + // TODO there might be no joiners if the query is canceled. + // The memory will leak. + // Rewind to the beginning of ByteStream buf b/c of the advance above. + bs.rewind(); return -1; } } else { - // cout << "got a destroy for an unknown obj " << uniqueID << endl; - bs.rewind(); - if (posix_time::second_clock::universal_time() > dieTime) { - // XXXPAT: going to let this fall through and delete jobs for - // uniqueID if there are any. Not clear what the downside is. - /* -lk.unlock(); -deleteDJLock(uniqueID); -return 0; - */ + cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed." 
+ << endl; + // If for some reason there are jobs for this uniqueID that arrived later + // they won't leave PP thread pool staying there forever. } else + { + bs.rewind(); return -1; + } } - // cout << " destroy: new size is " << bppMap.size() << endl; - /* - if (sessionID & 0x80000000) - cerr << "destroyed BPP instances for sessionID " << (int) - (sessionID ^ 0x80000000) << " stepID "<< stepID << " (syscat)\n"; - else - cerr << "destroyed BPP instances for sessionID " << sessionID << - " stepID "<< stepID << endl; - */ fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID); + fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID); lk.unlock(); deleteDJLock(uniqueID); return 0; @@ -1704,7 +1683,10 @@ class DictionaryOp : public FairThreadPool::Functor bs->rewind(); if (posix_time::second_clock::universal_time() > dieTime) + { + cout << "DictionaryOp::operator(): job has been killed." << endl; return 0; + } } return ret; @@ -1782,7 +1764,10 @@ class DestroyEqualityFilter : public DictionaryOp return 0; } else + { + bs->rewind(); return -1; + } } }; @@ -1920,7 +1905,8 @@ struct ReadThread } static void dispatchPrimitive(SBS sbs, boost::shared_ptr& fBPPHandler, - boost::shared_ptr& procPoolPtr, + boost::shared_ptr procPool, + std::shared_ptr OOBProcPool, SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads, const bool ptTrace) { @@ -1942,6 +1928,7 @@ struct ReadThread const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]); const uint32_t weight = threadpool::MetaJobsInitialWeight; const uint32_t priority = 0; + uint32_t id = 0; boost::shared_ptr functor; if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER) @@ -1975,8 +1962,8 @@ struct ReadThread id = fBPPHandler->getUniqueID(sbs, ismHdr->Command); functor.reset(new BPPHandler::Abort(fBPPHandler, sbs)); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id); - procPoolPtr->addJob(job); + PriorityThreadPool::Job job(uniqueID, stepID, 
txnId, functor, outIos, weight, priority, id); + OOBProcPool->addJob(job); break; } @@ -2017,10 +2004,18 @@ struct ReadThread txnId = *((uint32_t*)&buf[pos + 2]); stepID = *((uint32_t*)&buf[pos + 6]); uniqueID = *((uint32_t*)&buf[pos + 10]); - weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]); + weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]) + 100000; + } + if (hdr && hdr->flags & IS_SYSCAT) + { + PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + OOBProcPool->addJob(job); + } + else + { + FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); + procPool->addJob(job); } - FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id); - procPoolPtr->addJob(job); break; } @@ -2044,7 +2039,8 @@ struct ReadThread void operator()() { utils::setThreadName("PPReadThread"); - boost::shared_ptr procPoolPtr = fPrimitiveServerPtr->getProcessorThreadPool(); + auto procPool = fPrimitiveServerPtr->getProcessorThreadPool(); + auto OOBProcPool = fPrimitiveServerPtr->getOOBProcessorThreadPool(); SBS bs; UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance(); @@ -2135,7 +2131,7 @@ struct ReadThread default: break; } - dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock, + dispatchPrimitive(bs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock, fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace()); } else // bs.length() == 0 @@ -2277,6 +2273,9 @@ PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int pro fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads, medPriorityThreads, lowPriorityThreads, 0)); + // We're not using either the priority or the job-clustering features, just need a threadpool + // that can reschedule jobs, and an unlimited non-blocking queue + fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1)); asyncCounter = 0; @@ -2330,15 +2329,18 
@@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace sleep(1); exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr; } - // These empty SPs have "same-host" messaging semantics. - SP_UM_IOSOCK outIos(nullptr); + // This is a pseudo socket that puts data into DEC queue directly. + // It can be used for PP to EM communication only. + SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr))); + // This empty SP transmits "same-host" messaging semantics. SP_UM_MUTEX writeLock(nullptr); - auto procPoolPtr = this->getProcessorThreadPool(); + auto procPool = this->getProcessorThreadPool(); + auto OOBProcPool = this->getOOBProcessorThreadPool(); boost::shared_ptr fBPPHandler(new BPPHandler(this)); for (;;) { joblist::DistributedEngineComm::SBSVector primitiveMsgs; - for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs)) + for (auto sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs)) { if (sbs->length() == 0) { @@ -2347,7 +2349,7 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace } idbassert(sbs->length() >= sizeof(ISMPacketHeader)); - ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock, + ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock, this->ProcessorThreads(), this->PTTrace()); } } @@ -2364,7 +2366,6 @@ BPPV::BPPV(PrimitiveServer* ps) sendThread->setProcessorPool(ps->getProcessorThreadPool()); v.reserve(BPPCount); pos = 0; - joinDataReceived = false; } BPPV::~BPPV() @@ -2404,27 +2405,6 @@ boost::shared_ptr BPPV::next() uint32_t size = v.size(); uint32_t i = 0; -#if 0 - - // This block of code scans for the first available BPP instance, - // makes BPPSeeder reschedule it if none are available. Relies on BPP instance - // being preallocated. 
- for (i = 0; i < size; i++) - { - uint32_t index = (i + pos) % size; - - if (!(v[index]->busy())) - { - pos = (index + 1) % size; - v[index]->busy(true); - return v[index]; - } - } - - // They're all busy, make threadpool reschedule the job - return boost::shared_ptr(); -#endif - // This block of code creates BPP instances if/when they are needed // don't use a processor thread when it will just block, reschedule it instead diff --git a/primitives/primproc/primitiveserver.h b/primitives/primproc/primitiveserver.h index 1e9980a80..2d8531d25 100644 --- a/primitives/primproc/primitiveserver.h +++ b/primitives/primproc/primitiveserver.h @@ -66,7 +66,7 @@ class BPPV } void abort(); bool aborted(); - volatile bool joinDataReceived; + std::atomic joinDataReceived{false}; private: std::vector > v; @@ -129,6 +129,11 @@ class PrimitiveServer return fProcessorPool; } + inline std::shared_ptr getOOBProcessorThreadPool() const + { + return fOOBPool; + } + int ReadAheadBlocks() const { return fReadAheadBlocks; @@ -161,6 +166,7 @@ class PrimitiveServer * primitive commands */ boost::shared_ptr fProcessorPool; + std::shared_ptr fOOBPool; int fServerThreads; int fServerQueueSize; diff --git a/utils/messageqcpp/CMakeLists.txt b/utils/messageqcpp/CMakeLists.txt index 3cc8b402f..ca7f15a09 100644 --- a/utils/messageqcpp/CMakeLists.txt +++ b/utils/messageqcpp/CMakeLists.txt @@ -9,6 +9,7 @@ set(messageqcpp_LIB_SRCS bytestream.cpp socketparms.cpp inetstreamsocket.cpp + samenodepseudosocket.cpp iosocket.cpp compressed_iss.cpp bytestreampool.cpp diff --git a/utils/messageqcpp/iosocket.h b/utils/messageqcpp/iosocket.h index d1cc3ef49..4dd742a23 100644 --- a/utils/messageqcpp/iosocket.h +++ b/utils/messageqcpp/iosocket.h @@ -39,8 +39,6 @@ class MessageQTestSuite; -#define EXPORT - namespace messageqcpp { class ServerSocket; @@ -54,22 +52,22 @@ class IOSocket /** ctor * */ - EXPORT explicit IOSocket(Socket* socket = 0); + explicit IOSocket(Socket* socket = 0); /** copy ctor * */ - EXPORT 
IOSocket(const IOSocket& rhs); + IOSocket(const IOSocket& rhs); /** assign op * */ - EXPORT IOSocket& operator=(const IOSocket& rhs); + IOSocket& operator=(const IOSocket& rhs); /** dtor * */ - EXPORT virtual ~IOSocket(); + virtual ~IOSocket(); /** read a ByteStream from this socket * @@ -84,9 +82,9 @@ class IOSocket * This socket needs to be connected first. Will throw runtime_error on I/O error. Caller should * call close() method if exception is thrown. */ - EXPORT virtual void write(const ByteStream& msg, Stats* stats = NULL) const; - EXPORT virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; - EXPORT virtual void write(SBS msg, Stats* stats = NULL) const; + virtual void write(const ByteStream& msg, Stats* stats = NULL) const; + virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; + virtual void write(SBS msg, Stats* stats = NULL) const; /** access the sockaddr member */ @@ -125,29 +123,29 @@ class IOSocket * * Install a socket implementation that meets the Socket interface */ - EXPORT virtual void setSocketImpl(Socket* socket); + virtual void setSocketImpl(Socket* socket); /** get a string rep of the IOSocket * */ - EXPORT virtual const std::string toString() const; + virtual const std::string toString() const; /** syncProto() forwarder for inherited classes * */ - EXPORT virtual void syncProto(bool use) + virtual void syncProto(bool use) { fSocket->syncProto(use); } - EXPORT virtual int getConnectionNum() const; + virtual int getConnectionNum() const; // Debug - EXPORT void setSockID(uint32_t id) + void setSockID(uint32_t id) { sockID = id; } - EXPORT uint32_t getSockID() + uint32_t getSockID() { return sockID; } @@ -174,7 +172,6 @@ class IOSocket return fSocket->isSameAddr(ipv4Addr); } - /** connect() forwarder for inherited classes * */ @@ -298,5 +295,3 @@ inline std::ostream& operator<<(std::ostream& os, const IOSocket& rhs) } } // namespace messageqcpp - -#undef EXPORT diff --git 
a/utils/messageqcpp/samenodepseudosocket.cpp b/utils/messageqcpp/samenodepseudosocket.cpp new file mode 100644 index 000000000..2e7ac9c27 --- /dev/null +++ b/utils/messageqcpp/samenodepseudosocket.cpp @@ -0,0 +1,127 @@ +/* Copyright (C) 2024 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include + +#include "samenodepseudosocket.h" +#include "iosocket.h" + +namespace messageqcpp +{ +SameNodePseudoSocket::SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr) : dec_(exeMgrDecPtr) +{ + assert(dec_); +} + +SameNodePseudoSocket::~SameNodePseudoSocket() +{ +} + +void SameNodePseudoSocket::open() +{ +} + +void SameNodePseudoSocket::close() +{ +} + +Socket* SameNodePseudoSocket::clone() const +{ + return nullptr; +} + +SameNodePseudoSocket::SameNodePseudoSocket(const SameNodePseudoSocket& rhs) +{ +} + +SameNodePseudoSocket& SameNodePseudoSocket::operator=(const SameNodePseudoSocket& rhs) +{ + return *this; +} + +const SBS SameNodePseudoSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const +{ + return nullptr; +} + +// This is the only working method of this class. It puts SBS directly into DEC queue. 
+void SameNodePseudoSocket::write(SBS msg, Stats* stats) +{ + dec_->addDataToOutput(msg); +} + +void SameNodePseudoSocket::write(const ByteStream& msg, Stats* stats) +{ +} + +void SameNodePseudoSocket::write_raw(const ByteStream& msg, Stats* stats) const +{ +} + +void SameNodePseudoSocket::connect(const sockaddr* serv_addr) +{ +} + +void SameNodePseudoSocket::bind(const sockaddr* serv_addr) +{ +} + +const IOSocket SameNodePseudoSocket::accept(const struct timespec* timeout) +{ + return IOSocket(); +} + +void SameNodePseudoSocket::listen(int backlog) +{ +} + +const std::string SameNodePseudoSocket::toString() const +{ + return ""; +} + +const std::string SameNodePseudoSocket::addr2String() const +{ + return ""; +} + +bool SameNodePseudoSocket::isSameAddr(const Socket* rhs) const +{ + return false; +} + +bool SameNodePseudoSocket::isSameAddr(const struct in_addr& ipv4Addr) const +{ + return false; +} + +int SameNodePseudoSocket::ping(const std::string& ipaddr, const struct timespec* timeout) +{ + return 0; +} + +bool SameNodePseudoSocket::isConnected() const +{ + return true; +} + +bool SameNodePseudoSocket::hasData() const +{ + return false; +} + +} // namespace messageqcpp diff --git a/utils/messageqcpp/samenodepseudosocket.h b/utils/messageqcpp/samenodepseudosocket.h new file mode 100644 index 000000000..f977f02a2 --- /dev/null +++ b/utils/messageqcpp/samenodepseudosocket.h @@ -0,0 +1,99 @@ +/* Copyright (C) 2024 MariaDB Corp. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#pragma once + +#include "../../dbcon/joblist/distributedenginecomm.h" + +#include "socket.h" +#include "socketparms.h" +#include "bytestream.h" + +namespace messageqcpp +{ +class IOSocket; + +// This class is a dummy replacement for a TCP socket +// wrapper to communicate with the same node. +class SameNodePseudoSocket : public Socket +{ + public: + explicit SameNodePseudoSocket(joblist::DistributedEngineComm* exeMgrDecPtr); + virtual ~SameNodePseudoSocket(); + virtual void write(SBS msg, Stats* stats = NULL); + + private: + virtual void bind(const sockaddr* serv_addr); + SameNodePseudoSocket(const SameNodePseudoSocket& rhs); + virtual SameNodePseudoSocket& operator=(const SameNodePseudoSocket& rhs); + + virtual void connectionTimeout(const struct ::timespec* timeout) + { + } + + virtual void syncProto(bool use) + { + } + + int getConnectionNum() const + { + return 1; + } + + inline virtual void socketParms(const SocketParms& socket) + { + } + + inline virtual const SocketParms socketParms() const + { + return SocketParms(); + } + + // all these virtual methods are to stay inaccessible. 
+ inline virtual void sa(const sockaddr* sa); + virtual void open(); + virtual void close(); + inline virtual bool isOpen() const; + virtual const SBS read(const struct timespec* timeout = 0, bool* isTimeOut = NULL, + Stats* stats = NULL) const; + virtual void write(const ByteStream& msg, Stats* stats = NULL); + virtual void write_raw(const ByteStream& msg, Stats* stats = NULL) const; + virtual void listen(int backlog = 5); + virtual const IOSocket accept(const struct timespec* timeout = 0); + virtual void connect(const sockaddr* serv_addr); + virtual Socket* clone() const; + virtual const std::string toString() const; + virtual const std::string addr2String() const; + virtual bool isSameAddr(const Socket* rhs) const; + virtual bool isSameAddr(const struct in_addr& ipv4Addr) const; + static int ping(const std::string& ipaddr, const struct timespec* timeout = 0); + virtual bool isConnected() const; + virtual bool hasData() const; + + joblist::DistributedEngineComm* dec_ = nullptr; +}; + +inline bool SameNodePseudoSocket::isOpen() const +{ + return true; +} + +inline void SameNodePseudoSocket::sa(const sockaddr* sa) +{ +} + +} // namespace messageqcpp diff --git a/utils/threadpool/fair_threadpool.cpp b/utils/threadpool/fair_threadpool.cpp index cfd0350fd..75c4b7e42 100644 --- a/utils/threadpool/fair_threadpool.cpp +++ b/utils/threadpool/fair_threadpool.cpp @@ -228,7 +228,7 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue { // to avoid excessive CPU usage waiting for data from storage usleep(500); - runList[0].weight_ += RescheduleWeightIncrement; + runList[0].weight_ += (runList[0].weight_) ? 
runList[0].weight_ : RescheduleWeightIncrement; addJob(runList[0]); } } @@ -259,7 +259,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue if (running) { - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_, + runList[0].sock_); } } catch (...) @@ -291,7 +292,8 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue #endif if (running) - sendErrorMsg(runList[0].uniqueID_, runList[0].stepID_, runList[0].sock_); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[0].uniqueID_, runList[0].stepID_, + runList[0].sock_); } catch (...) { @@ -301,21 +303,6 @@ void FairThreadPool::threadFcn(const PriorityThreadPool::Priority preferredQueue } } -void FairThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = logging::primitiveServerErr; - ph.UniqueID = id; - ph.StepID = step; - messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - sock->write(msg); -} - void FairThreadPool::stop() { stop_.store(true, std::memory_order_relaxed); diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h index 24adb1aed..a3c9bc6b1 100644 --- a/utils/threadpool/fair_threadpool.h +++ b/utils/threadpool/fair_threadpool.h @@ -76,19 +76,7 @@ class FairThreadPool , id_(id) { } - // sock_ is nullptr here. This is kinda dangerous. 
- Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, - const boost::shared_ptr& functor, const uint32_t weight = 1, const uint32_t priority = 0, - const uint32_t id = 0) - : uniqueID_(uniqueID) - , stepID_(stepID) - , txnIdx_(txnIdx) - , functor_(functor) - , weight_(weight) - , priority_(priority) - , id_(id) - { - } + uint32_t uniqueID_; uint32_t stepID_; TransactionIdxT txnIdx_; diff --git a/utils/threadpool/prioritythreadpool.cpp b/utils/threadpool/prioritythreadpool.cpp index 908c0ba97..946d7b08d 100644 --- a/utils/threadpool/prioritythreadpool.cpp +++ b/utils/threadpool/prioritythreadpool.cpp @@ -21,7 +21,6 @@ * ***********************************************************************/ -#include #include #include using namespace std; @@ -36,6 +35,32 @@ using namespace boost; #include "dbcon/joblist/primitivemsg.h" +namespace error_handling +{ +messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step) +{ + ISMPacketHeader ism; + ism.Status = status; + + PrimitiveHeader ph = {0, 0, 0, step, id, 0}; + + messageqcpp::SBS errorMsg(new messageqcpp::ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader))); + errorMsg->append((uint8_t*)&ism, sizeof(ism)); + errorMsg->append((uint8_t*)&ph, sizeof(ph)); + + return errorMsg; +} + +void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step, + primitiveprocessor::SP_UM_IOSOCK sock) +{ + auto errorMsg = error_handling::makePrimitiveErrorMsg(status, id, step); + + sock->write(errorMsg); +} + +} // namespace error_handling + namespace threadpool { PriorityThreadPool::PriorityThreadPool(uint targetWeightPerRun, uint highThreads, uint midThreads, @@ -267,7 +292,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() #endif if (running) - sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID, 
+ runList[i].sock); } catch (...) { @@ -293,7 +319,8 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() #endif if (running) - sendErrorMsg(runList[i].uniqueID, runList[i].stepID, runList[i].sock); + error_handling::sendErrorMsg(logging::primitiveServerErr, runList[i].uniqueID, runList[i].stepID, + runList[i].sock); } catch (...) { @@ -301,21 +328,6 @@ void PriorityThreadPool::threadFcn(const Priority preferredQueue) throw() } } -void PriorityThreadPool::sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock) -{ - ISMPacketHeader ism; - PrimitiveHeader ph = {0, 0, 0, 0, 0, 0}; - - ism.Status = logging::primitiveServerErr; - ph.UniqueID = id; - ph.StepID = step; - messageqcpp::ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)); - msg.append((uint8_t*)&ism, sizeof(ism)); - msg.append((uint8_t*)&ph, sizeof(ph)); - - sock->write(msg); -} - void PriorityThreadPool::stop() { _stop = true; diff --git a/utils/threadpool/prioritythreadpool.h b/utils/threadpool/prioritythreadpool.h index ba1acc33d..9f7adc3df 100644 --- a/utils/threadpool/prioritythreadpool.h +++ b/utils/threadpool/prioritythreadpool.h @@ -24,11 +24,6 @@ #pragma once -#include -#include -#include -#include -#include #include #include #include @@ -36,10 +31,20 @@ #include #include #include "primitives/primproc/umsocketselector.h" -#include "atomicops.h" + +namespace error_handling +{ + +messageqcpp::SBS makePrimitiveErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step); +void sendErrorMsg(const uint16_t status, const uint32_t id, const uint32_t step, + primitiveprocessor::SP_UM_IOSOCK sock); +} // namespace error_handling namespace threadpool { + +using TransactionIdxT = uint32_t; + class PriorityThreadPool { public: @@ -57,12 +62,25 @@ class PriorityThreadPool Job() : weight(1), priority(0), id(0) { } + Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx, + const boost::shared_ptr& functor, const 
primitiveprocessor::SP_UM_IOSOCK& sock, + const uint32_t weight = 1, const uint32_t priority = 0, const uint32_t id = 0) + : functor(functor) + , weight(weight) + , priority(priority) + , id(id) + , stepID(stepID) + , uniqueID(uniqueID) + , sock(sock) + { + } + boost::shared_ptr functor; uint32_t weight; uint32_t priority; uint32_t id; - uint32_t uniqueID; uint32_t stepID; + uint32_t uniqueID; primitiveprocessor::SP_UM_IOSOCK sock; }; @@ -113,7 +131,7 @@ class PriorityThreadPool { return blockedThreads; } - + protected: private: struct ThreadHelper @@ -135,7 +153,6 @@ class PriorityThreadPool Priority pickAQueue(Priority preference); void threadFcn(const Priority preferredQueue) throw(); - void sendErrorMsg(uint32_t id, uint32_t step, primitiveprocessor::SP_UM_IOSOCK sock); std::list jobQueues[3]; // higher indexes = higher priority uint32_t threadCounts[3];