You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			394 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			394 lines
		
	
	
		
			8.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*********************************************************************
 | 
						|
* $Id: clientrotator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
 | 
						|
*
 | 
						|
*
 | 
						|
***********************************************************************/
 | 
						|
 | 
						|
#include <iostream>
 | 
						|
#include <iomanip>
 | 
						|
#include <sstream>
 | 
						|
#include <fstream>
 | 
						|
#include <cstring>
 | 
						|
#include <cassert>
 | 
						|
#include <stdexcept>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include <boost/timer.hpp>
 | 
						|
#include <boost/thread.hpp>
 | 
						|
using namespace boost;
 | 
						|
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
 | 
						|
#include "messagequeue.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
 | 
						|
#include "messagelog.h"
 | 
						|
#include "messageobj.h"
 | 
						|
#include "loggingid.h"
 | 
						|
using namespace logging;
 | 
						|
 | 
						|
#include "installdir.h"
 | 
						|
 | 
						|
#include "clientrotator.h"
 | 
						|
 | 
						|
//#include "idb_mysql.h"
 | 
						|
 | 
						|
/** Debug macro */
 | 
						|
#ifdef INFINIDB_DEBUG
 | 
						|
#define IDEBUG(x) {x;}
 | 
						|
#else
 | 
						|
#define IDEBUG(x) {}
 | 
						|
#endif
 | 
						|
 | 
						|
#define LOG_TO_CERR
 | 
						|
 | 
						|
