You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			402 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			402 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*********************************************************************
 | |
|  * $Id: clientrotator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 | |
|  *
 | |
|  *
 | |
|  ***********************************************************************/
 | |
| 
 | |
| #include <iostream>
 | |
| #include <iomanip>
 | |
| #include <sstream>
 | |
| #include <fstream>
 | |
| #include <cstring>
 | |
| #include <cassert>
 | |
| #include <stdexcept>
 | |
| #include <chrono>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/thread.hpp>
 | |
| using namespace boost;
 | |
| 
 | |
| #include "configcpp.h"
 | |
| using namespace config;
 | |
| 
 | |
| #include "messagequeue.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "messagelog.h"
 | |
| #include "messageobj.h"
 | |
| #include "loggingid.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "clientrotator.h"
 | |
| 
 | |
| //#include "idb_mysql.h"
 | |
| 
 | |
| /** Debug macro */
 | |
| #ifdef INFINIDB_DEBUG
 | |
| #define IDEBUG(x) \
 | |
|   {               \
 | |
|     x;            \
 | |
|   }
 | |
| #else
 | |
| #define IDEBUG(x) \
 | |
|   {               \
 | |
|   }
 | |
| #endif
 | |
| 
 | |
| #define LOG_TO_CERR
 | |
| 
 | |
| namespace execplan
 | |
