You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			570 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			570 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
|  * Copyright (C) 2016 MariaDB Corporation.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| // $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
 | |
| //
 | |
| /** @file */
 | |
| 
 | |
| #include <sstream>
 | |
| #include <stdexcept>
 | |
| #include <cassert>
 | |
| #include <ctime>
 | |
| #include <algorithm>
 | |
| #include <unistd.h>
 | |
| #include <arpa/inet.h>
 | |
| #if __FreeBSD__
 | |
| #include <sys/socket.h>
 | |
| #endif
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/thread/mutex.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "messagequeue.h"
 | |
| #include "bytestream.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "configcpp.h"
 | |
| using namespace config;
 | |
| 
 | |
| #include "errorids.h"
 | |
| #include "exceptclasses.h"
 | |
| #include "messagelog.h"
 | |
| #include "messageobj.h"
 | |
| #include "loggingid.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "liboamcpp.h"
 | |
| using namespace oam;
 | |
| 
 | |
| #include "we_clients.h"
 | |
| #include "we_messages.h"
 | |
| using namespace WriteEngine;
 | |
| 
 | |
| #include "atomicops.h"
 | |
| 
 | |
| namespace
 | |
| {
 | |
| void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
 | |
| {
 | |
|   LoggingID lid(05);
 | |
|   MessageLog ml(lid);
 | |
|   Message::Args args;
 | |
|   Message m(0);
 | |
|   args.add(file);
 | |
|   args.add("@");
 | |
|   args.add(line);
 | |
|   args.add(msg);
 | |
|   m.format(args);
 | |
| 
 | |
|   switch (logto)
 | |
|   {
 | |
|     case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;
 | |
| 
 | |
|     case LOG_TYPE_INFO: ml.logInfoMessage(m); break;
 | |
| 
 | |
|     case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;
 | |
| 
 | |
|     case LOG_TYPE_ERROR: ml.logErrorMessage(m); break;
 | |
| 
 | |
|     case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
 | |
|   }
 | |
| }
 | |
| 
 | |
| string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig, string ipAddress)
 | |
