You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			146 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			146 lines
		
	
	
		
			3.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| #include "messagequeue.h"
 | |
| #include "iosocket.h"
 | |
| 
 | |
| #include "femsghandler.h"
 | |
| #include "threadnaming.h"
 | |
| 
 | |
| using namespace std;
 | |
| using namespace joblist;
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| threadpool::ThreadPool FEMsgHandler::threadPool;  // static pool shared by all FEMsgHandler objects: start() submits the polling task to it, the destructor joins on it
 | |
| 
 | |
| namespace
 | |
| {
 | |
| class Runner
 | |
| {
 | |
|  public:
 | |
|   Runner(FEMsgHandler* f) : target(f)
 | |
|   {
 | |
|   }
 | |
|   void operator()()
 | |
|   {
 | |
|     utils::setThreadName("FEMsgHandler");
 | |
|     target->threadFcn();
 | |
|   }
 | |
|   FEMsgHandler* target;
 | |
| };
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| FEMsgHandler::FEMsgHandler() : die(false), running(false), sawData(false), sock(NULL)
 | |
| {
 | |
| }
 | |
| 
 | |
| FEMsgHandler::FEMsgHandler(boost::shared_ptr<JobList> j, IOSocket* s)
 | |
|  : die(false), running(false), sawData(false), jl(j)
 | |
| {
 | |
|   sock = s;
 | |
|   assert(sock);
 | |
| }
 | |
| 
 | |
| FEMsgHandler::~FEMsgHandler()
 | |
| {
 | |
|   stop();
 | |
|   threadPool.join(thr);
 | |
| }
 | |
| 
 | |
| void FEMsgHandler::start()
 | |
| {
 | |
|   if (!running)
 | |
|   {
 | |
|     running = true;
 | |
|     thr = threadPool.invoke(Runner(this));
 | |
|   }
 | |
| }
 | |
| 
 | |
| void FEMsgHandler::stop()
 | |
| {
 | |
|   die = true;
 | |
|   jl.reset();
 | |
| }
 | |
| 
 | |
| void FEMsgHandler::setJobList(boost::shared_ptr<JobList> j)
 | |
| {
 | |
|   jl = j;
 | |
| }
 | |
| 
 | |
| void FEMsgHandler::setSocket(IOSocket* i)
 | |
| {
 | |
|   sock = i;
 | |
|   assert(sock);
 | |
| }
 | |
| 
 | |
| /* Note, the next two fcns strongly depend on ExeMgr's current implementation.  There's a
 | |
|  * good chance that if ExeMgr's table send loop is changed, these will need to be
 | |
|  * updated to match.
 | |
|  */
 | |
| 
 | |
| /* This is currently only called if InetStreamSocket::write() throws, implying
 | |
|  * a connection error.  It might not make sense in other contexts.
 | |
|  */
 | |
| bool FEMsgHandler::aborted()
 | |
| {
 | |
|   if (sawData)
 | |
|     return true;
 | |
| 
 | |
|   boost::mutex::scoped_lock sl(mutex);
 | |
|   int err;
 | |
|   int connectionNum = sock->getConnectionNum();
 | |
| 
 | |
|   err = InetStreamSocket::pollConnection(connectionNum, 1000);
 | |
| 
 | |
|   if (err == 1)
 | |
|   {
 | |
|     sawData = true;
 | |
|     return true;
 | |
|   }
 | |
| 
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| void FEMsgHandler::threadFcn()
 | |
| {
 | |
|   int err = 0;
 | |
|   int connectionNum = sock->getConnectionNum();
 | |
| 
 | |
|   /* This waits for the next readable event on sock.  An abort is signaled
 | |
|    * by sending something (anything at the moment), then dropping the connection.
 | |
|    * This fcn exits on all other events.
 | |
|    */
 | |
|   while (!die && err == 0)
 | |
|   {
 | |
|     boost::mutex::scoped_lock sl(mutex);
 | |
|     err = InetStreamSocket::pollConnection(connectionNum, 1000);
 | |
|   }
 | |
| 
 | |
|   if (err == 1)
 | |
|     sawData = true;  // there's data to read, must be the abort signal
 | |
| 
 | |
|   if (!die && (err == 2 || err == 1))
 | |
|   {
 | |
|     die = true;
 | |
|     jl->abort();
 | |
|     jl.reset();
 | |
|   }
 | |
| 
 | |
|   running = false;
 | |
| }
 |