| {
 | |
| const string LOCAL_EXEMGR_IP = "127.0.0.1";
 | |
| const uint64_t LOCAL_EXEMGR_PORT = 8601;
 | |
| 
 | |
| string ClientRotator::getModule()
 | |
| {
 | |
|   // Log to debug.log
 | |
|   LoggingID logid(24, 0, 0);
 | |
| 
 | |
|   string fileName = "/var/lib/columnstore/local/module";
 | |
| 
 | |
|   string module;
 | |
|   ifstream moduleFile(fileName.c_str());
 | |
| 
 | |
|   if (moduleFile.is_open())
 | |
|   {
 | |
|     getline(moduleFile, module);
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     {
 | |
|       logging::Message::Args args1;
 | |
|       logging::Message msg(1);
 | |
|       std::ostringstream oss;
 | |
|       oss << "ClientRotator::getModule open status2 =" << strerror(errno);
 | |
|       args1.add(oss.str());
 | |
|       args1.add(fileName);
 | |
|       msg.format(args1);
 | |
|       Logger logger(logid.fSubsysID);
 | |
|       logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   moduleFile.close();
 | |
| 
 | |
|   return module;
 | |
| }
 | |
| 
 | |
| ostream& operator<<(ostream& output, const ClientRotator& rhs)
 | |
| {
 | |
|   output << __FILE__ << rhs.fName << "\t" << rhs.fSessionId << endl;
 | |
|   return output;
 | |
| }
 | |
| 
 | |
| ClientRotator::ClientRotator(uint32_t sid, const std::string& name, bool localQuery)
 | |
|  : fName(name)
 | |
|  , fSessionId(sid)
 | |
|  , fClient(0)
 | |
|  , fClients()
 | |
|  , fCf(Config::makeConfig())
 | |
|  , fDebug(0)
 | |
|  , fLocalQuery(localQuery)
 | |
| {
 | |
|   if (!fCf)
 | |
|     throw runtime_error((string)__FILE__ + ": No configuration file");
 | |
| 
 | |
|   fDebug = static_cast<int>(Config::fromText(fCf->getConfig("CalpontConnector", "DebugLevel")));
 | |
| }
 | |
| 
 | |
| void ClientRotator::loadClients()
 | |
| {
 | |
|   // This object is statically allocated somewhere. We need to reload the config file here
 | |
|   // to search the extproc env for changes made after libcalora.so is loaded.
 | |
|   fCf = Config::makeConfig();
 | |
| 
 | |
|   string pmWithUMStr = fCf->getConfig("Installation", "PMwithUM");
 | |
|   bool pmWithUM = (pmWithUMStr == "y" || pmWithUMStr == "Y");
 | |
| 
 | |
|   // check current module type
 | |
|   if (!fLocalQuery && pmWithUM)
 | |
|   {
 | |
|     string module = getModule();
 | |
| 
 | |
|     if (!module.empty() && (module[0] == 'P' || module[0] == 'p'))
 | |
|       fLocalQuery = true;
 | |
|   }
 | |
| 
 | |
|   // connect to loopback ExeMgr for local query set up
 | |
|   if (fLocalQuery)
 | |
|   {
 | |
|     fClient = new MessageQueueClient(LOCAL_EXEMGR_IP, LOCAL_EXEMGR_PORT);
 | |
|     return;
 | |
|   }
 | |
| 
 | |
|   stringstream ss(fName);
 | |
|   size_t pos = fName.length();
 | |
|   string str;
 | |
|   int i = 1;
 | |
| 
 | |
|   do
 | |
|   {
 | |
|     ss.seekp(pos);
 | |
|     ss << i++;
 | |
|     str = fCf->getConfig(ss.str(), "Port");
 | |
| 
 | |
|     if (str.length())
 | |
|     {
 | |
|       string moduleStr = fCf->getConfig(ss.str(), "Module");
 | |
| 
 | |
|       // "if the system is not running in a 'PM with UM' config, the module type is unspecified, or the
 | |
|       // module is specified as a UM, use it"
 | |
|       if (!pmWithUM || moduleStr.empty() || moduleStr[0] == 'u' || moduleStr[0] == 'U')
 | |
|         fClients.push_back(ss.str());
 | |
|     }
 | |
|   } while (str.length());
 | |
| 
 | |
|   if (fClients.empty())
 | |
|     throw runtime_error((string)__FILE__ + ": No configuration tags for " + fName + "\n");
 | |
| }
 | |
| 
 | |
| void ClientRotator::resetClient()
 | |
| {
 | |
|   try  // one more time...
 | |
|   {
 | |
|     delete fClient;
 | |
|     fClient = 0;
 | |
|     connectList();
 | |
|     // fClient->write(msg);
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     /* Can't fail silently */
 | |
|     writeToLog(__LINE__, e.what(), true);
 | |
| #ifdef LOG_TO_CERR
 | |
|     cerr << "ClientRotator::write() failed: " << e.what() << endl;
 | |
| #endif
 | |
|     throw;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void ClientRotator::write(const ByteStream& msg)
 | |
| {
 | |
|   if (!fClient)
 | |
|     connect();
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     fClient->write(msg);
 | |
|     return;
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     resetClient();
 | |
|     string errmsg = "ClientRotator caught exception: " + string(e.what());
 | |
|     cout << errmsg << endl;
 | |
|     throw runtime_error(errmsg);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     resetClient();
 | |
|     string errmsg = "ClientRotator caught unknown exception";
 | |
|     cout << errmsg << endl;
 | |
|     throw runtime_error(errmsg);
 | |
|   }
 | |
| }
 | |
| 
 | |
| ByteStream ClientRotator::read()
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(fClientLock);
 | |
| 
 | |
|   ByteStream bs;
 | |
| 
 | |
|   if (!fClient)
 | |
|     connect();
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     bs = fClient->read();
 | |
|     return bs;
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     resetClient();
 | |
|     string errmsg = "ClientRotator caught exception: " + string(e.what());
 | |
|     cout << errmsg << endl;
 | |
|     throw runtime_error(errmsg);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     resetClient();
 | |
|     string errmsg = "ClientRotator caught unknown exception";
 | |
|     cout << errmsg << endl;
 | |
|     throw runtime_error(errmsg);
 | |
|   }
 | |
| 
 | |
| #if 0
 | |
| 
 | |
|     try //one more time...
 | |
|     {
 | |
|         delete fClient;
 | |
|         fClient = 0;
 | |
|         connectList();
 | |
|         bs = fClient->read();
 | |
|         return bs;
 | |
|     }
 | |
|     catch (std::exception& e)
 | |
|     {
 | |
|         /* Can't fail silently */
 | |
|         writeToLog(__LINE__, e.what(), true);
 | |
| #ifdef LOG_TO_CERR
 | |
|         cerr << "ClientRotator::read() failed: " << e.what() << endl;
 | |
| #endif
 | |
|         throw;
 | |
|     }
 | |
| 
 | |
| #endif
 | |
|   return bs;
 | |
| }
 | |
| 
 | |
| void ClientRotator::connect(double timeout)
 | |
| {
 | |
|   if (fClient)
 | |
|     return;
 | |
| 
 | |
|   if (fClients.empty())
 | |
|     loadClients();
 | |
| 
 | |
|   if (fClient)
 | |
|     return;
 | |
| 
 | |
|   size_t idx = fSessionId % fClients.size();
 | |
|   bool connected = false;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     connected = exeConnect(fClients.at(idx));
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   if (!connected)
 | |
|   {
 | |
|     if (fLocalQuery)
 | |
|       loadClients();
 | |
|     else
 | |
|       connectList(timeout);
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool ClientRotator::exeConnect(const string& clientName)
 | |
| {
 | |
|   fClient = new messageqcpp::MessageQueueClient(clientName, fCf);
 | |
| 
 | |
|   if (fDebug > 12)
 | |
|   {
 | |
|     stringstream ss;
 | |
|     ss << fSessionId;
 | |
| #ifdef LOG_TO_CERR
 | |
|     cerr << "Connecting to " << clientName << " with sessionId " << ss.str() << endl;
 | |
| #endif
 | |
|     writeToLog(__LINE__, "Connecting to  " + clientName + " with sessionId " + ss.str(), 0);
 | |
|   }
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     if (!fClient->connect())
 | |
|     {
 | |
|       delete fClient;
 | |
|       fClient = 0;
 | |
|       return false;
 | |
|     }
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     delete fClient;
 | |
|     fClient = 0;
 | |
|     return false;
 | |
|   }
 | |
| 
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| void ClientRotator::connectList(double timeout)
 | |
| {
 | |
|   if (fClient)
 | |
|     return;
 | |
| 
 | |
|   if (fLocalQuery || fClients.empty())
 | |
|     loadClients();
 | |
| 
 | |
|   if (fLocalQuery)
 | |
|     return;
 | |
| 
 | |
|   idbassert(!fClients.empty());
 | |
|   uint16_t idx = fSessionId % fClients.size();
 | |
| 
 | |
|   if (++idx >= fClients.size())
 | |
|     idx = 0;
 | |
| 
 | |
|   typedef std::chrono::steady_clock clock;
 | |
|   auto start = clock::now();
 | |
| 
 | |
|   typedef std::chrono::duration<double> double_secs;
 | |
|   while (std::chrono::duration_cast<double_secs>(clock::now() - start).count() < timeout)
 | |
|   {
 | |
|     try
 | |
|     {
 | |
|       if (exeConnect(fClients.at(idx++)))
 | |
|         return;
 | |
| 
 | |
|       if (fClients.size() == idx)
 | |
|         idx = 0;
 | |
|     }
 | |
|     catch (...)
 | |
|     {
 | |
|     }
 | |
|   }
 | |
| 
 | |
| #ifdef LOG_TO_CERR
 | |
|   cerr << "Could not get a " << fName << " connection.\n";
 | |
| #endif
 | |
|   writeToLog(__LINE__, "Could not get a " + fName + " connection.", 1);
 | |
|   throw runtime_error((string)__FILE__ + ": Could not get a connection to a " + fName);
 | |
| }
 | |
| 
 | |
| void ClientRotator::writeToLog(int line, const string& msg, bool critical) const
 | |
| {
 | |
|   LoggingID lid(05);
 | |
|   MessageLog ml(lid);
 | |
|   Message::Args args;
 | |
|   Message m(0);
 | |
|   args.add(__FILE__);
 | |
|   args.add("@");
 | |
|   args.add(line);
 | |
|   args.add(msg);
 | |
|   m.format(args);
 | |
| 
 | |
|   if (critical)
 | |
|     ml.logCriticalMessage(m);
 | |
|   else if (fDebug)
 | |
|     ml.logDebugMessage(m);
 | |
| }
 | |
| 
 | |
| }  // namespace execplan
 |