/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

#include "messagequeue.h"
#include "iosocket.h"

#include "femsghandler.h"
#include "threadnaming.h"

using namespace std;
using namespace joblist;
using namespace messageqcpp;

threadpool::ThreadPool FEMsgHandler::threadPool;

namespace
{
/* Callable handed to the thread pool by FEMsgHandler::start(). It names the
 * executing thread "FEMsgHandler" and then runs the handler's poll loop.
 * It only borrows the handler pointer; it does not own it.
 */
class Runner
{
public:
    explicit Runner(FEMsgHandler* f) : target(f)
    {
    }

    void operator()()
    {
        utils::setThreadName("FEMsgHandler");
        target->threadFcn();
    }

    FEMsgHandler* target;
};

}  // namespace

/* Builds an idle handler with no socket and no job list. setSocket() must be
 * called before start() or aborted(), because both dereference the socket.
 */
FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(NULL)
{
}

/* Builds an idle handler bound to job list j and socket s (s must be non-null).
 *
 * NOTE(review): the template argument of boost::shared_ptr was missing in this
 * file, which does not compile. It is restored here as JobList (namespace
 * joblist) based on how jl is used below -- confirm against femsghandler.h.
 */
FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket* s)
    : die(false), running(false), sawData(false), jl(j)
{
    sock = s;
    assert(sock);
}

/* Requests shutdown and then joins the pool task recorded by start().
 *
 * NOTE(review): neither constructor in this file initialises thr, yet it is
 * passed to join() even when start() was never called -- confirm that the
 * header (or ThreadPool::join) makes that case harmless.
 */
FEMsgHandler::~FEMsgHandler()
{
    stop();
    threadPool.join(thr);
}

/* Launches threadFcn() on the shared thread pool unless the running flag is
 * already set. The flag is raised before the task is submitted.
 */
void FEMsgHandler::start()
{
    if (!running)
    {
        running = true;
        thr = threadPool.invoke(Runner(this));
    }
}

/* Asks the poll loop to exit and drops this handler's reference to the job
 * list. It does not wait for the thread; the destructor does the join.
 */
void FEMsgHandler::stop()
{
    die = true;
    jl.reset();
}

/* Replaces the job list that will be aborted when an abort is detected.
 * See the NOTE(review) on the two-argument constructor about the restored
 * template argument.
 */
void FEMsgHandler::setJobList(boost::shared_ptr<JobList> j)
{
    jl = j;
}

/* Replaces the socket to be polled; i must be non-null. */
void FEMsgHandler::setSocket(IOSocket* i)
{
    sock = i;
    assert(sock);
}

/* Note, the next two fcns strongly depend on ExeMgr's current implementation.  There's a
 * good chance that if ExeMgr's table send loop is changed, these will need to be
 * updated to match.
 */

/* This is currently only called if InetStreamSocket::write() throws, implying
 * a connection error.  It might not make sense in other contexts.
 *
 * Returns true if data has been seen on the socket, either previously
 * (sawData already set) or by the single poll performed here; false otherwise.
 * The poll is done while holding the same mutex the background loop holds
 * around its own poll.
 */
bool FEMsgHandler::aborted()
{
    if (sawData)
        return true;

    boost::mutex::scoped_lock sl(mutex);
    int connectionNum = sock->getConnectionNum();
    int err = InetStreamSocket::pollConnection(connectionNum, 1000);

    if (err == 1)
    {
        sawData = true;
        return true;
    }

    return false;
}

/* Body of the background thread started by start().
 *
 * Polls the socket until stop() sets die or the poll returns non-zero. A
 * result of 1 or 2, when stop() has not already been requested, is treated as
 * an abort: the job list (if any) is aborted and released. The running flag is
 * cleared on every exit path.
 */
void FEMsgHandler::threadFcn()
{
    int err = 0;
    int connectionNum = sock->getConnectionNum();

    /* This waits for the next readable event on sock.  An abort is signaled
     * by sending something (anything at the moment), then dropping the connection.
     * This fcn exits on all other events.
     */
    while (!die && err == 0)
    {
        // The lock is scoped to a single poll so aborted() can take its turn between polls.
        boost::mutex::scoped_lock sl(mutex);
        err = InetStreamSocket::pollConnection(connectionNum, 1000);
    }

    if (err == 1)
        sawData = true;  // there's data to read, must be the abort signal

    if (!die && (err == 2 || err == 1))
    {
        die = true;

        // jl is empty if no job list was ever set, or if stop() already
        // released it; there is nothing to abort in that case. This guard does
        // not fully synchronise with stop(), which still touches die and jl
        // without the mutex.
        if (jl)
            jl->abort();

        jl.reset();
    }

    running = false;
}