/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id$ * * ***********************************************************************/ #include #include #include #include "bppsendthread.h" #include "resourcemanager.h" #include "serviceexemgr.h" namespace primitiveprocessor { extern uint32_t connectionsPerUM; extern uint32_t BPPCount; BPPSendThread::BPPSendThread() { queueBytesThresh = joblist::ResourceManager::instance()->getBPPSendThreadBytesThresh(); queueMsgThresh = joblist::ResourceManager::instance()->getBPPSendThreadMsgThresh(); runner = boost::thread(Runner_t(this)); } BPPSendThread::~BPPSendThread() { abort(); runner.join(); } void BPPSendThread::sendResult(const Msg_t& msg, bool newConnection) { // Wait for the queue to empty out a bit if it's stuffed full if (sizeTooBig()) { std::unique_lock sl1(respondLock); while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); } } if (die) return; std::unique_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); (void)atomicops::atomicAdd(¤tByteSize, msg.msg->lengthWithHdrOverhead()); msgQueue.push(msg); if (!sawAllConnections && newConnection) { Connection_t ins(msg.sockLock, msg.sock); bool inserted = connections_s.insert(ins).second; if (inserted) { connections_v.push_back(ins); if (connections_v.size() == connectionsPerUM) { connections_s.clear(); sawAllConnections = true; } } } if (mainThreadWaiting) queueNotEmpty.notify_one(); } void BPPSendThread::sendResults(const vector& msgs, bool newConnection) { // Wait for the queue to empty out a bit if it's stuffed full if (sizeTooBig()) { std::unique_lock sl1(respondLock); while (currentByteSize >= queueBytesThresh && msgQueue.size() > 3 && !die) { fProcessorPool->incBlockedThreads(); okToRespond.wait(sl1); fProcessorPool->decBlockedThreads(); } } if (die) return; std::unique_lock sl(msgQueueLock); if (gotException) throw runtime_error(exceptionString); if (!sawAllConnections && newConnection) { idbassert(msgs.size() > 0); Connection_t ins(msgs[0].sockLock, msgs[0].sock); bool inserted = connections_s.insert(ins).second; if (inserted) { connections_v.push_back(ins); if (connections_v.size() == connectionsPerUM) { connections_s.clear(); sawAllConnections = true; } } } for (uint32_t i = 0; i < msgs.size(); i++) { (void)atomicops::atomicAdd(¤tByteSize, msgs[i].msg->lengthWithHdrOverhead()); msgQueue.push(msgs[i]); } if (mainThreadWaiting) queueNotEmpty.notify_one(); } void BPPSendThread::sendMore(int num) { std::unique_lock sl(ackLock); if (num == -1) fcEnabled = false; else if (num == 0) { fcEnabled = true; msgsLeft = 0; } else (void)atomicops::atomicAdd(&msgsLeft, num); sl.unlock(); if (waiting) okToSend.notify_one(); } bool BPPSendThread::flowControlEnabled() { return fcEnabled; } void BPPSendThread::mainLoop() { const uint32_t msgCap = 20; boost::scoped_array msg; uint32_t msgCount = 0, i, msgsSent; SP_UM_MUTEX lock; SP_UM_IOSOCK sock; bool doLoadBalancing = false; msg.reset(new Msg_t[msgCap]); while (!die) { std::unique_lock sl(msgQueueLock); if (msgQueue.empty() && !die) { mainThreadWaiting = true; queueNotEmpty.wait(sl); mainThreadWaiting = false; continue; } msgCount = (msgQueue.size() > msgCap ? msgCap : msgQueue.size()); for (i = 0; i < msgCount; i++) { msg[i] = msgQueue.front(); msgQueue.pop(); } doLoadBalancing = sawAllConnections; sl.unlock(); /* In the send loop below, msgsSent tracks progress on sending the msg array, * i how many msgs are sent by 1 run of the loop, limited by msgCount or msgsLeft. */ msgsSent = 0; while (msgsSent < msgCount && !die) { uint64_t bsSize; if (msgsLeft <= 0 && fcEnabled && !die) { std::unique_lock sl2(ackLock); while (msgsLeft <= 0 && fcEnabled && !die) { waiting = true; okToSend.wait(sl2); waiting = false; } } for (i = 0; msgsSent < msgCount && ((fcEnabled && msgsLeft > 0) || !fcEnabled) && !die; msgsSent++, i++) { if (doLoadBalancing) { // Bug 4475 move control of sockIndex to batchPrimitiveProcessor lock = connections_v[msg[msgsSent].sockIndex].sockLock; sock = connections_v[msg[msgsSent].sockIndex].sock; } else { lock = msg[msgsSent].sockLock; sock = msg[msgsSent].sock; } bsSize = msg[msgsSent].msg->lengthWithHdrOverhead(); // Same node processing path if (!lock) { msg[msgsSent].sock->write(msg[msgsSent].msg); } else { try { boost::mutex::scoped_lock sl2(*lock); sock->write(*msg[msgsSent].msg); } catch (std::exception& e) { sl.lock(); exceptionString = e.what(); gotException = true; return; } } (void)atomicops::atomicDec(&msgsLeft); (void)atomicops::atomicSub(¤tByteSize, bsSize); msg[msgsSent].msg.reset(); } if (fProcessorPool->blockedThreadCount() > 0 && currentByteSize < queueBytesThresh) { okToRespond.notify_one(); } } } } void BPPSendThread::abort() { std::lock_guard sl(msgQueueLock); std::lock_guard sl2(ackLock); std::lock_guard sl3(respondLock); die = true; queueNotEmpty.notify_all(); okToSend.notify_all(); okToRespond.notify_all(); } } // namespace primitiveprocessor