namespace execplan
 | 
						|
{
 | 
						|
 | 
						|
const string LOCAL_EXEMGR_IP = "127.0.0.1";
 | 
						|
const uint64_t LOCAL_EXEMGR_PORT = 8601;
 | 
						|
 | 
						|
string ClientRotator::getModule()
 | 
						|
{
 | 
						|
    string installDir = startup::StartUp::installDir();
 | 
						|
	
 | 
						|
	//Log to debug.log
 | 
						|
	LoggingID logid( 24, 0, 0);
 | 
						|
 | 
						|
    string fileName = installDir + "/local/module";
 | 
						|
 | 
						|
    string module;
 | 
						|
    ifstream moduleFile (fileName.c_str());
 | 
						|
 | 
						|
    if (moduleFile.is_open())
 | 
						|
	{
 | 
						|
        getline (moduleFile, module);
 | 
						|
	}
 | 
						|
	else
 | 
						|
	{
 | 
						|
        {
 | 
						|
            logging::Message::Args args1;
 | 
						|
            logging::Message msg(1);
 | 
						|
            std::ostringstream oss;
 | 
						|
            oss << "ClientRotator::getModule open status2 =" << strerror(errno);
 | 
						|
            args1.add(oss.str());
 | 
						|
            args1.add(fileName);
 | 
						|
            msg.format( args1 );
 | 
						|
            Logger logger(logid.fSubsysID);
 | 
						|
            logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
 | 
						|
        }
 | 
						|
	}
 | 
						|
 | 
						|
    moduleFile.close();
 | 
						|
 | 
						|
    return module;
 | 
						|
}
 | 
						|
 | 
						|
ostream& operator<<(ostream& output, const ClientRotator& rhs)
 | 
						|
{
 | 
						|
    output << __FILE__ << rhs.fName << "\t" << rhs.fSessionId << endl;
 | 
						|
    return output;
 | 
						|
}
 | 
						|
 | 
						|
ClientRotator::ClientRotator(uint32_t sid, const std::string& name, bool localQuery) :
 | 
						|
    fName(name), fSessionId(sid), fClient(0), fClients(), fCf(Config::makeConfig()),
 | 
						|
    fDebug(0), fLocalQuery(localQuery)
 | 
						|
{
 | 
						|
    if (! fCf)
 | 
						|
        throw runtime_error((string)__FILE__ + ": No configuration file");
 | 
						|
 | 
						|
    fDebug = static_cast<int>(Config::fromText(fCf->getConfig("CalpontConnector", "DebugLevel")));
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::loadClients()
 | 
						|
{
 | 
						|
    //This object is statically allocated somewhere. We need to reload the config file here
 | 
						|
    // to search the extproc env for changes made after libcalora.so is loaded.
 | 
						|
    fCf = Config::makeConfig();
 | 
						|
 | 
						|
    string pmWithUMStr = fCf->getConfig("Installation", "PMwithUM");
 | 
						|
    bool pmWithUM = (pmWithUMStr == "y" || pmWithUMStr == "Y");
 | 
						|
 | 
						|
    // check current module type
 | 
						|
    if (!fLocalQuery && pmWithUM)
 | 
						|
    {
 | 
						|
        string module = getModule();
 | 
						|
 | 
						|
        if (!module.empty() && (module[0] == 'P' || module[0] == 'p'))
 | 
						|
            fLocalQuery = true;
 | 
						|
    }
 | 
						|
 | 
						|
    // connect to loopback ExeMgr for local query set up
 | 
						|
    if (fLocalQuery)
 | 
						|
    {
 | 
						|
        fClient = new MessageQueueClient(LOCAL_EXEMGR_IP, LOCAL_EXEMGR_PORT);
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    stringstream ss(fName);
 | 
						|
    size_t pos = fName.length();
 | 
						|
    string str;
 | 
						|
    int i = 1;
 | 
						|
 | 
						|
    do
 | 
						|
    {
 | 
						|
        ss.seekp(pos);
 | 
						|
        ss << i++;
 | 
						|
        str = fCf->getConfig(ss.str(), "Port");
 | 
						|
 | 
						|
        if (str.length() )
 | 
						|
        {
 | 
						|
            string moduleStr = fCf->getConfig(ss.str(), "Module");
 | 
						|
 | 
						|
            // "if the system is not running in a 'PM with UM' config, the module type is unspecified, or the
 | 
						|
            // module is specified as a UM, use it"
 | 
						|
            if (!pmWithUM || moduleStr.empty() || moduleStr[0] == 'u' || moduleStr[0] == 'U')
 | 
						|
                fClients.push_back(ss.str());
 | 
						|
        }
 | 
						|
    }
 | 
						|
    while ( str.length() );
 | 
						|
 | 
						|
    if (fClients.empty())
 | 
						|
        throw runtime_error((string)__FILE__ + ": No configuration tags for " + fName + "\n");
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::resetClient()
 | 
						|
{
 | 
						|
    try //one more time...
 | 
						|
    {
 | 
						|
        delete fClient;
 | 
						|
        fClient = 0;
 | 
						|
        connectList();
 | 
						|
        //fClient->write(msg);
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
        /* Can't fail silently */
 | 
						|
        writeToLog(__LINE__, e.what(), true);
 | 
						|
#ifdef LOG_TO_CERR
 | 
						|
        cerr << "ClientRotator::write() failed: " << e.what() << endl;
 | 
						|
#endif
 | 
						|
        throw;
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::write(const ByteStream& msg)
 | 
						|
{
 | 
						|
    if (!fClient)
 | 
						|
        connect();
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        fClient->write(msg);
 | 
						|
        return;
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
        resetClient();
 | 
						|
        string errmsg = "ClientRotator caught exception: " + string(e.what());
 | 
						|
        cout << errmsg << endl;
 | 
						|
        throw runtime_error(errmsg);
 | 
						|
    }
 | 
						|
    catch (...)
 | 
						|
    {
 | 
						|
        resetClient();
 | 
						|
        string errmsg = "ClientRotator caught unknown exception";
 | 
						|
        cout << errmsg << endl;
 | 
						|
        throw runtime_error(errmsg);
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
ByteStream ClientRotator::read()
 | 
						|
{
 | 
						|
    mutex::scoped_lock lk(fClientLock);
 | 
						|
 | 
						|
    ByteStream bs;
 | 
						|
 | 
						|
    if (!fClient)
 | 
						|
        connect();
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        bs = fClient->read();
 | 
						|
        return bs;
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
        resetClient();
 | 
						|
        string errmsg = "ClientRotator caught exception: " + string(e.what());
 | 
						|
        cout << errmsg << endl;
 | 
						|
        throw runtime_error(errmsg);
 | 
						|
    }
 | 
						|
    catch (...)
 | 
						|
    {
 | 
						|
        resetClient();
 | 
						|
        string errmsg = "ClientRotator caught unknown exception";
 | 
						|
        cout << errmsg << endl;
 | 
						|
        throw runtime_error(errmsg);
 | 
						|
    }
 | 
						|
 | 
						|
#if 0
 | 
						|
 | 
						|
    try //one more time...
 | 
						|
    {
 | 
						|
        delete fClient;
 | 
						|
        fClient = 0;
 | 
						|
        connectList();
 | 
						|
        bs = fClient->read();
 | 
						|
        return bs;
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
        /* Can't fail silently */
 | 
						|
        writeToLog(__LINE__, e.what(), true);
 | 
						|
#ifdef LOG_TO_CERR
 | 
						|
        cerr << "ClientRotator::read() failed: " << e.what() << endl;
 | 
						|
#endif
 | 
						|
        throw;
 | 
						|
    }
 | 
						|
 | 
						|
#endif
 | 
						|
    return bs;
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::connect(double timeout)
 | 
						|
{
 | 
						|
    if (fClient) return;
 | 
						|
 | 
						|
    if (fClients.empty())
 | 
						|
        loadClients();
 | 
						|
 | 
						|
    if (fClient) return;
 | 
						|
 | 
						|
    size_t idx = fSessionId % fClients.size();
 | 
						|
    bool connected = false;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        connected = exeConnect(fClients.at(idx));
 | 
						|
    }
 | 
						|
    catch (... )
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    if (!connected)
 | 
						|
    {
 | 
						|
        if (fLocalQuery)
 | 
						|
            loadClients();
 | 
						|
        else
 | 
						|
            connectList(timeout);
 | 
						|
    }
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
bool ClientRotator::exeConnect( const string& clientName )
 | 
						|
{
 | 
						|
    fClient = new messageqcpp::MessageQueueClient( clientName, fCf);
 | 
						|
 | 
						|
    if (fDebug > 12)
 | 
						|
    {
 | 
						|
        stringstream ss;
 | 
						|
        ss << fSessionId;
 | 
						|
#ifdef LOG_TO_CERR
 | 
						|
        cerr << "Connecting to " << clientName << " with sessionId " << ss.str() << endl;
 | 
						|
#endif
 | 
						|
        writeToLog( __LINE__, "Connecting to  " + clientName + " with sessionId " + ss.str(), 0);
 | 
						|
    }
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        if (!fClient->connect())
 | 
						|
        {
 | 
						|
            delete fClient;
 | 
						|
            fClient = 0;
 | 
						|
            return false;
 | 
						|
        }
 | 
						|
    }
 | 
						|
    catch (...)
 | 
						|
    {
 | 
						|
        delete fClient;
 | 
						|
        fClient = 0;
 | 
						|
        return false;
 | 
						|
    }
 | 
						|
 | 
						|
    return true;
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::connectList(double timeout)
 | 
						|
{
 | 
						|
    if (fClient) return;
 | 
						|
 | 
						|
    if (fLocalQuery || fClients.empty())
 | 
						|
        loadClients();
 | 
						|
 | 
						|
    if (fLocalQuery) return;
 | 
						|
 | 
						|
    idbassert(!fClients.empty());
 | 
						|
    uint16_t idx = fSessionId % fClients.size();
 | 
						|
 | 
						|
    if (++idx >= fClients.size() )
 | 
						|
        idx = 0;
 | 
						|
 | 
						|
    timer runTime;
 | 
						|
 | 
						|
    while ( runTime.elapsed() < timeout)
 | 
						|
    {
 | 
						|
        try
 | 
						|
        {
 | 
						|
            if (exeConnect(fClients.at(idx++)))
 | 
						|
                return;
 | 
						|
 | 
						|
            if (fClients.size() == idx)
 | 
						|
                idx = 0;
 | 
						|
        }
 | 
						|
        catch (... )
 | 
						|
        {		}
 | 
						|
    }
 | 
						|
 | 
						|
#ifdef LOG_TO_CERR
 | 
						|
    cerr << "Could not get a " << fName << " connection.\n";
 | 
						|
#endif
 | 
						|
    writeToLog(__LINE__, "Could not get a " + fName + " connection.", 1);
 | 
						|
    throw runtime_error((string)__FILE__ + ": Could not get a connection to a " + fName);
 | 
						|
}
 | 
						|
 | 
						|
void ClientRotator::writeToLog(int line, const string& msg, bool critical) const
 | 
						|
{
 | 
						|
    LoggingID lid(05);
 | 
						|
    MessageLog ml(lid);
 | 
						|
    Message::Args args;
 | 
						|
    Message m(0);
 | 
						|
    args.add(__FILE__);
 | 
						|
    args.add("@");
 | 
						|
    args.add(line);
 | 
						|
    args.add(msg);
 | 
						|
    m.format(args);
 | 
						|
 | 
						|
    if (critical)
 | 
						|
        ml.logCriticalMessage(m);
 | 
						|
    else if (fDebug)
 | 
						|
        ml.logDebugMessage(m);
 | 
						|
}
 | 
						|
 | 
						|
}
 | 
						|
// vim:ts=4 sw=4:
 | 
						|
 |