| {
 | |
|   string modulename = "pm1";
 | |
|   DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();
 | |
| 
 | |
|   for (; pt != moduletypeconfig.ModuleNetworkList.end(); pt++)
 | |
|   {
 | |
|     modulename = (*pt).DeviceName;
 | |
|     HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
 | |
| 
 | |
|     for (; pt1 != (*pt).hostConfigList.end(); pt1++)
 | |
|     {
 | |
|       if (ipAddress == (*pt1).IPAddr)
 | |
|         return modulename;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return modulename;
 | |
| }
 | |
| 
 | |
| struct WEClientRunner
 | |
| {
 | |
|   WEClientRunner(WriteEngine::WEClients* jl, boost::shared_ptr<MessageQueueClient> cl,
 | |
|                  uint32_t connectionIndex)
 | |
|    : jbl(jl), client(cl), connIndex(connectionIndex)
 | |
|   {
 | |
|   }
 | |
|   WriteEngine::WEClients* jbl;
 | |
|   boost::shared_ptr<MessageQueueClient> client;
 | |
|   uint32_t connIndex;
 | |
|   void operator()()
 | |
|   {
 | |
|     // cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
 | |
|     try
 | |
|     {
 | |
|       jbl->Listen(client, connIndex);
 | |
|       // cout << "Listening connIndex " << connIndex << endl;
 | |
|     }
 | |
|     catch (std::exception& ex)
 | |
|     {
 | |
|       string what(ex.what());
 | |
|       cerr << "exception caught in WEClient: " << what << endl;
 | |
| 
 | |
|       if (what.find("St9bad_alloc") != string::npos)
 | |
|       {
 | |
|         writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
 | |
|         //           abort();
 | |
|       }
 | |
|       else
 | |
|         writeToLog(__FILE__, __LINE__, what);
 | |
|     }
 | |
|     catch (...)
 | |
|     {
 | |
|       string msg("exception caught in WEClientRunner.");
 | |
|       writeToLog(__FILE__, __LINE__, msg);
 | |
|       cerr << msg << endl;
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| template <typename T>
 | |
| struct QueueShutdown
 | |
| {
 | |
|   void operator()(T& x)
 | |
|   {
 | |
|     x.shutdown();
 | |
|   }
 | |
| };
 | |
| 
 | |
| /**
 | |
|  * This function checks if the WriteEngineServer (WES) is configured
 | |
|  * for the specified node in the configuration.
 | |
|  * @param config Pointer to the configuration object
 | |
|  * @param fOtherEnd The name of the node to check
 | |
|  * @return true if WES is configured, false otherwise
 | |
|  */
 | |
| bool isWESConfigured(config::Config* config, const std::string& fOtherEnd)
 | |
| {
 | |
|   // Check if WES IP address record exists in the config (if not, this is a read-only node)
 | |
|   std::string otherEndDnOrIPStr = config->getConfig(fOtherEnd, "IPAddr");
 | |
|   return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned");
 | |
| }
 | |
| 
 | |
| }  // namespace
 | |
| 
 | |
| namespace WriteEngine
 | |
| {
 | |
| WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), closingConnection{0}, pmCount(0)
 | |
| {
 | |
|   Setup();
 | |
| }
 | |
| 
 | |
| WEClients::~WEClients()
 | |
| {
 | |
|   Close();
 | |
| }
 | |
| 
 | |
| void WEClients::Setup()
 | |
| {
 | |
|   makeBusy(true);
 | |
|   joblist::ResourceManager* rm = joblist::ResourceManager::instance();
 | |
|   oam::Oam oam;
 | |
|   string ipAddress;
 | |
|   ModuleTypeConfig moduletypeconfig;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     oam.getSystemConfig("pm", moduletypeconfig);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
 | |
|     throw runtime_error("Setup failed");
 | |
|   }
 | |
| 
 | |
|   uint32_t pmCountConfig = moduletypeconfig.ModuleCount;
 | |
|   pmCount = 0;
 | |
|   int moduleID = 1;
 | |
| 
 | |
|   char buff[32];
 | |
|   ByteStream bs, bsRead;
 | |
| 
 | |
|   if (fPrgmID == DDLPROC)
 | |
|   {
 | |
|     bs << (ByteStream::byte)WE_SVR_DDL_KEEPALIVE;
 | |
|     bs << (ByteStream::octbyte)moduleID;
 | |
|   }
 | |
|   else if (fPrgmID == DMLPROC)
 | |
|   {
 | |
|     bs << (ByteStream::byte)WE_SVR_DML_KEEPALIVE;
 | |
|     bs << (ByteStream::octbyte)moduleID;
 | |
|   }
 | |
|   else if (fPrgmID == SPLITTER)
 | |
|   {
 | |
|     bs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
 | |
|   }
 | |
|   else if (fPrgmID == BATCHINSERTPROC)
 | |
|   {
 | |
|     bs << (ByteStream::byte)WE_SVR_BATCH_KEEPALIVE;
 | |
|     bs << (ByteStream::octbyte)moduleID;
 | |
|   }
 | |
| 
 | |
|   for (unsigned i = 0; i < pmCountConfig; i++)
 | |
|   {
 | |
|     // Find the module id
 | |
|     moduleID = atoi((moduletypeconfig.ModuleNetworkList[i])
 | |
|                         .DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE)
 | |
|                         .c_str());
 | |
|     // cout << "setting connection to moduleid " << moduleID << endl;
 | |
|     snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID);
 | |
|     string fServer(buff);
 | |
| 
 | |
|     // Check if WES is configured for this module
 | |
|     if (!isWESConfigured(rm->getConfig(), fServer))
 | |
|     {
 | |
|       writeToLog(__FILE__, __LINE__, "Skipping WriteEngineServer client creation for " + fServer + " as the node is read-only", LOG_TYPE_INFO);
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig()));
 | |
|     boost::shared_ptr<boost::mutex> nl(new boost::mutex());
 | |
| 
 | |
|     // Bug 5224. Take out the retrys. If connection fails, we assume the server is down.
 | |
|     try
 | |
|     {
 | |
|       if (cl->connect())
 | |
|       {
 | |
|         try
 | |
|         {
 | |
|           cl->write(bs);
 | |
|         }
 | |
|         catch (std::exception& ex1)
 | |
|         {
 | |
|           ostringstream oss;
 | |
|           oss << "Write to WES during connect failed due to " << ex1.what();
 | |
|           throw runtime_error(oss.str());
 | |
|         }
 | |
| 
 | |
|         try
 | |
|         {
 | |
|           bsRead = cl->read();
 | |
| 
 | |
|           if (bsRead.length() == 0)
 | |
|             throw runtime_error("Got byte 0 during reading ");
 | |
|         }
 | |
|         catch (std::exception& ex2)
 | |
|         {
 | |
|           ostringstream oss;
 | |
|           oss << "Read from WES during connect failed due to " << ex2.what() << " and this = " << this;
 | |
|           throw runtime_error(oss.str());
 | |
|         }
 | |
| 
 | |
|         fPmConnections[moduleID] = cl;
 | |
|         // cout << "connection is open. this = " << this << endl;
 | |
|         // cout << "set up connection to mudule " << moduleID << endl;
 | |
|         // assign the module name
 | |
|         // ipAddress = sin_addr2String(cl->serv_addr().sin_addr);
 | |
|         ipAddress = cl->addr2String();
 | |
|         cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, ipAddress));
 | |
|         StartClientListener(cl, i);
 | |
|         pmCount++;
 | |
|         // ostringstream oss;
 | |
|         // oss << "WECLIENT: connected to " << fServer + " and this = " << this << " and pmcount is now " <<
 | |
|         // pmCount; writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
 | |
|       }
 | |
|       else
 | |
|       {
 | |
|         throw runtime_error("Connection refused");
 | |
|       }
 | |
|     }
 | |
