diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp
index 0da2fb0dd..29d8577bf 100644
--- a/dbcon/joblist/distributedenginecomm.cpp
+++ b/dbcon/joblist/distributedenginecomm.cpp
@@ -38,6 +38,7 @@
 #include
 #include
 #include
+#include <ifaddrs.h>
 using namespace std;

 #include
@@ -186,7 +187,13 @@ DistributedEngineComm* DistributedEngineComm::fInstance = 0;
 DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
 {
   if (fInstance == 0)
+  {
     fInstance = new DistributedEngineComm(rm, isExeMgr);
+    if (isExeMgr && fInstance)
+    {
+      fInstance->getLocalNetIfacesSins();
+    }
+  }

   return fInstance;
 }
@@ -201,6 +208,10 @@ void DistributedEngineComm::reset()
 DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
  : fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
 {
+  if (fIsExeMgr)
+  {
+    getLocalNetIfacesSins();
+  }
   Setup();
 }
@@ -268,6 +279,10 @@ void DistributedEngineComm::Setup()
     size_t connectionId = i % newPmCount;
     boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(
         pmsAddressesAndPorts[connectionId].first, pmsAddressesAndPorts[connectionId].second));
+    if (clientAtTheSameHost(cl))
+    {
+      cl->atTheSameHost(true);
+    }
     boost::shared_ptr<boost::mutex> nl(new boost::mutex());

     try
@@ -410,8 +425,6 @@ Error:

   if (fIsExeMgr)
   {
-    // std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
-    //           << client->otherEnd() << " OR GOT AN EXCEPTION READING" << std::endl;
     decltype(pmCount) originalPMCount = pmCount;
     // Re-establish if a remote PM restarted.
     std::this_thread::sleep_for(std::chrono::seconds(3));
@@ -700,11 +713,11 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

   if (l_msgCount > 0)
   {
-    ByteStream msg(sizeof(ISMPacketHeader));
+    SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
     uint16_t* toAck;
     vector<bool> pmAcked(pmCount, false);

-    ism = (ISMPacketHeader*)msg.getInputPtr();
+    ism = (ISMPacketHeader*)msg->getInputPtr();
     // The only var checked by ReadThread is the Command var. The others
     // are wasted space. We hijack the Size & Flags fields for the
     // params to the ACK msg.
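The recurring ByteStream-to-SBS change in this patch is what enables the same-host shortcut: a message owned by a shared pointer can later be handed to an in-memory queue without a copy or a serialize/deserialize round trip, while remote paths still stream the bytes to a socket. A minimal stand-alone sketch of the idea, assuming std::shared_ptr and stub types in place of boost::shared_ptr and messageqcpp::ByteStream (deliver, enqueueLocally and writeToSocket are illustrative names, not ColumnStore API):

#include <cstdint>
#include <memory>
#include <vector>

struct ByteStreamStub  // stand-in for messageqcpp::ByteStream
{
  std::vector<uint8_t> buf;
};
using SBS = std::shared_ptr<ByteStreamStub>;  // messageqcpp::SBS is the boost equivalent

void enqueueLocally(const SBS&) {}            // stand-in for the DEC in-memory queue
void writeToSocket(const ByteStreamStub&) {}  // stand-in for a MessageQueueClient write

void deliver(const SBS& msg, bool sameHost)
{
  if (sameHost)
    enqueueLocally(msg);  // zero-copy: producer and consumer share one refcounted buffer
  else
    writeToSocket(*msg);  // the remote path still serializes the bytes
}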
@@ -713,7 +726,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
     ism->Command = BATCH_PRIMITIVE_ACK;
     toAck = &ism->Size;

-    msg.advanceInputPtr(sizeof(ISMPacketHeader));
+    msg->advanceInputPtr(sizeof(ISMPacketHeader));

     while (l_msgCount > 0)
     {
@@ -819,8 +832,8 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
 void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
 {
   mqe->throttled = enabled;
-  ByteStream msg(sizeof(ISMPacketHeader));
-  ISMPacketHeader* ism = (ISMPacketHeader*)msg.getInputPtr();
+  SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
+  ISMPacketHeader* ism = (ISMPacketHeader*)msg->getInputPtr();
   ism->Interleave = uniqueID;
   ism->Command = BATCH_PRIMITIVE_ACK;
@@ -834,15 +847,15 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
   ism->Status = 0;
 #endif

-  msg.advanceInputPtr(sizeof(ISMPacketHeader));
+  msg->advanceInputPtr(sizeof(ISMPacketHeader));

   for (uint32_t i = 0; i < mqe->pmCount; i++)
     writeToClient(i, msg);
 }

-void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
+void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
 {
-  ISMPacketHeader* ism = (ISMPacketHeader*)msg.buf();
+  ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf();
   uint32_t dest;
   uint32_t numConn = fPmConnections.size();
@@ -852,7 +865,7 @@ void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
   {
     case BATCH_PRIMITIVE_CREATE:
       /* Disable flow control initially */
-      msg << (uint32_t)-1;
+      *msg << (uint32_t)-1;
       /* FALLTHRU */

     case BATCH_PRIMITIVE_DESTROY:
@@ -1007,7 +1020,40 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t ta
   mqe->targetQueueSize = targetSize;
 }

-int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs, uint32_t senderUniqueID,
+DistributedEngineComm::SBSVector& DistributedEngineComm::readLocalQueueMessagesOrWait(
+    SBSVector& receivedMessages)
+{
+  for (;;)
+  {
+    std::unique_lock<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
+    if (inMemoryEM2PPExchQueue_.empty())
+    {
+      inMemoryEM2PPExchCV_.wait(exchangeLock);
+      continue;
+    }
+
+    // Drain the queue in one batch to keep the critical section short.
+    while (!inMemoryEM2PPExchQueue_.empty())
+    {
+      receivedMessages.push_back(inMemoryEM2PPExchQueue_.front());
+      inMemoryEM2PPExchQueue_.pop();
+    }
+    exchangeLock.unlock();
+    break;
+  }
+
+  return receivedMessages;
+}
+
+void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::SBS& bs)
+{
+  std::unique_lock<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
+  inMemoryEM2PPExchQueue_.push(bs);
+  exchangeLock.unlock();
+  inMemoryEM2PPExchCV_.notify_one();
+}
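readLocalQueueMessagesOrWait() and pushToTheLocalQueueAndNotifyRecv() form a standard mutex/condition-variable handoff with batch draining on the consumer side. A self-contained sketch of the same pattern, with illustrative names (LocalExchange is not a ColumnStore class):

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

using Message = std::shared_ptr<int>;  // stand-in for messageqcpp::SBS

class LocalExchange
{
 public:
  void push(const Message& m)
  {
    {
      std::lock_guard<std::mutex> lk(mtx_);
      queue_.push(m);
    }  // release the lock before notifying so the woken consumer does not block on it
    cv_.notify_one();
  }

  // Blocks until at least one message arrives, then drains everything in a
  // single short critical section, as readLocalQueueMessagesOrWait() does.
  std::vector<Message>& waitAndDrain(std::vector<Message>& out)
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_.wait(lk, [this] { return !queue_.empty(); });
    while (!queue_.empty())
    {
      out.push_back(queue_.front());
      queue_.pop();
    }
    return out;
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::queue<Message> queue_;
};

The predicate overload of wait() absorbs spurious wakeups; it is equivalent to the explicit empty-check-and-continue loop used in the patch.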
+
+int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
                                          bool doInterleaving)
 {
   boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
@@ -1020,6 +1066,14 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs,
     return 0;

   uint32_t connectionId = aPMIndex;
+  assert(connectionId < fPmConnections.size());
+  // Same-host EM-PP exchange goes through the in-memory queue instead of a socket.
+  if (fPmConnections[connectionId]->atTheSameHost() && fIsExeMgr)
+  {
+    pushToTheLocalQueueAndNotifyRecv(bs);
+    return 0;
+  }
+
   if (senderUniqueID != numeric_limits<uint32_t>::max())
   {
     lk.lock();
@@ -1183,4 +1237,39 @@ uint32_t DistributedEngineComm::MQE::getNextConnectionId(const size_t pmIndex,
   return nextConnectionId;
 }

+template <typename T>
+bool DistributedEngineComm::clientAtTheSameHost(T& client) const
+{
+  for (auto& sin : localNetIfaceSins_)
+  {
+    if (client->isSameAddr(sin))
+    {
+      return true;
+    }
+  }
+  return false;
+}
+
+void DistributedEngineComm::getLocalNetIfacesSins()
+{
+  struct ifaddrs* netIfacesList = nullptr;
+  struct ifaddrs* ifaceListMembPtr = nullptr;
+  int success = 0;
+  // retrieve the current interfaces - returns 0 on success
+  success = getifaddrs(&netIfacesList);
+  if (success == 0)
+  {
+    ifaceListMembPtr = netIfacesList;
+    for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
+    {
+      // ifa_addr can be nullptr, e.g. for an interface with no address assigned.
+      if (ifaceListMembPtr->ifa_addr && ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
+      {
+        localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
+      }
+    }
+    freeifaddrs(netIfacesList);
+  }
+}
+template bool DistributedEngineComm::clientAtTheSameHost(SharedPtrEMSock& client) const;

 } // namespace joblist
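clientAtTheSameHost() boils down to comparing the peer's IPv4 address with each local interface address collected by getLocalNetIfacesSins(). The conceptual core as a stand-alone function (isLocalPeer is an illustrative name; comparing s_addr directly is valid because both values are in network byte order):

#include <netinet/in.h>

#include <vector>

bool isLocalPeer(const in_addr& peer, const std::vector<in_addr>& localAddrs)
{
  for (const auto& local : localAddrs)
  {
    if (local.s_addr == peer.s_addr)
      return true;
  }
  return false;
}

Note that getifaddrs() also reports the loopback interface, so 127.0.0.1 peers qualify as local without special casing.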
diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h
index 1aa192df1..ad2692d40 100644
--- a/dbcon/joblist/distributedenginecomm.h
+++ b/dbcon/joblist/distributedenginecomm.h
@@ -32,6 +32,8 @@
 #pragma once

+#include
+#include
 #include
 #include
 #include
@@ -84,7 +86,11 @@ class DECEventListener
  */
 class DistributedEngineComm
 {
+  using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
+
 public:
+  using SBSVector = std::vector<messageqcpp::SBS>;
+
   /**
    * Constructors
    */
@@ -139,7 +145,7 @@ class DistributedEngineComm
    * Writes a primitive message to a primitive server. Msg needs to contain an ISMPacketHeader. The
    * LBID is extracted from the ISMPacketHeader and used to determine the actual P/M to send to.
    */
-  EXPORT void write(uint32_t key, messageqcpp::ByteStream& msg);
+  EXPORT void write(uint32_t key, const messageqcpp::SBS& msg);

   // EXPORT void throttledWrite(const messageqcpp::ByteStream& msg);

@@ -206,8 +212,13 @@ class DistributedEngineComm
     return fIsExeMgr;
   }

+  template <typename T>
+  bool clientAtTheSameHost(T& client) const;
+  void getLocalNetIfacesSins();
+
   messageqcpp::Stats getNetworkStats(uint32_t uniqueID);
   void addDataToOutput(messageqcpp::SBS sbs);
+  SBSVector& readLocalQueueMessagesOrWait(SBSVector&);

   friend class ::TestDistributedEngineComm;

@@ -261,9 +272,10 @@ class DistributedEngineComm
    *
    * Continues trying to write data to the client at the next index until all clients have been tried.
    */
-  int writeToClient(size_t index, const messageqcpp::ByteStream& bs,
+  int writeToClient(size_t index, const messageqcpp::SBS& bs,
                     uint32_t senderID = std::numeric_limits<uint32_t>::max(), bool doInterleaving = false);

+  void pushToTheLocalQueueAndNotifyRecv(const messageqcpp::SBS& bs);
   static DistributedEngineComm* fInstance;

   ResourceManager* fRm;
@@ -300,6 +312,11 @@ class DistributedEngineComm
   void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr<MQE> mqe);
   void doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize);
   boost::mutex ackLock;
+
+  std::vector<struct in_addr> localNetIfaceSins_;
+  std::mutex inMemoryEM2PPExchMutex_;
+  std::condition_variable inMemoryEM2PPExchCV_;
+  std::queue<messageqcpp::SBS> inMemoryEM2PPExchQueue_;
 };

 } // namespace joblist
diff --git a/dbcon/joblist/pdictionaryscan.cpp b/dbcon/joblist/pdictionaryscan.cpp
index 164ac157c..dc3c1cf1f 100644
--- a/dbcon/joblist/pdictionaryscan.cpp
+++ b/dbcon/joblist/pdictionaryscan.cpp
@@ -325,7 +325,6 @@ void pDictionaryScan::sendPrimitiveMessages()
   LBIDRange_v::iterator it;
   HWM_t hwm;
   uint32_t fbo;
-  ByteStream primMsg(65536);
   DBRM dbrm;
   uint16_t dbroot;
   uint32_t partNum;
@@ -391,8 +390,7 @@ void pDictionaryScan::sendPrimitiveMessages()
       }
     }

-    sendAPrimitiveMessage(primMsg, msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
-    primMsg.restart();
+    sendAPrimitiveMessage(msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);

     mutex.lock();
     msgsSent += msgLbidCount;
@@ -453,8 +451,7 @@ void pDictionaryScan::sendError(uint16_t s)
 //------------------------------------------------------------------------------
 // Construct and send a single primitive message to primproc
 //------------------------------------------------------------------------------
-void pDictionaryScan::sendAPrimitiveMessage(ByteStream& primMsg, BRM::LBID_t msgLbidStart,
-                                            uint32_t msgLbidCount, uint16_t pm)
+void pDictionaryScan::sendAPrimitiveMessage(BRM::LBID_t msgLbidStart, uint32_t msgLbidCount, uint16_t pm)
 {
   DictTokenByScanRequestHeader hdr;
   void* hdrp = static_cast<void*>(&hdr);
@@ -500,11 +497,11 @@ void pDictionaryScan::sendAPrimitiveMessage(ByteStream& primMsg, BRM::LBID_t msg
    * than putting it in the middle or at the end in terms of simplicity & memory usage,
    * given the current code.
    */
-
-  primMsg.load((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
-  primMsg << fVerId;
-  primMsg.append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
-  primMsg += fFilterString;
+  SBS primMsg(new ByteStream(hdr.ism.Size));
+  primMsg->load((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
+  *primMsg << fVerId;
+  primMsg->append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
+  *primMsg += fFilterString;

   // cout << "Sending rqst LBIDS " << msgLbidStart
   //      << " hdr.Count " << hdr.Count
@@ -852,7 +849,7 @@ void pDictionaryScan::appendFilter(const messageqcpp::ByteStream& filter, unsign

 void pDictionaryScan::serializeEqualityFilter()
 {
-  ByteStream msg;
+  SBS msg(new ByteStream());
   ISMPacketHeader ism;
   uint32_t i;
   vector<string> empty;

   void* ismp = static_cast<void*>(&ism);
   memset(ismp, 0, sizeof(ISMPacketHeader));
   ism.Command = DICT_CREATE_EQUALITY_FILTER;
-  msg.load((uint8_t*)&ism, sizeof(ISMPacketHeader));
-  msg << uniqueID;
-  msg << (uint32_t)colType().charsetNumber;
-  msg << (uint32_t)equalityFilter.size();
+  msg->load((uint8_t*)&ism, sizeof(ISMPacketHeader));
+  *msg << uniqueID;
+  *msg << (uint32_t)colType().charsetNumber;
+  *msg << (uint32_t)equalityFilter.size();

   for (i = 0; i < equalityFilter.size(); i++)
-    msg << equalityFilter[i];
+    *msg << equalityFilter[i];

   try
   {
@@ -884,14 +881,14 @@

 void pDictionaryScan::destroyEqualityFilter()
 {
-  ByteStream msg;
+  SBS msg(new ByteStream());
   ISMPacketHeader ism;

   void* ismp = static_cast<void*>(&ism);
   memset(ismp, 0, sizeof(ISMPacketHeader));
   ism.Command = DICT_DESTROY_EQUALITY_FILTER;
-  msg.load((uint8_t*)&ism, sizeof(ISMPacketHeader));
-  msg << uniqueID;
+  msg->load((uint8_t*)&ism, sizeof(ISMPacketHeader));
+  *msg << uniqueID;

   try
   {
@@ -918,13 +915,13 @@ void pDictionaryScan::abort()
 uint16_t pDictionaryScan::planFlagsToPrimFlags(uint32_t planFlags)
 {
   uint16_t flags = 0;
-
+
   if (planFlags & CalpontSelectExecutionPlan::TRACE_LBIDS)
     flags |= PF_LBID_TRACE;
-
+
   if (planFlags & CalpontSelectExecutionPlan::PM_PROFILE)
     flags |= PF_PM_PROF;
-
+
   return flags;
 }
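The serialize/destroy methods above all build primitive messages the same way: a memset-zeroed ISMPacketHeader is loaded first as raw bytes, then the variable fields are appended behind it. A stand-alone sketch of that composition pattern, assuming stub types (IsmHeaderStub and StreamStub are illustrative, not the real ISMPacketHeader/ByteStream):

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

struct IsmHeaderStub  // stand-in for ISMPacketHeader; only Command matters here
{
  uint16_t Command;
  uint8_t pad[14];
};

struct StreamStub  // minimal ByteStream-like builder
{
  std::vector<uint8_t> data;
  void load(const uint8_t* p, size_t n) { data.assign(p, p + n); }  // reset, then raw copy
  void append(const uint8_t* p, size_t n) { data.insert(data.end(), p, p + n); }
};

std::shared_ptr<StreamStub> buildMsg(uint16_t command, uint32_t uniqueID)
{
  IsmHeaderStub ism;
  std::memset(&ism, 0, sizeof(ism));  // zero the header first, as the real code does
  ism.Command = command;

  auto msg = std::make_shared<StreamStub>();
  msg->load(reinterpret_cast<const uint8_t*>(&ism), sizeof(ism));
  msg->append(reinterpret_cast<const uint8_t*>(&uniqueID), sizeof(uniqueID));
  return msg;
}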
diff --git a/dbcon/joblist/primitivestep.h b/dbcon/joblist/primitivestep.h
index 87187e360..6901ca4d7 100644
--- a/dbcon/joblist/primitivestep.h
+++ b/dbcon/joblist/primitivestep.h
@@ -93,7 +93,6 @@ enum PrimitiveStepType
   AGGRFILTERSTEP
 };

-
 class pColScanStep;
 class pColStep : public JobStep
 {
@@ -345,19 +344,25 @@ class pColScanStep : public JobStep
                const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo);

   pColScanStep(const pColStep& rhs);
-  ~pColScanStep(){}
+  ~pColScanStep()
+  {
+  }

   /** @brief Starts processing.
    *
    * Starts processing.
    */
-  virtual void run(){}
+  virtual void run()
+  {
+  }

   /** @brief Sync's the caller with the end of execution.
    *
    * Does nothing. Returns when this instance is finished.
    */
-  virtual void join(){}
+  virtual void join()
+  {
+  }

   virtual bool isDictCol() const
   {
@@ -386,7 +391,7 @@ class pColScanStep : public JobStep
   {
     fBOP = BOP;
   }
-
+
   int8_t BOP() const
   {
     return fBOP;
@@ -548,18 +553,24 @@ class pDictionaryStep : public JobStep
   pDictionaryStep(execplan::CalpontSystemCatalog::OID oid, execplan::CalpontSystemCatalog::OID tabelOid,
                   const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo);

-  virtual ~pDictionaryStep(){}
+  virtual ~pDictionaryStep()
+  {
+  }

   /** @brief virtual void Run method
    */
-  virtual void run(){}
-  virtual void join(){}
+  virtual void run()
+  {
+  }
+  virtual void join()
+  {
+  }

   // void setOutList(StringDataList* rids);
   void setInputList(DataList_t* rids)
   {
     requestList = rids;
   }
-
+
   void setBOP(int8_t b)
   {
     fBOP = b;
@@ -794,8 +805,7 @@ class pDictionaryScan : public JobStep
   void startPrimitiveThread();
   void startAggregationThread();
   void initializeConfigParms();
-  void sendAPrimitiveMessage(messageqcpp::ByteStream& primMsg, BRM::LBID_t msgLbidStart,
-                             uint32_t msgLbidCount, uint16_t dbroot);
+  void sendAPrimitiveMessage(BRM::LBID_t msgLbidStart, uint32_t msgLbidCount, uint16_t dbroot);
   void formatMiniStats();

   DistributedEngineComm* fDec;
@@ -891,9 +901,9 @@ class BatchPrimitive : public JobStep, public DECEventListener
   struct _CPInfo
   {
     _CPInfo(int64_t MIN, int64_t MAX, uint64_t l, bool dictScan, bool val)
-     : min(MIN), max(MAX), LBID(l), valid(val), dictScan(dictScan) {};
+     : min(MIN), max(MAX), LBID(l), valid(val), dictScan(dictScan){};
     _CPInfo(int128_t BIGMIN, int128_t BIGMAX, uint64_t l, bool val)
-     : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val), dictScan(false) {};
+     : bigMin(BIGMIN), bigMax(BIGMAX), LBID(l), valid(val), dictScan(false){};
     union
     {
       int128_t bigMin;
@@ -1508,7 +1518,7 @@ class FilterStep : public JobStep
 protected:
   // void unblockDataLists(FifoDataList* fifo, StringFifoDataList* strFifo, StrDataList* strResult,
-  //DataList_t* result);
+  // DataList_t* result);

 private:
   // This i/f is not meaningful in this step
diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp
index 624031f09..d73cd299a 100644
--- a/dbcon/joblist/tuple-bps.cpp
+++ b/dbcon/joblist/tuple-bps.cpp
@@ -737,12 +737,12 @@ TupleBPS::~TupleBPS()

   if (BPPIsAllocated)
   {
-    ByteStream bs;
-    fBPP->destroyBPP(bs);
+    SBS sbs{new ByteStream()};
+    fBPP->destroyBPP(*sbs);

     try
     {
-      fDec->write(uniqueID, bs);
+      fDec->write(uniqueID, sbs);
     }
     catch (const std::exception& e)
     {
@@ -1093,8 +1093,8 @@ void TupleBPS::startProcessingThread(TupleBPS* tbps, vector
-      more = fBPP->nextTupleJoinerMsg(bs);
+      more = fBPP->nextTupleJoinerMsg(*sbs);
     }
 #ifdef JLF_DEBUG
-    cout << "serializing joiner into " << bs.length() << " bytes" << endl;
+    cout << "serializing joiner into " << sbs->length() << " bytes" << endl;
 #endif
-    fDec->write(uniqueID, bs);
-    bs.restart();
+    fDec->write(uniqueID, sbs);
+    sbs.reset(new ByteStream());
   }
 }

+// Outdated method
 void TupleBPS::serializeJoiner(uint32_t conn)
 {
   // We need this lock for TupleBPS::serializeJoiner()
@@ -1376,7 +1377,7 @@ void TupleBPS::run()
                            std::string("TupleBPS"));  // step name
   }

-  ByteStream bs;
+  SBS sbs{new ByteStream()};

   if (fDelivery)
   {
@@ -1406,8 +1407,8 @@
   {
     fDec->addDECEventListener(this);
     fBPP->priority(priority());
-    fBPP->createBPP(bs);
-    fDec->write(uniqueID, bs);
+    fBPP->createBPP(*sbs);
+    fDec->write(uniqueID, sbs);
     BPPIsAllocated = true;

     if (doJoin && tjoiners[0]->inPM())
@@ -1453,13 +1454,13 @@ void TupleBPS::join()

   if (BPPIsAllocated)
   {
-    ByteStream bs;
+    SBS sbs{new ByteStream()};
     fDec->removeDECEventListener(this);
-    fBPP->destroyBPP(bs);
+    fBPP->destroyBPP(*sbs);

     try
     {
-      fDec->write(uniqueID, bs);
+      fDec->write(uniqueID, sbs);
     }
     catch (...)
     {
@@ -1476,10 +1477,10 @@ void TupleBPS::join()

 void TupleBPS::sendError(uint16_t status)
 {
-  ByteStream msgBpp;
+  SBS msgBpp(new ByteStream());
   fBPP->setCount(1);
   fBPP->setStatus(status);
-  fBPP->runErrorBPP(msgBpp);
+  fBPP->runErrorBPP(*msgBpp);

   try
   {
@@ -1602,7 +1603,7 @@ void TupleBPS::sendJobs(const vector<Job>& jobs)

   for (i = 0; i < jobs.size() && !cancelled(); i++)
   {
-    fDec->write(uniqueID, *(jobs[i].msg));
+    fDec->write(uniqueID, jobs[i].msg);

     tplLock.lock();
     msgsSent += jobs[i].expectedResponses;
@@ -2613,15 +2614,15 @@ void TupleBPS::receiveMultiPrimitiveMessages()
     dlTimes.setEndOfInputTime();
   }

-  ByteStream bs;
+  SBS sbs{new ByteStream()};

   try
   {
     if (BPPIsAllocated)
     {
       fDec->removeDECEventListener(this);
-      fBPP->destroyBPP(bs);
-      fDec->write(uniqueID, bs);
+      fBPP->destroyBPP(*sbs);
+      fDec->write(uniqueID, sbs);
       BPPIsAllocated = false;
     }
   }
@@ -3302,12 +3303,12 @@ void TupleBPS::abort_nolock()

   if (fDec && BPPIsAllocated)
   {
-    ByteStream bs;
-    fBPP->abortProcessing(&bs);
+    SBS sbs{new ByteStream()};
+    fBPP->abortProcessing(sbs.get());

     try
     {
-      fDec->write(uniqueID, bs);
+      fDec->write(uniqueID, sbs);
     }
     catch (...)
     {
diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp
index 1c800ea84..7b0b78534 100644
--- a/primitives/primproc/batchprimitiveprocessor.cpp
+++ b/primitives/primproc/batchprimitiveprocessor.cpp
@@ -29,6 +29,7 @@
 //
 //

+#include
 #include
 #include
 #include
@@ -1888,7 +1889,7 @@ void BatchPrimitiveProcessor::execute()
       }
       else
       {
-        // We hae no more use for this allocation
+        // We have no more use for this allocation
         for (i = 0; i < joinerCount; i++)
           for (j = 0; j < ridCount; ++j)
             tSmallSideMatches[i][j].clear();
@@ -2145,12 +2146,12 @@ void BatchPrimitiveProcessor::serializeStrings()

 void BatchPrimitiveProcessor::sendResponse()
 {
-  bool isLocalNodeConnection = exemgr::globServiceExeMgr->isLocalNodeSock(sock);
-  // Here is the fast path for local EM to PM interacction. PM puts into the
+  auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
+  // Here is the fast path for local EM to PM interaction. PM puts into the
   // input EM DEC queue directly.
-  if (initiatedByEM_ && isLocalNodeConnection)
+  // !sock has 'same host connection' semantics here.
+  if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
   {
-    joblist::DistributedEngineComm* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
     exeMgrDecPtr->addDataToOutput(serialized);
     serialized.reset();
     return;
diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp
index fffc0d971..a280c4369 100644
--- a/primitives/primproc/primitiveserver.cpp
+++ b/primitives/primproc/primitiveserver.cpp
@@ -21,13 +21,16 @@
  *
  ***********************************************************************/

+
 #define _FILE_OFFSET_BITS 64
 #define _LARGEFILE64_SOURCE
 #include
 #include
 #include
 #include
+#include
 #include
+
 //#define NDEBUG
 #include
 #include
@@ -49,7 +52,8 @@ using namespace std;
 #include
 #include
 using namespace boost;
-
+#include "distributedenginecomm.h"
+#include "serviceexemgr.h"
 #include "primproc.h"
 #include "primitiveserver.h"
 #include "primitivemsg.h"
@@ -1055,7 +1059,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor
   DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
   virtual ~DictScanJob();

-  void write(const ByteStream&);
+  void write(const SBS&);
   int operator()();
   void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
   void sendErrorMsg(uint32_t id, uint16_t code);
@@ -1077,17 +1081,27 @@ DictScanJob::~DictScanJob()
 {
 }

-void DictScanJob::write(const ByteStream& bs)
+void DictScanJob::write(const SBS& sbs)
 {
+  // Here is the fast path for local EM to PM interaction. PM puts into the
+  // input EM DEC queue directly.
+  // !sock has 'same host connection' semantics here.
+  if (!fIos)
+  {
+    auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
+    exeMgrDecPtr->addDataToOutput(sbs);
+    return;
+  }
   boost::mutex::scoped_lock lk(*fWriteLock);
-  fIos->write(bs);
+  fIos->write(*sbs);
 }
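Both BatchPrimitiveProcessor::sendResponse() and DictScanJob::write() now encode "the request came from an ExeMgr on this host" as an empty socket pointer. A condensed sketch of that convention (Socket, ResultSink and writeResult are illustrative stand-ins, not ColumnStore types):

#include <memory>
#include <mutex>

struct Socket  // stand-in for messageqcpp::IOSocket
{
  void write(int /*payload*/) {}
};

struct ResultSink  // stand-in for the DEC in-memory output queue
{
  void push(const std::shared_ptr<int>&) {}
};

// sock == nullptr marks a same-host job: the result bypasses TCP and goes
// straight into the local queue, so neither the socket nor its write mutex
// is ever touched on that path.
void writeResult(const std::shared_ptr<Socket>& sock, std::mutex& writeMutex, ResultSink& dec,
                 const std::shared_ptr<int>& result)
{
  if (!sock)
  {
    dec.push(result);
    return;
  }
  std::lock_guard<std::mutex> lk(writeMutex);
  sock->write(*result);
}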
 int DictScanJob::operator()()
 {
   utils::setThreadName("PPDictScanJob");
   uint8_t data[DATA_BLOCK_SIZE];
-  uint32_t output_buf_size = MAX_BUFFER_SIZE;
+  // Shrinking this buffer may surface unrelated issues in DictScanStep.
+  const uint32_t output_buf_size = MAX_BUFFER_SIZE;
   uint32_t session;
   uint32_t uniqueId = 0;
   bool wasBlockInCache = false;
@@ -1095,7 +1109,6 @@ int DictScanJob::operator()()
   uint16_t runCount;
   boost::shared_ptr<DictEqualityFilter> eqFilter;
-  ByteStream results(output_buf_size);
   TokenByScanRequestHeader* cmd;
   PrimitiveProcessor pproc(gDebugLevel);
   TokenByScanResultHeader* output;
@@ -1114,7 +1127,6 @@ int DictScanJob::operator()()
   session = cmd->Hdr.SessionID;
   uniqueId = cmd->Hdr.UniqueID;
   runCount = cmd->Count;
-  output = (TokenByScanResultHeader*)results.getInputPtr();
 #ifdef VALGRIND
   memset(output, 0, sizeof(TokenByScanResultHeader));
 #endif
@@ -1145,6 +1157,9 @@ int DictScanJob::operator()()

   for (uint16_t i = 0; i < runCount; ++i)
   {
+    SBS results(new ByteStream(output_buf_size));
+    output = (TokenByScanResultHeader*)results->getInputPtr();
+
     loadBlock(cmd->LBID, verInfo, cmd->Hdr.TransactionID, cmd->CompType, data, &wasBlockInCache,
               &blocksRead, fLBIDTraceOn, session);
     pproc.setBlockPtr((int*)data);
@@ -1155,9 +1170,8 @@ int DictScanJob::operator()()
     else
       output->PhysicalIO += blocksRead;

-    results.advanceInputPtr(output->NBYTES);
+    results->advanceInputPtr(output->NBYTES);
     write(results);
-    results.restart();
     cmd->LBID++;
   }
@@ -1199,9 +1213,9 @@ void DictScanJob::sendErrorMsg(uint32_t id, uint16_t code)
   ism.Status = code;
   ph.UniqueID = id;

-  ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
-  msg.append((uint8_t*)&ism, sizeof(ism));
-  msg.append((uint8_t*)&ph, sizeof(ph));
+  SBS msg(new ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)));
+  msg->append((uint8_t*)&ism, sizeof(ism));
+  msg->append((uint8_t*)&ph, sizeof(ph));

   write(msg);
 }
@@ -1940,6 +1954,128 @@ struct ReadThread
     ios->write(buildCacheOpResp(0));
   }

+  static void dispatchPrimitive(SBS sbs, boost::shared_ptr<BPPHandler>& fBPPHandler,
+                                boost::shared_ptr<threadpool::FairThreadPool>& procPoolPtr,
+                                SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads,
+                                const bool ptTrace)
+  {
+    const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(sbs->buf());
+    switch (ismHdr->Command)
+    {
+      case DICT_CREATE_EQUALITY_FILTER:
+      case DICT_DESTROY_EQUALITY_FILTER:
+      case BATCH_PRIMITIVE_CREATE:
+      case BATCH_PRIMITIVE_ADD_JOINER:
+      case BATCH_PRIMITIVE_END_JOINER:
+      case BATCH_PRIMITIVE_DESTROY:
+      case BATCH_PRIMITIVE_ABORT:
+      {
+        const uint8_t* buf = sbs->buf();
+        uint32_t pos = sizeof(ISMPacketHeader) - 2;
+        const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
+        const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
+        const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
+        const uint32_t weight = 1;
+        const uint32_t priority = 0;
+        uint32_t id = 0;
+        boost::shared_ptr<FairThreadPool::Functor> functor;
+        if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
+        {
+          functor.reset(new CreateEqualityFilter(sbs));
+        }
+        else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
+        {
+          functor.reset(new DestroyEqualityFilter(sbs));
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
+        {
+          functor.reset(new BPPHandler::Create(fBPPHandler, sbs));
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
+        {
+          functor.reset(new BPPHandler::AddJoiner(fBPPHandler, sbs));
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
+        {
+          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
+          functor.reset(new BPPHandler::LastJoiner(fBPPHandler, sbs));
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
+        {
+          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
+          functor.reset(new BPPHandler::Destroy(fBPPHandler, sbs));
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
+        {
+          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
+          functor.reset(new BPPHandler::Abort(fBPPHandler, sbs));
+        }
+        FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id);
+        procPoolPtr->addJob(job);
+        break;
+      }
+
+      case DICT_TOKEN_BY_SCAN_COMPARE:
+      case BATCH_PRIMITIVE_RUN:
+      {
+        TokenByScanRequestHeader* hdr = nullptr;
+        boost::shared_ptr<FairThreadPool::Functor> functor;
+        uint32_t id = 0;
+        uint32_t weight = 0;
+        uint32_t priority = 0;
+        uint32_t txnId = 0;
+        uint32_t stepID = 0;
+        uint32_t uniqueID = 0;
+
+        if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
+        {
+          idbassert(sbs->length() >= sizeof(TokenByScanRequestHeader));
+          hdr = (TokenByScanRequestHeader*)ismHdr;
+          functor.reset(new DictScanJob(outIos, sbs, writeLock));
+          id = hdr->Hdr.UniqueID;
+          weight = LOGICAL_BLOCK_RIDS;
+          priority = hdr->Hdr.Priority;
+          const uint8_t* buf = sbs->buf();
+          const uint32_t pos = sizeof(ISMPacketHeader) - 2;
+          txnId = *((uint32_t*)&buf[pos + 2]);
+          stepID = *((uint32_t*)&buf[pos + 6]);
+          uniqueID = *((uint32_t*)&buf[pos + 10]);
+        }
+        else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
+        {
+          functor.reset(new BPPSeeder(sbs, writeLock, outIos, processorThreads, ptTrace));
+          BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
+          id = bpps->getID();
+          priority = bpps->priority();
+          const uint8_t* buf = sbs->buf();
+          const uint32_t pos = sizeof(ISMPacketHeader) - 2;
+          txnId = *((uint32_t*)&buf[pos + 2]);
+          stepID = *((uint32_t*)&buf[pos + 6]);
+          uniqueID = *((uint32_t*)&buf[pos + 10]);
+          weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
+        }
+        FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
+        procPoolPtr->addJob(job);
+
+        break;
+      }
+
+      case BATCH_PRIMITIVE_ACK:
+      {
+        fBPPHandler->doAck(*sbs);
+        break;
+      }
+      default:
+      {
+        std::ostringstream os;
+        Logger log;
+        os << "unknown primitive cmd: " << ismHdr->Command;
+        log.logMessage(os.str());
+        break;
+      }
+    }  // the switch stmt
+  }
+
   void operator()()
   {
     utils::setThreadName("PPReadThread");
@@ -1994,9 +2130,6 @@ struct ReadThread
           idbassert(bs->length() >= sizeof(ISMPacketHeader));
           const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());

-          // uint64_t someVal = ismHdr->Command;
-          // std::cout << " PP read thread Command " << someVal << std::endl;
-
           /* This switch is for the OOB commands */
           switch (ismHdr->Command)
           {
@@ -2037,139 +2170,8 @@
             default: break;
           }
-
-          switch (ismHdr->Command)
-          {
-            case DICT_CREATE_EQUALITY_FILTER:
-            case DICT_DESTROY_EQUALITY_FILTER:
-            case BATCH_PRIMITIVE_CREATE:
-            case BATCH_PRIMITIVE_ADD_JOINER:
-            case BATCH_PRIMITIVE_END_JOINER:
-            case BATCH_PRIMITIVE_DESTROY:
-            case BATCH_PRIMITIVE_ABORT:
-            {
-              const uint8_t* buf = bs->buf();
-              uint32_t pos = sizeof(ISMPacketHeader) - 2;
-              const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
-              const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
-              const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
-              const uint32_t weight = 1;
-              const uint32_t priority = 0;
-              uint32_t id = 0;
-              boost::shared_ptr<FairThreadPool::Functor> functor;
-              if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
-              {
-                functor.reset(new CreateEqualityFilter(bs));
-              }
-              else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
-              {
-                functor.reset(new DestroyEqualityFilter(bs));
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
-              {
-                functor.reset(new BPPHandler::Create(fBPPHandler, bs));
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
-              {
-                functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs));
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
-              {
-                id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
-                functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs));
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
-              {
-                id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
-                functor.reset(new BPPHandler::Destroy(fBPPHandler, bs));
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
-              {
-                id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
-                functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
-              }
-              FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
-              procPoolPtr->addJob(job);
-              break;
-            }
-
-            case DICT_TOKEN_BY_SCAN_COMPARE:
-            case BATCH_PRIMITIVE_RUN:
-            {
-              TokenByScanRequestHeader* hdr = nullptr;
-              boost::shared_ptr<FairThreadPool::Functor> functor;
-              uint32_t id = 0;
-              uint32_t weight = 0;
-              uint32_t priority = 0;
-              uint32_t txnId = 0;
-              uint32_t stepID = 0;
-              uint32_t uniqueID = 0;
-
-              if (bRotateDest)
-              {
-                if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock))
-                {
-                  // If we ever fall into this part of the
-                  // code we have a "bug" of some sort.
-                  // See handleUmSockSelErr() for more info.
-                  // We reset ios and mutex to defaults.
-                  handleUmSockSelErr(string("default cmd"));
-                  outIos = outIosDefault;
-                  writeLock = writeLockDefault;
-                  pUmSocketSelector->delConnection(fIos);
-                  bRotateDest = false;
-                }
-              }
-
-              if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
-              {
-                idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
-                hdr = (TokenByScanRequestHeader*)ismHdr;
-                functor.reset(new DictScanJob(outIos, bs, writeLock));
-                id = hdr->Hdr.UniqueID;
-                weight = LOGICAL_BLOCK_RIDS;
-                priority = hdr->Hdr.Priority;
-                const uint8_t* buf = bs->buf();
-                const uint32_t pos = sizeof(ISMPacketHeader) - 2;
-                txnId = *((uint32_t*)&buf[pos + 2]);
-                stepID = *((uint32_t*)&buf[pos + 6]);
-                uniqueID = *((uint32_t*)&buf[pos + 10]);
-              }
-              else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
-              {
-                functor.reset(new BPPSeeder(bs, writeLock, outIos,
-                                            fPrimitiveServerPtr->ProcessorThreads(),
-                                            fPrimitiveServerPtr->PTTrace()));
-                BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
-                id = bpps->getID();
-                priority = bpps->priority();
-                const uint8_t* buf = bs->buf();
-                const uint32_t pos = sizeof(ISMPacketHeader) - 2;
-                txnId = *((uint32_t*)&buf[pos + 2]);
-                stepID = *((uint32_t*)&buf[pos + 6]);
-                uniqueID = *((uint32_t*)&buf[pos + 10]);
-                weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
-              }
-              FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
-              procPoolPtr->addJob(job);
-
-              break;
-            }
-
-            case BATCH_PRIMITIVE_ACK:
-            {
-              fBPPHandler->doAck(*bs);
-              break;
-            }
-            default:
-            {
-              std::ostringstream os;
-              Logger log;
-              os << "unknown primitive cmd: " << ismHdr->Command;
-              log.logMessage(os.str());
-              break;
-            }
-          }  // the switch stmt
+          dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock,
+                            fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace());
         }
         else  // bs.length() == 0
         {
@@ -2213,8 +2215,6 @@
   boost::shared_ptr<BPPHandler> fBPPHandler;
 };

-/** @brief accept a primitive command from the user module
- */
 struct ServerThread
 {
   ServerThread(string serverName, PrimitiveServer* ps) : fServerName(serverName), fPrimitiveServerPtr(ps)
@@ -2351,9 +2351,43 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
     fServerpool.invoke(ServerThread(oss.str(), this));
   }

+  startupRaceLock.release();
   service->NotifyServiceStarted();

+  std::thread sameHostServerThread(
+      [this]()
+      {
+        utils::setThreadName("PPSHServerThr");
+        auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
+        while (!exeMgrDecPtr)
+        {
+          sleep(1);
+          exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
+        }
+        // These empty shared pointers carry the 'same-host' messaging semantics.
+        SP_UM_IOSOCK outIos(nullptr);
+        SP_UM_MUTEX writeLock(nullptr);
+        auto procPoolPtr = this->getProcessorThreadPool();
+        boost::shared_ptr<BPPHandler> fBPPHandler(new BPPHandler(this));
+        for (;;)
+        {
+          joblist::DistributedEngineComm::SBSVector primitiveMsgs;
+          for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
+          {
+            if (sbs->length() == 0)
+            {
+              std::cout << "PPSHServerThr got an empty ByteStream." << std::endl;
+              continue;
+            }
+            idbassert(sbs->length() >= sizeof(ISMPacketHeader));
+
+            ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock,
+                                          this->ProcessorThreads(), this->PTTrace());
+          }
+        }
+      });
+
   fServerpool.wait();

   cerr << "PrimitiveServer::start() exiting!" << endl;
diff --git a/tests/fair_threadpool.cpp b/tests/fair_threadpool.cpp
index a74674516..c37da24ad 100644
--- a/tests/fair_threadpool.cpp
+++ b/tests/fair_threadpool.cpp
@@ -15,8 +15,9 @@
    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */

-#include
 #include
+#include
+#include
 #include

 #include "utils/threadpool/fair_threadpool.h"
@@ -27,6 +28,7 @@ using namespace threadpool;

 using ResultsType = std::vector<uint32_t>;
 static ResultsType results;
+static std::mutex globMutex;

 class FairThreadPoolTest : public testing::Test
 {
@@ -50,6 +52,7 @@ class TestFunctor : public FairThreadPool::Functor
   int operator()() override
   {
     usleep(delay_);
+    std::lock_guard<std::mutex> gl(globMutex);
     results.push_back(id_);
     return 0;
   }
@@ -74,6 +77,7 @@ class TestRescheduleFunctor : public FairThreadPool::Functor
       return 1;  // re-schedule the Job
     }
     usleep(delay_);
+    std::lock_guard<std::mutex> gl(globMutex);
     results.push_back(id_);
     return 0;
   }
diff --git a/utils/messageqcpp/messagequeue.cpp b/utils/messageqcpp/messagequeue.cpp
index e168949dc..73ba6d09e 100644
--- a/utils/messageqcpp/messagequeue.cpp
+++ b/utils/messageqcpp/messagequeue.cpp
@@ -218,13 +218,17 @@ void MessageQueueClient::setup(bool syncProto)
 }

 MessageQueueClient::MessageQueueClient(const string& otherEnd, const string& config, bool syncProto)
- : fOtherEnd(otherEnd), fConfig(Config::makeConfig(config)), fLogger(31), fIsAvailable(true)
+ : fOtherEnd(otherEnd)
+ , fConfig(Config::makeConfig(config))
+ , fLogger(31)
+ , fIsAvailable(true)
+ , atTheSameHost_(false)
 {
   setup(syncProto);
 }

 MessageQueueClient::MessageQueueClient(const string& otherEnd, Config* config, bool syncProto)
- : fOtherEnd(otherEnd), fConfig(config), fLogger(31), fIsAvailable(true)
+ : fOtherEnd(otherEnd), fConfig(config), fLogger(31), fIsAvailable(true), atTheSameHost_(false)
 {
   if (fConfig == 0)
     fConfig = Config::makeConfig();
@@ -233,7 +237,7 @@ MessageQueueClient::MessageQueueClient(const string& otherEnd, Config* config, b
 }

 MessageQueueClient::MessageQueueClient(const string& dnOrIp, uint16_t port, bool syncProto)
- : fLogger(31), fIsAvailable(true)
+ : fLogger(31), fIsAvailable(true), atTheSameHost_(false)
 {
 #ifdef SKIP_IDB_COMPRESSION
   fClientSock.setSocketImpl(new InetStreamSocket());
diff --git a/utils/messageqcpp/messagequeue.h b/utils/messageqcpp/messagequeue.h
index 99a63cbdf..f721745e6 100644
--- a/utils/messageqcpp/messagequeue.h
+++ b/utils/messageqcpp/messagequeue.h
@@ -275,6 +275,7 @@ class MessageQueueClient
    * @brief compare the addresses of 2 MessageQueueClient
    */
   inline bool isSameAddr(const MessageQueueClient& rhs) const;
+  inline bool isSameAddr(const struct in_addr& ipv4Addr) const;
   bool isConnected()
   {
@@ -285,6 +286,17 @@
   {
     return fClientSock.hasData();
   }
+
+  // This flag is set on the client during the DEC::Setup() call.
+  bool atTheSameHost() const
+  {
+    return atTheSameHost_;
+  }
+
+  void atTheSameHost(const bool atTheSameHost)
+  {
+    atTheSameHost_ = atTheSameHost;
+  }
   /*
    * allow test suite access to private data for OOB test
    */
@@ -312,6 +324,7 @@
   mutable IOSocket fClientSock;  /// the socket to communicate with the server
   mutable logging::Logger fLogger;
   bool fIsAvailable;
+  bool atTheSameHost_;
   std::string fModuleName;
 };

@@ -327,6 +340,10 @@
 inline bool MessageQueueClient::isSameAddr(const MessageQueueClient& rhs) const
 {
   return fClientSock.isSameAddr(&rhs.fClientSock);
 }
+inline bool MessageQueueClient::isSameAddr(const struct in_addr& ipv4Addr) const
+{
+  return fClientSock.isSameAddr(ipv4Addr);
+}
 inline void MessageQueueClient::syncProto(bool use)
 {
   fClientSock.syncProto(use);
@@ -335,4 +352,3 @@
 }  // namespace messageqcpp

 #undef EXPORT
-
diff --git a/utils/threadpool/fair_threadpool.h b/utils/threadpool/fair_threadpool.h
index 72c4fb0ce..42a9f7fb4 100644
--- a/utils/threadpool/fair_threadpool.h
+++ b/utils/threadpool/fair_threadpool.h
@@ -70,6 +70,19 @@ class FairThreadPool
     , id_(id)
   {
   }
+  // sock_ stays nullptr in this overload, which is somewhat dangerous: use it
+  // only for jobs that never touch the socket.
+  Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
+      const boost::shared_ptr<Functor>& functor, const uint32_t weight = 1, const uint32_t priority = 0,
+      const uint32_t id = 0)
+   : uniqueID_(uniqueID)
+   , stepID_(stepID)
+   , txnIdx_(txnIdx)
+   , functor_(functor)
+   , weight_(weight)
+   , priority_(priority)
+   , id_(id)
+  {
+  }
   uint32_t uniqueID_;
   uint32_t stepID_;
   TransactionIdxT txnIdx_;