mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
402 lines
8.3 KiB
C++
402 lines
8.3 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*********************************************************************
|
|
* $Id: clientrotator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
|
|
#include <iostream>
|
|
#include <iomanip>
|
|
#include <sstream>
|
|
#include <fstream>
|
|
#include <cstring>
|
|
#include <cassert>
|
|
#include <stdexcept>
|
|
#include <chrono>
|
|
using namespace std;
|
|
|
|
#include <boost/thread.hpp>
|
|
using namespace boost;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "messagelog.h"
|
|
#include "messageobj.h"
|
|
#include "loggingid.h"
|
|
using namespace logging;
|
|
|
|
#include "clientrotator.h"
|
|
|
|
//#include "idb_mysql.h"
|
|
|
|
/** Debug macro */
|
|
#ifdef INFINIDB_DEBUG
|
|
#define IDEBUG(x) \
|
|
{ \
|
|
x; \
|
|
}
|
|
#else
|
|
#define IDEBUG(x) \
|
|
{ \
|
|
}
|
|
#endif
|
|
|
|
#define LOG_TO_CERR
|
|
|
|
namespace execplan
|
|
{
|
|
const string LOCAL_EXEMGR_IP = "127.0.0.1";
|
|
const uint64_t LOCAL_EXEMGR_PORT = 8601;
|
|
|
|
string ClientRotator::getModule()
|
|
{
|
|
// Log to debug.log
|
|
LoggingID logid(24, 0, 0);
|
|
|
|
string fileName = "/var/lib/columnstore/local/module";
|
|
|
|
string module;
|
|
ifstream moduleFile(fileName.c_str());
|
|
|
|
if (moduleFile.is_open())
|
|
{
|
|
getline(moduleFile, module);
|
|
}
|
|
else
|
|
{
|
|
{
|
|
logging::Message::Args args1;
|
|
logging::Message msg(1);
|
|
std::ostringstream oss;
|
|
oss << "ClientRotator::getModule open status2 =" << strerror(errno);
|
|
args1.add(oss.str());
|
|
args1.add(fileName);
|
|
msg.format(args1);
|
|
Logger logger(logid.fSubsysID);
|
|
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
|
|
}
|
|
}
|
|
|
|
moduleFile.close();
|
|
|
|
return module;
|
|
}
|
|
|
|
ostream& operator<<(ostream& output, const ClientRotator& rhs)
|
|
{
|
|
output << __FILE__ << rhs.fName << "\t" << rhs.fSessionId << endl;
|
|
return output;
|
|
}
|
|
|
|
ClientRotator::ClientRotator(uint32_t sid, const std::string& name, bool localQuery)
|
|
: fName(name)
|
|
, fSessionId(sid)
|
|
, fClient(0)
|
|
, fClients()
|
|
, fCf(Config::makeConfig())
|
|
, fDebug(0)
|
|
, fLocalQuery(localQuery)
|
|
{
|
|
if (!fCf)
|
|
throw runtime_error((string)__FILE__ + ": No configuration file");
|
|
|
|
fDebug = static_cast<int>(Config::fromText(fCf->getConfig("CalpontConnector", "DebugLevel")));
|
|
}
|
|
|
|
void ClientRotator::loadClients()
|
|
{
|
|
// This object is statically allocated somewhere. We need to reload the config file here
|
|
// to search the extproc env for changes made after libcalora.so is loaded.
|
|
fCf = Config::makeConfig();
|
|
|
|
string pmWithUMStr = fCf->getConfig("Installation", "PMwithUM");
|
|
bool pmWithUM = (pmWithUMStr == "y" || pmWithUMStr == "Y");
|
|
|
|
// check current module type
|
|
if (!fLocalQuery && pmWithUM)
|
|
{
|
|
string module = getModule();
|
|
|
|
if (!module.empty() && (module[0] == 'P' || module[0] == 'p'))
|
|
fLocalQuery = true;
|
|
}
|
|
|
|
// connect to loopback ExeMgr for local query set up
|
|
if (fLocalQuery)
|
|
{
|
|
fClient = new MessageQueueClient(LOCAL_EXEMGR_IP, LOCAL_EXEMGR_PORT);
|
|
return;
|
|
}
|
|
|
|
stringstream ss(fName);
|
|
size_t pos = fName.length();
|
|
string str;
|
|
int i = 1;
|
|
|
|
do
|
|
{
|
|
ss.seekp(pos);
|
|
ss << i++;
|
|
str = fCf->getConfig(ss.str(), "Port");
|
|
|
|
if (str.length())
|
|
{
|
|
string moduleStr = fCf->getConfig(ss.str(), "Module");
|
|
|
|
// "if the system is not running in a 'PM with UM' config, the module type is unspecified, or the
|
|
// module is specified as a UM, use it"
|
|
if (!pmWithUM || moduleStr.empty() || moduleStr[0] == 'u' || moduleStr[0] == 'U')
|
|
fClients.push_back(ss.str());
|
|
}
|
|
} while (str.length());
|
|
|
|
if (fClients.empty())
|
|
throw runtime_error((string)__FILE__ + ": No configuration tags for " + fName + "\n");
|
|
}
|
|
|
|
void ClientRotator::resetClient()
|
|
{
|
|
try // one more time...
|
|
{
|
|
delete fClient;
|
|
fClient = 0;
|
|
connectList();
|
|
// fClient->write(msg);
|
|
}
|
|
catch (std::exception& e)
|
|
{
|
|
/* Can't fail silently */
|
|
writeToLog(__LINE__, e.what(), true);
|
|
#ifdef LOG_TO_CERR
|
|
cerr << "ClientRotator::write() failed: " << e.what() << endl;
|
|
#endif
|
|
throw;
|
|
}
|
|
}
|
|
|
|
void ClientRotator::write(const ByteStream& msg)
|
|
{
|
|
if (!fClient)
|
|
connect();
|
|
|
|
try
|
|
{
|
|
fClient->write(msg);
|
|
return;
|
|
}
|
|
catch (std::exception& e)
|
|
{
|
|
resetClient();
|
|
string errmsg = "ClientRotator caught exception: " + string(e.what());
|
|
cout << errmsg << endl;
|
|
throw runtime_error(errmsg);
|
|
}
|
|
catch (...)
|
|
{
|
|
resetClient();
|
|
string errmsg = "ClientRotator caught unknown exception";
|
|
cout << errmsg << endl;
|
|
throw runtime_error(errmsg);
|
|
}
|
|
}
|
|
|
|
ByteStream ClientRotator::read()
|
|
{
|
|
boost::mutex::scoped_lock lk(fClientLock);
|
|
|
|
ByteStream bs;
|
|
|
|
if (!fClient)
|
|
connect();
|
|
|
|
try
|
|
{
|
|
bs = fClient->read();
|
|
return bs;
|
|
}
|
|
catch (std::exception& e)
|
|
{
|
|
resetClient();
|
|
string errmsg = "ClientRotator caught exception: " + string(e.what());
|
|
cout << errmsg << endl;
|
|
throw runtime_error(errmsg);
|
|
}
|
|
catch (...)
|
|
{
|
|
resetClient();
|
|
string errmsg = "ClientRotator caught unknown exception";
|
|
cout << errmsg << endl;
|
|
throw runtime_error(errmsg);
|
|
}
|
|
|
|
#if 0
|
|
|
|
try //one more time...
|
|
{
|
|
delete fClient;
|
|
fClient = 0;
|
|
connectList();
|
|
bs = fClient->read();
|
|
return bs;
|
|
}
|
|
catch (std::exception& e)
|
|
{
|
|
/* Can't fail silently */
|
|
writeToLog(__LINE__, e.what(), true);
|
|
#ifdef LOG_TO_CERR
|
|
cerr << "ClientRotator::read() failed: " << e.what() << endl;
|
|
#endif
|
|
throw;
|
|
}
|
|
|
|
#endif
|
|
return bs;
|
|
}
|
|
|
|
void ClientRotator::connect(double timeout)
|
|
{
|
|
if (fClient)
|
|
return;
|
|
|
|
if (fClients.empty())
|
|
loadClients();
|
|
|
|
if (fClient)
|
|
return;
|
|
|
|
size_t idx = fSessionId % fClients.size();
|
|
bool connected = false;
|
|
|
|
try
|
|
{
|
|
connected = exeConnect(fClients.at(idx));
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
|
|
if (!connected)
|
|
{
|
|
if (fLocalQuery)
|
|
loadClients();
|
|
else
|
|
connectList(timeout);
|
|
}
|
|
}
|
|
|
|
bool ClientRotator::exeConnect(const string& clientName)
|
|
{
|
|
fClient = new messageqcpp::MessageQueueClient(clientName, fCf);
|
|
|
|
if (fDebug > 12)
|
|
{
|
|
stringstream ss;
|
|
ss << fSessionId;
|
|
#ifdef LOG_TO_CERR
|
|
cerr << "Connecting to " << clientName << " with sessionId " << ss.str() << endl;
|
|
#endif
|
|
writeToLog(__LINE__, "Connecting to " + clientName + " with sessionId " + ss.str(), 0);
|
|
}
|
|
|
|
try
|
|
{
|
|
if (!fClient->connect())
|
|
{
|
|
delete fClient;
|
|
fClient = 0;
|
|
return false;
|
|
}
|
|
}
|
|
catch (...)
|
|
{
|
|
delete fClient;
|
|
fClient = 0;
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void ClientRotator::connectList(double timeout)
|
|
{
|
|
if (fClient)
|
|
return;
|
|
|
|
if (fLocalQuery || fClients.empty())
|
|
loadClients();
|
|
|
|
if (fLocalQuery)
|
|
return;
|
|
|
|
idbassert(!fClients.empty());
|
|
uint16_t idx = fSessionId % fClients.size();
|
|
|
|
if (++idx >= fClients.size())
|
|
idx = 0;
|
|
|
|
typedef std::chrono::steady_clock clock;
|
|
auto start = clock::now();
|
|
|
|
typedef std::chrono::duration<double> double_secs;
|
|
while (std::chrono::duration_cast<double_secs>(clock::now() - start).count() < timeout)
|
|
{
|
|
try
|
|
{
|
|
if (exeConnect(fClients.at(idx++)))
|
|
return;
|
|
|
|
if (fClients.size() == idx)
|
|
idx = 0;
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
}
|
|
|
|
#ifdef LOG_TO_CERR
|
|
cerr << "Could not get a " << fName << " connection.\n";
|
|
#endif
|
|
writeToLog(__LINE__, "Could not get a " + fName + " connection.", 1);
|
|
throw runtime_error((string)__FILE__ + ": Could not get a connection to a " + fName);
|
|
}
|
|
|
|
void ClientRotator::writeToLog(int line, const string& msg, bool critical) const
|
|
{
|
|
LoggingID lid(05);
|
|
MessageLog ml(lid);
|
|
Message::Args args;
|
|
Message m(0);
|
|
args.add(__FILE__);
|
|
args.add("@");
|
|
args.add(line);
|
|
args.add(msg);
|
|
m.format(args);
|
|
|
|
if (critical)
|
|
ml.logCriticalMessage(m);
|
|
else if (fDebug)
|
|
ml.logDebugMessage(m);
|
|
}
|
|
|
|
} // namespace execplan
|