|     catch (std::exception& ex)
 | |
|     {
 | |
|       writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR);
 | |
|       cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
 | |
|     }
 | |
|     catch (...)
 | |
|     {
 | |
|       writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool WEClients::isConnectionReadonly(uint32_t connection)
 | |
| {
 | |
|   return fPmConnections[connection] == nullptr;
 | |
| }
 | |
| 
 | |
| int WEClients::Close()
 | |
| {
 | |
|   makeBusy(false);
 | |
|   closingConnection.store(1, std::memory_order_relaxed);
 | |
|   ByteStream bs;
 | |
|   bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION;
 | |
|   write_to_all(bs);
 | |
| 
 | |
|   // cout << "connection is closed. this = " << this << " and closingConnection = " << closingConnection <<
 | |
|   // endl;
 | |
|   for (uint32_t i = 0; i < fWESReader.size(); i++)
 | |
|   {
 | |
|     fWESReader[i]->join();
 | |
|   }
 | |
| 
 | |
|   fWESReader.clear();
 | |
|   fPmConnections.clear();
 | |
|   pmCount = 0;
 | |
|   // ostringstream oss;
 | |
|   // oss << "WECLIENT: closed connection to wes and this = " << this << " and pmcount is now " << pmCount;
 | |
|   // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| void WEClients::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
 | |
| {
 | |
|   SBS sbs;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     while (Busy())
 | |
|     {
 | |
|       // TODO: This call blocks so setting Busy() in another thread doesn't work here...
 | |
|       sbs = client->read();
 | |
| 
 | |
|       if (sbs->length() != 0)
 | |
|       {
 | |
|         // cout << "adding data to connIndex " << endl;
 | |
|         addDataToOutput(sbs, connIndex);
 | |
|       }
 | |
|       else  // got zero bytes on read, nothing more will come
 | |
|       {
 | |
|         if (closingConnection.load() > 0)
 | |
|         {
 | |
|           return;
 | |
|         }
 | |
| 
 | |
|         cerr << "WEC got 0 byte message for object " << this << endl;
 | |
|         goto Error;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return;
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     cerr << "WEC Caught EXCEPTION: " << e.what() << endl;
 | |
|     goto Error;
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     cerr << "WEC Caught UNKNOWN EXCEPT" << endl;
 | |
|     goto Error;
 | |
|   }
 | |
| 
 | |
| Error:
 | |
|   // error condition! push 0 length bs to messagequeuemap and
 | |
|   // eventually let jobstep error out.
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
| 
 | |
|   MessageQueueMap::iterator map_tok;
 | |
|   sbs.reset(new ByteStream(0U));
 | |
| 
 | |
|   for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
 | |
|   {
 | |
|     map_tok->second->queue.clear();
 | |
|     map_tok->second->unackedWork[0].fetch_add(1, std::memory_order_relaxed);
 | |
|     map_tok->second->queue.push(sbs);
 | |
|   }
 | |
| 
 | |
|   lk.unlock();
 | |
| 
 | |
|   // reset the pmconnection map
 | |
|   {
 | |
|     boost::mutex::scoped_lock onErrLock(fOnErrMutex);
 | |
|     string moduleName = client->moduleName();
 | |
|     ClientList::iterator itor = fPmConnections.begin();
 | |
| 
 | |
|     while (itor != fPmConnections.end())
 | |
|     {
 | |
|       if (moduleName == (itor->second)->moduleName())
 | |
|       {
 | |
|         (fPmConnections[itor->first]).reset();
 | |
|         pmCount--;
 | |
|         ostringstream oss;
 | |
|         // oss << "WECLIENT: connection to is reset and this = " << this << " and pmcount is decremented.";
 | |
|         // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
 | |
|       }
 | |
| 
 | |
|       itor++;
 | |
|     }
 | |
|   }
 | |
|   return;
 | |
| }
 | |
| 
 | |
| void WEClients::addQueue(uint32_t key)
 | |
| {
 | |
|   bool b;
 | |
| 
 | |
|   boost::mutex* lock = new boost::mutex();
 | |
|   condition* cond = new condition();
 | |
|   boost::shared_ptr<MQE> mqe(new MQE(pmCount, lock, cond));
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
|   b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
 | |
| 
 | |
|   if (!b)
 | |
|   {
 | |
|     ostringstream os;
 | |
|     os << "WEClient: attempt to add a queue with a duplicate ID " << key << endl;
 | |
|     throw runtime_error(os.str());
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WEClients::removeQueue(uint32_t key)
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
|   MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 | |
| 
 | |
|   if (map_tok == fSessionMessages.end())
 | |
|     return;
 | |
| 
 | |
|   map_tok->second->queue.shutdown();
 | |
|   map_tok->second->queue.clear();
 | |
|   fSessionMessages.erase(map_tok);
 | |
| }
 | |
| 
 | |
| void WEClients::shutdownQueue(uint32_t key)
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
|   MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 | |
| 
 | |
|   if (map_tok == fSessionMessages.end())
 | |
|     return;
 | |
| 
 | |
|   map_tok->second->queue.shutdown();
 | |
|   map_tok->second->queue.clear();
 | |
| }
 | |
| 
 | |
| void WEClients::read(uint32_t key, SBS& bs)
 | |
| {
 | |
|   boost::shared_ptr<MQE> mqe;
 | |
| 
 | |
|   // Find the StepMsgQueueList for this session
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
|   MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
 | |
| 
 | |
|   if (map_tok == fSessionMessages.end())
 | |
|   {
 | |
|     ostringstream os;
 | |
|     // cout << " reading for key " << key << " not found" << endl;
 | |
|     os << "WEClient: attempt to read(bs) from a nonexistent queue\n";
 | |
|     throw runtime_error(os.str());
 | |
|   }
 | |
| 
 | |
|   mqe = map_tok->second;
 | |
|   lk.unlock();
 | |
| 
 | |
|   // this method can block: you can't hold any locks here...
 | |
|   (void)mqe->queue.pop(&bs);
 | |
| 
 | |
|   if (!bs)
 | |
|     bs.reset(new ByteStream());
 | |
| }
 | |
| 
 | |
| void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection)
 | |
| {
 | |
|   if (pmCount == 0)
 | |
|   {
 | |
|     ostringstream oss;
 | |
|     oss << "WECLIENT: There is no connection to WES and this = " << this;
 | |
|     writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
 | |
|     throw runtime_error("There is no WriteEngineServer to send message to.");
 | |
|   }
 | |
| 
 | |
|   if (fPmConnections[connection] != 0)
 | |
|     fPmConnections[connection]->write(msg);
 | |
|   else
 | |
|   {
 | |
|     // new behavior: connection client is nullptr means it is read-only.
 | |
|     ostringstream os;
 | |
|     os << "Connection to readonly pm" << connection;
 | |
|     throw runtime_error(os.str());
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WEClients::write_to_all(const messageqcpp::ByteStream& msg)
 | |
| {
 | |
|   if (pmCount == 0)
 | |
|   {
 | |
|     ostringstream oss;
 | |
|     oss << "WECLIENT:  There is no connection to WES and this = " << this;
 | |
|     writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
 | |
|     throw runtime_error("There is no WriteEngineServer to send message to.");
 | |
|   }
 | |
| 
 | |
|   ClientList::iterator itor = fPmConnections.begin();
 | |
|   while (itor != fPmConnections.end())
 | |
|   {
 | |
|     if (itor->second != NULL)
 | |
|     {
 | |
|       itor->second->write(msg);
 | |
|     }
 | |
| 
 | |
|     itor++;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void WEClients::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
 | |
| {
 | |
|   boost::thread* thrd = new boost::thread(WEClientRunner(this, cl, connIndex));
 | |
|   fWESReader.push_back(thrd);
 | |
| }
 | |
| 
 | |
| void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex)
 | |
| {
 | |
|   // ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf());
 | |
|   // PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1);
 | |
|   // uint32_t uniqueId = p->UniqueID;
 | |
|   uint64_t uniqueId = 0;
 | |
|   *sbs >> uniqueId;
 | |
|   boost::shared_ptr<MQE> mqe;
 | |
| 
 | |
|   boost::mutex::scoped_lock lk(fMlock);
 | |
|   MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
 | |
| 
 | |
|   if (map_tok == fSessionMessages.end())
 | |
|   {
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   mqe = map_tok->second;
 | |
|   lk.unlock();
 | |
| 
 | |
|   if (pmCount > 0)
 | |
|   {
 | |
|     mqe->unackedWork[connIndex % pmCount].fetch_add(1, std::memory_order_relaxed);
 | |
|   }
 | |
| 
 | |
|   (void)mqe->queue.push(sbs);
 | |
| }
 | |
| 
 | |
| }  // namespace WriteEngine
 | |
| 
 |