mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
146 lines
3.0 KiB
C++
146 lines
3.0 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
#include "messagequeue.h"
|
|
#include "iosocket.h"
|
|
|
|
#include "femsghandler.h"
|
|
#include "threadnaming.h"
|
|
|
|
using namespace std;
|
|
using namespace joblist;
|
|
using namespace messageqcpp;
|
|
|
|
// Definition of the static thread pool that FEMsgHandler::start() submits its
// Runner task to and that the destructor joins on.
threadpool::ThreadPool FEMsgHandler::threadPool;
|
|
|
|
namespace
|
|
{
|
|
class Runner
|
|
{
|
|
public:
|
|
Runner(FEMsgHandler* f) : target(f)
|
|
{
|
|
}
|
|
void operator()()
|
|
{
|
|
utils::setThreadName("FEMsgHandler");
|
|
target->threadFcn();
|
|
}
|
|
FEMsgHandler* target;
|
|
};
|
|
|
|
} // namespace
|
|
|
|
/* Builds a handler with all state flags cleared and no socket attached.
 * threadFcn() and aborted() dereference sock unconditionally, so a socket
 * has to be supplied through setSocket() before either of them runs.
 */
FEMsgHandler::FEMsgHandler()
 : die(false)
 , running(false)
 , sawData(false)
 , sock(NULL)
{
}
|
|
|
|
/* Builds a handler bound to a job list and a socket.
 *
 * j  job list that threadFcn() aborts when an abort is detected.
 * s  socket whose connection is polled; asserted to be non-null.
 */
FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket* s)
 : die(false)
 , running(false)
 , sawData(false)
 , jl(j)
{
  // Assigned in the body (not the initializer list), then validated.
  this->sock = s;
  assert(sock);
}
|
|
|
|
/* Requests the worker to stop, then joins on the pool task handle `thr`
 * before the object goes away.  The order matters: stop() raises `die`
 * first so that the polling loop in threadFcn() can terminate.
 */
FEMsgHandler::~FEMsgHandler()
{
  this->stop();
  threadPool.join(this->thr);
}
|
|
|
|
void FEMsgHandler::start()
|
|
{
|
|
if (!running)
|
|
{
|
|
running = true;
|
|
thr = threadPool.invoke(Runner(this));
|
|
}
|
|
}
|
|
|
|
void FEMsgHandler::stop()
|
|
{
|
|
die = true;
|
|
jl.reset();
|
|
}
|
|
|
|
/* Replaces the job list this handler will abort when an abort is detected.
 *
 * j  shared reference to the job list; may replace a previously set one.
 */
void FEMsgHandler::setJobList(boost::shared_ptr<JobList> j)
{
  this->jl = j;
}
|
|
|
|
/* Attaches the socket whose connection is polled by threadFcn() and aborted().
 *
 * i  socket to watch; asserted to be non-null after assignment.
 */
void FEMsgHandler::setSocket(IOSocket* i)
{
  this->sock = i;
  assert(sock);
}
|
|
|
|
/* Note: the next two functions (aborted() and threadFcn()) depend strongly on
 * ExeMgr's current implementation. If ExeMgr's table send loop is changed,
 * there is a good chance they will need to be updated to match.
 */
|
|
|
|
/* This is currently only called if InetStreamSocket::write() throws, implying
|
|
* a connection error. It might not make sense in other contexts.
|
|
*/
|
|
bool FEMsgHandler::aborted()
|
|
{
|
|
if (sawData)
|
|
return true;
|
|
|
|
boost::mutex::scoped_lock sl(mutex);
|
|
int err;
|
|
int connectionNum = sock->getConnectionNum();
|
|
|
|
err = InetStreamSocket::pollConnection(connectionNum, 1000);
|
|
|
|
if (err == 1)
|
|
{
|
|
sawData = true;
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void FEMsgHandler::threadFcn()
|
|
{
|
|
int err = 0;
|
|
int connectionNum = sock->getConnectionNum();
|
|
|
|
/* This waits for the next readable event on sock. An abort is signaled
|
|
* by sending something (anything at the moment), then dropping the connection.
|
|
* This fcn exits on all other events.
|
|
*/
|
|
while (!die && err == 0)
|
|
{
|
|
boost::mutex::scoped_lock sl(mutex);
|
|
err = InetStreamSocket::pollConnection(connectionNum, 1000);
|
|
}
|
|
|
|
if (err == 1)
|
|
sawData = true; // there's data to read, must be the abort signal
|
|
|
|
if (!die && (err == 2 || err == 1))
|
|
{
|
|
die = true;
|
|
jl->abort();
|
|
jl.reset();
|
|
}
|
|
|
|
running = false;
|
|
}
|