diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index a2fd41e1a..6fbbee3c5 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -243,6 +243,7 @@ void DistributedEngineComm::Setup() uint32_t newPmCount = fRm->getPsCount(); throttleThreshold = fRm->getDECThrottleThreshold(); tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); + fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery(); unsigned numConnections = getNumConnections(); oam::Oam oam; ModuleTypeConfig moduletypeconfig; @@ -458,7 +459,8 @@ void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs) boost::mutex* lock = new boost::mutex(); condition* cond = new condition(); - boost::shared_ptr mqe(new MQE(pmCount)); + uint32_t firstPMInterleavedConnectionId = key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size(); + boost::shared_ptr mqe(new MQE(pmCount, firstPMInterleavedConnectionId)); mqe->queue = StepMsgQueue(lock, cond); mqe->sendACKs = sendACKs; @@ -972,30 +974,31 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr mqe, uint64_t ta mqe->targetQueueSize = targetSize; } -int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving) +int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs, uint32_t senderUniqueID, bool doInterleaving) { boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t()); MessageQueueMap::iterator it; // Keep mqe's stats from being freed early boost::shared_ptr mqe; Stats* senderStats = NULL; - uint32_t interleaver = 0; if (fPmConnections.size() == 0) return 0; - if (sender != numeric_limits::max()) + uint32_t connectionId = aPMIndex; + if (senderUniqueID != numeric_limits::max()) { lk.lock(); - it = fSessionMessages.find(sender); + it = fSessionMessages.find(senderUniqueID); if (it != fSessionMessages.end()) { mqe = it->second; senderStats = &(mqe->stats); - - if (doInterleaving) - interleaver = it->second->interleaver[index % it->second->pmCount]++; + size_t pmIndex = aPMIndex % mqe->pmCount; + connectionId = it->second->getNextConnectionId(pmIndex, + fPmConnections.size(), + fDECConnectionsPerQuery); } lk.unlock(); @@ -1003,14 +1006,11 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin try { - if (doInterleaving) - index = (index + (interleaver * pmCount)) % fPmConnections.size(); - - ClientList::value_type client = fPmConnections[index]; + ClientList::value_type client = fPmConnections[connectionId]; if (!client->isAvailable()) return 0; - boost::mutex::scoped_lock lk(*(fWlock[index])); + boost::mutex::scoped_lock lk(*(fWlock[connectionId])); client->write(bs, NULL, senderStats); return 0; } @@ -1114,13 +1114,40 @@ Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID) return empty; } -DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), - targetQueueSize(targetRecvQueueSize) +DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue) + : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(targetRecvQueueSize) { unackedWork.reset(new volatile uint32_t[pmCount]); interleaver.reset(new uint32_t[pmCount]); memset((void*) unackedWork.get(), 0, pmCount * sizeof(uint32_t)); - memset((void*) interleaver.get(), 0, pmCount * sizeof(uint32_t)); + uint32_t interleaverValue = initialInterleaverValue; + initialConnectionId = initialInterleaverValue; + for (size_t pmId = 0; pmId < pmCount; ++pmId) + { + interleaver.get()[pmId] = interleaverValue++; + } +} + +// Here is the assumed connections distribution schema in a pmConnections vector. +// (PM1-PM2...-PMX)-(PM1-PM2..-PMX)... +uint32_t DistributedEngineComm::MQE::getNextConnectionId(const size_t pmIndex, + const size_t pmConnectionsNumber, + const uint32_t DECConnectionsPerQuery) + +{ + uint32_t nextConnectionId = (interleaver[pmIndex] + pmCount) % pmConnectionsNumber; + // Wrap around the connection id. + if ((nextConnectionId - pmIndex) % DECConnectionsPerQuery == 0) + { + // The sum must be < connections vector size. + nextConnectionId = initialConnectionId + pmIndex; + interleaver[pmIndex] = nextConnectionId; + } + else + { + interleaver[pmIndex] = nextConnectionId; + } + return nextConnectionId; } } diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 32d15c718..01f2cf194 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -217,12 +217,16 @@ private: /* To keep some state associated with the connection. These aren't copyable. */ struct MQE : public boost::noncopyable { - MQE(uint32_t pmCount); + MQE(const uint32_t pmCount, const uint32_t initialInterleaverValue); + uint32_t getNextConnectionId(const size_t pmIndex, + const size_t pmConnectionsNumber, + const uint32_t DECConnectionsPerQuery); messageqcpp::Stats stats; StepMsgQueue queue; uint32_t ackSocketIndex; boost::scoped_array unackedWork; boost::scoped_array interleaver; + uint32_t initialConnectionId; uint32_t pmCount; // non-BPP primitives don't do ACKs bool sendACKs; @@ -286,6 +290,7 @@ private: static const uint32_t targetRecvQueueSize = 50000000; static const uint32_t disableThreshold = 10000000; uint32_t tbpsThreadCount; + uint32_t fDECConnectionsPerQuery; void sendAcks(uint32_t uniqueID, const std::vector& msgs, boost::shared_ptr mqe, size_t qSize); diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index 46e20312a..7ac2f98cb 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -152,6 +152,11 @@ ResourceManager::ResourceManager(bool runningInExeMgr) : if (temp > 0) fJlNumScanReceiveThreads = temp; + fDECConnectionsPerQuery = getUintVal(fJobListStr, "DECConnectionsPerQuery", 0); + fDECConnectionsPerQuery = (fDECConnectionsPerQuery) + ? fDECConnectionsPerQuery + : getPsConnectionsPerPrimProc(); + temp = getIntVal(fTupleWSDLStr, "NumThreads", -1); if (temp > 0) diff --git a/dbcon/joblist/resourcemanager.h b/dbcon/joblist/resourcemanager.h index e28314a0f..71c525cce 100644 --- a/dbcon/joblist/resourcemanager.h +++ b/dbcon/joblist/resourcemanager.h @@ -128,6 +128,7 @@ const uint64_t defaultDECThrottleThreshold = 200000000; // ~200 MB const uint8_t defaultUseCpimport = 1; const bool defaultAllowDiskAggregation = false; + /** @brief ResourceManager * Returns requested values from Config * @@ -179,11 +180,16 @@ public: return getIntVal(fExeMgrStr, "ExecQueueSize", defaultEMExecQueueSize); } - bool getAllowDiskAggregation() const + bool getAllowDiskAggregation() const { return fAllowedDiskAggregation; } + uint64_t getDECConnectionsPerQuery() const + { + return fDECConnectionsPerQuery; + } + int getHjMaxBuckets() const { return getUintVal(fHashJoinStr, "MaxBuckets", defaultHJMaxBuckets); @@ -615,6 +621,7 @@ private: bool isExeMgr; bool fUseHdfs; bool fAllowedDiskAggregation{false}; + uint64_t fDECConnectionsPerQuery; };