diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 4699042bc..483db763e 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -1243,7 +1243,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const * (projection count)x run msgs for projection Commands */ -void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum) +void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC) { ISMPacketHeader ism; uint32_t i; @@ -1276,6 +1276,8 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum) bs << dbRoot; bs << count; + uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0; + bs << sentByEM; if (_hasScan) idbassert(ridCount == 0); diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index a249b3102..5418dc169 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -137,7 +137,7 @@ class BatchPrimitiveProcessorJL void addElementType(const StringElementType&, uint32_t dbroot); // void setRowGroupData(const rowgroup::RowGroup &); - void runBPP(messageqcpp::ByteStream&, uint32_t pmNum); + void runBPP(messageqcpp::ByteStream&, uint32_t pmNum, bool isExeMgrDEC); void abortProcessing(messageqcpp::ByteStream*); /* After serializing a BPP object, reset it and it's ready for more input */ diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 32f5ee45e..0da2fb0dd 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -199,7 +199,7 @@ void DistributedEngineComm::reset() } DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) -: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr) + : fRm(rm), pmCount(0), fIsExeMgr(isExeMgr) { Setup(); } @@ -288,7 +288,7 @@ void DistributedEngineComm::Setup() catch (std::exception& ex) { if (i < newPmCount) - newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0 + newPmCount = newPmCount > 1 ? newPmCount - 1 : 1; // We can't afford to reduce newPmCount to 0 writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), @@ -302,7 +302,7 @@ void DistributedEngineComm::Setup() catch (...) { if (i < newPmCount) - newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0 + newPmCount = newPmCount > 1 ? newPmCount - 1 : 1; // We can't afford to reduce newPmCount to 0 writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR); @@ -921,7 +921,7 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptrbuf()); PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1); @@ -931,6 +931,39 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* boost::mutex::scoped_lock lk(fMlock); MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); + // The message for a session that doesn't exist. + if (map_tok == fSessionMessages.end()) + { + // Here gets the dead session ByteStream that is already removed + // from DEC queue. + return; + } + + mqe = map_tok->second; + lk.unlock(); + + if (pmCount > 0) + { + // I hardcoded the unacked Worker id here. ACK isn't important + // for the local exchange b/c there is no need to + // enable flowcontrol localy on PM. + (void)atomicops::atomicInc(&mqe->unackedWork[0]); + } + + [[maybe_unused]] TSQSize_t queueSize = mqe->queue.push(sbs); + // There will be no statistics about data transfered + // over the memory. +} + +void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats) +{ + ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf()); + PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1); + uint32_t uniqueId = p->UniqueID; + boost::shared_ptr mqe; + boost::mutex::scoped_lock lk(fMlock); + MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId); + if (map_tok == fSessionMessages.end()) { // For debugging... @@ -1036,9 +1069,9 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs, // reconfig the connection array ClientList tempConns; { - //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< - endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName = - fPmConnections[index]->moduleName(); + //cout << "WARNING: DEC WRITE BROKEN PIPE " << + fPmConnections[index]->otherEnd()<< endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string + moduleName = fPmConnections[index]->moduleName(); //cout << "module name = " << moduleName << endl; if (index >= fPmConnections.size()) return 0; diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 0ec286289..1aa192df1 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -201,7 +201,13 @@ class DistributedEngineComm return fRm->getPsCount() * cpp; } + bool isExeMgrDEC() const + { + return fIsExeMgr; + } + messageqcpp::Stats getNetworkStats(uint32_t uniqueID); + void addDataToOutput(messageqcpp::SBS sbs); friend class ::TestDistributedEngineComm; @@ -251,7 +257,6 @@ class DistributedEngineComm * */ void addDataToOutput(messageqcpp::SBS, uint32_t connIndex, messageqcpp::Stats* statsToAdd); - /** @brief Writes data to the client at the index * * Continues trying to write data to the client at the next index until all clients have been tried. diff --git a/dbcon/joblist/tuple-bps.cpp b/dbcon/joblist/tuple-bps.cpp index f59f31b9b..624031f09 100644 --- a/dbcon/joblist/tuple-bps.cpp +++ b/dbcon/joblist/tuple-bps.cpp @@ -1047,12 +1047,13 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts) const EMEntry& extent = colCmd->getExtents()[idx]; /* If any column filter eliminates an extent, it doesn't get scanned */ - scanFlags[idx] = scanFlags[idx] && (extent.colWid <= utils::MAXCOLUMNWIDTH) && // XXX: change to named constant. - (ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID || - colCmd->getColType().colWidth != extent.colWid || - lbidListVec[i]->CasualPartitionPredicate( - extent.partition.cprange, &(colCmd->getFilterString()), colCmd->getFilterCount(), - colCmd->getColType(), colCmd->getBOP(), colCmd->getIsDict())); + scanFlags[idx] = + scanFlags[idx] && (extent.colWid <= utils::MAXCOLUMNWIDTH) && // XXX: change to named constant. + (ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID || + colCmd->getColType().colWidth != extent.colWid || + lbidListVec[i]->CasualPartitionPredicate(extent.partition.cprange, &(colCmd->getFilterString()), + colCmd->getFilterCount(), colCmd->getColType(), + colCmd->getBOP(), colCmd->getIsDict())); } } @@ -2032,7 +2033,7 @@ void TupleBPS::makeJobs(vector* jobs) #endif startingLBID = scannedExtents[i].range.start; - + bool isExeMgrDEC = fDec->isExeMgrDEC(); while (blocksToScan > 0) { uint32_t blocksThisJob = min(blocksToScan, blocksPerJob); @@ -2040,7 +2041,7 @@ void TupleBPS::makeJobs(vector* jobs) fBPP->setLBID(startingLBID, scannedExtents[i]); fBPP->setCount(blocksThisJob); bs.reset(new ByteStream()); - fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]); + fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], isExeMgrDEC); jobs->push_back( Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs)); blocksToScan -= blocksThisJob; @@ -2373,7 +2374,9 @@ void TupleBPS::receiveMultiPrimitiveMessages() for (uint32_t z = 0; z < size; z++) { if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z]))) + { ++msgsRecvd; + } } //@Bug 1424,1298 diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 99224c7ac..59f298566 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -37,6 +37,7 @@ #include #include #include +#include "serviceexemgr.h" #include using namespace std; @@ -117,7 +118,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , validCPData(false) , minVal(MAX64) , maxVal(MIN64) -, cpDataFromDictScan(false) + , cpDataFromDictScan(false) , lbidForCP(0) , hasWideColumnOut(false) , busyLoaderCount(0) @@ -140,6 +141,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() , ptMask(0) , firstInstance(false) , valuesLBID(0) + , initiatedByEM_(false) { pp.setLogicalBlockMode(true); pp.setBlockPtr((int*)blockData); @@ -193,6 +195,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch, // ptMask(processorThreads - 1), , firstInstance(true) , valuesLBID(0) + , initiatedByEM_(false) { // promote processorThreads to next power of 2. also need to change the name to bucketCount or similar processorThreads = nextPowOf2(processorThreads); @@ -544,6 +547,10 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con bs.advance(sizeof(ISMPacketHeader) + 16); bs >> dbRoot; bs >> count; + uint8_t u8 = 0; + bs >> u8; + initiatedByEM_ = u8; + bs >> ridCount; if (gotAbsRids) @@ -1647,8 +1654,9 @@ void BatchPrimitiveProcessor::execute() } // else - // cout << " no target found for OID " << projectSteps[j]->getOID() << - //endl; + // cout << " no target found for OID " << + // projectSteps[j]->getOID() + //<< endl; } if (fe2) { @@ -1764,7 +1772,7 @@ void BatchPrimitiveProcessor::execute() for (j = 0; j < projectCount; ++j) { if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters && - !oldRow.isLongString(projectionMap[j])) + !oldRow.isLongString(projectionMap[j])) { #ifdef PRIMPROC_STOPWATCH stopwatch->start("-- projectIntoRowGroup"); @@ -1787,20 +1795,20 @@ void BatchPrimitiveProcessor::execute() while (moreRGs && !sendThread->aborted()) { /* - * generate 1 rowgroup (8192 rows max) of joined rows - * if there's an FE2, run it - * -pack results into a new rowgroup - * -if there are < 8192 rows in the new RG, continue - * if there's an agg, run it - * send the result - */ + * generate 1 rowgroup (8192 rows max) of joined rows + * if there's an FE2, run it + * -pack results into a new rowgroup + * -if there are < 8192 rows in the new RG, continue + * if there's an agg, run it + * send the result + */ resetGJRG(); moreRGs = generateJoinedRowGroup(baseJRow); - // smoreRGs = moreRGs; - sendCount = (uint8_t)(!moreRGs && !startRid); - // *serialized << (uint8_t)(!moreRGs && !startRid); // the "count - // this msg" var - *serialized << sendCount; + // smoreRGs = moreRGs; + sendCount = (uint8_t)(!moreRGs && !startRid); + // *serialized << (uint8_t)(!moreRGs && !startRid); // the "count + // this msg" var + *serialized << sendCount; if (fe2) { /* functionize this -> processFE2()*/ @@ -1828,7 +1836,7 @@ void BatchPrimitiveProcessor::execute() { fAggregator->addRowGroup(&nextRG); - if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k + if ((currentBlockOffset + 1) == count && moreRGs == false && startRid == 0) // @bug4507, 8k { fAggregator->loadResult(*serialized); // @bug4507, 8k } // @bug4507, 8k @@ -1862,7 +1870,7 @@ void BatchPrimitiveProcessor::execute() // Should we happen to finish sending data rows right on the boundary of when moreRGs flips off, // then we need to start a new buffer. I.e., it needs the count this message byte pushed. if (serialized->length() == preamble.length()) - *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var + *serialized << (uint8_t)(startRid > 0 ? 0 : 1); // the "count this msg" var *serialized << ridCount; @@ -1871,7 +1879,7 @@ void BatchPrimitiveProcessor::execute() for (j = 0; j < ridCount; ++j) { serializeInlineVector(*serialized, tSmallSideMatches[i][j]); - tSmallSideMatches[i][j].clear(); + tSmallSideMatches[i][j].clear(); } } } @@ -1897,7 +1905,7 @@ void BatchPrimitiveProcessor::execute() for (j = 0; j < ridCount; ++j) { serializeInlineVector(*serialized, tSmallSideMatches[i][j]); - tSmallSideMatches[i][j].clear(); + tSmallSideMatches[i][j].clear(); } } } @@ -2134,6 +2142,17 @@ void BatchPrimitiveProcessor::serializeStrings() void BatchPrimitiveProcessor::sendResponse() { + bool isLocalNodeConnection = exemgr::globServiceExeMgr->isLocalNodeSock(sock); + // Here is the fast path for local EM to PM interacction. PM puts into the + // input EM DEC queue directly. + if (initiatedByEM_ && isLocalNodeConnection) + { + joblist::DistributedEngineComm* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec(); + exeMgrDecPtr->addDataToOutput(serialized); + serialized.reset(); + return; + } + if (sendThread->flowControlEnabled()) { // newConnection should be set only for the first result of a batch job diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index 4469fa6ce..e7cd20a89 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -53,7 +53,6 @@ #include "bppsendthread.h" #include "columnwidth.h" -//#define PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH #include "stopwatch.h" #endif @@ -433,6 +432,7 @@ class BatchPrimitiveProcessor uint ptMask; bool firstInstance; uint64_t valuesLBID; + bool initiatedByEM_; static const uint64_t maxResultCount = 1048576; // 2^20 diff --git a/primitives/primproc/serviceexemgr.h b/primitives/primproc/serviceexemgr.h index 606ae48db..905f77b0a 100644 --- a/primitives/primproc/serviceexemgr.h +++ b/primitives/primproc/serviceexemgr.h @@ -20,7 +20,9 @@ #include #include #include +#include #include +#include #undef root_name #include @@ -60,6 +62,7 @@ namespace exemgr { + using SharedPtrEMSock = boost::shared_ptr; class Opt { public: @@ -168,7 +171,6 @@ namespace exemgr } void initMaxMemPct(uint32_t sessionId) { - // WIP if (sessionId < 0x80000000) { std::lock_guard lk(sessionMemMapMutex_); @@ -187,7 +189,6 @@ namespace exemgr uint64_t getMaxMemPct(const uint32_t sessionId) { uint64_t maxMemoryPct = 0; - // WIP if (sessionId < 0x80000000) { std::lock_guard lk(sessionMemMapMutex_); @@ -290,6 +291,17 @@ namespace exemgr { return *rm_; } + bool isLocalNodeSock(SharedPtrEMSock& sock) const + { + for (auto& sin : localNetIfaceSins_) + { + if (sock->isSameAddr(sin)) + { + return true; + } + } + return false; + } private: void setupSignalHandlers(); int8_t setupCwd() @@ -326,7 +338,27 @@ namespace exemgr } return 0; } - + void getLocalNetIfacesSins() + { + string ipAddress = "Unable to get IP Address"; + struct ifaddrs* netIfacesList = nullptr; + struct ifaddrs* ifaceListMembPtr = nullptr; + int success = 0; + // retrieve the current interfaces - returns 0 on success + success = getifaddrs(&netIfacesList); + if (success == 0) + { + ifaceListMembPtr = netIfacesList; + for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next) + { + if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET) + { + localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr); + } + } + } + freeifaddrs(netIfacesList); + } logging::Logger msgLog_; SessionMemMap_t sessionMemMap_; // track memory% usage during a query std::mutex sessionMemMapMutex_; @@ -343,6 +375,7 @@ namespace exemgr joblist::ResourceManager* rm_; // Its attributes are set in Child() querytele::QueryTeleServerParms teleServerParms_; + std::vector localNetIfaceSins_; }; extern ServiceExeMgr* globServiceExeMgr; } \ No newline at end of file diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 45f605bcb..686b5e8bd 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -1075,6 +1075,10 @@ bool InetStreamSocket::isSameAddr(const Socket* rhs) const return (fSa.sin_addr.s_addr == issp->fSa.sin_addr.s_addr); } +bool InetStreamSocket::isSameAddr(const struct in_addr& ipv4Addr) const +{ + return (fSa.sin_addr.s_addr == ipv4Addr.s_addr); +} /*static*/ int InetStreamSocket::ping(const std::string& ipaddr, const struct timespec* timeout) diff --git a/utils/messageqcpp/inetstreamsocket.h b/utils/messageqcpp/inetstreamsocket.h index bed34d53c..1e69ee670 100644 --- a/utils/messageqcpp/inetstreamsocket.h +++ b/utils/messageqcpp/inetstreamsocket.h @@ -201,6 +201,7 @@ class InetStreamSocket : public Socket * */ virtual bool isSameAddr(const Socket* rhs) const; + virtual bool isSameAddr(const struct in_addr& ipv4Addr) const; /** ping an ip address * diff --git a/utils/messageqcpp/iosocket.h b/utils/messageqcpp/iosocket.h index 8a23ab790..965caae1a 100644 --- a/utils/messageqcpp/iosocket.h +++ b/utils/messageqcpp/iosocket.h @@ -179,6 +179,11 @@ class IOSocket { return fSocket->isSameAddr(rhs->fSocket); } + virtual bool isSameAddr(const struct in_addr& ipv4Addr) const + { + return fSocket->isSameAddr(ipv4Addr); + } + /** connect() forwarder for inherited classes * diff --git a/utils/messageqcpp/socket.h b/utils/messageqcpp/socket.h index c2c9d969f..6e46eb10b 100644 --- a/utils/messageqcpp/socket.h +++ b/utils/messageqcpp/socket.h @@ -174,6 +174,7 @@ class Socket * */ virtual bool isSameAddr(const Socket* rhs) const = 0; + virtual bool isSameAddr(const struct in_addr& ipv4Addr) const = 0; virtual bool isConnected() const = 0; virtual bool hasData() const = 0;