1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Roman Nozdrin 4fe9cd64a3
Revert "No boost condition (#2822)" (#2828)
This reverts commit f916e64927cd81569327014f20c4cc0b8aca40ff.
2023-04-22 15:49:50 +03:00

402 lines
8.3 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*********************************************************************
* $Id: clientrotator.cpp 9210 2013-01-21 14:10:42Z rdempsey $
*
*
***********************************************************************/
#include <iostream>
#include <iomanip>
#include <sstream>
#include <fstream>
#include <cstring>
#include <cassert>
#include <stdexcept>
#include <chrono>
using namespace std;
#include <boost/thread.hpp>
using namespace boost;
#include "configcpp.h"
using namespace config;
#include "messagequeue.h"
using namespace messageqcpp;
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;
#include "clientrotator.h"
//#include "idb_mysql.h"
/** Debug macro */
#ifdef INFINIDB_DEBUG
#define IDEBUG(x) \
{ \
x; \
}
#else
#define IDEBUG(x) \
{ \
}
#endif
#define LOG_TO_CERR
namespace execplan
{
const string LOCAL_EXEMGR_IP = "127.0.0.1";
const uint64_t LOCAL_EXEMGR_PORT = 8601;
string ClientRotator::getModule()
{
// Log to debug.log
LoggingID logid(24, 0, 0);
string fileName = "/var/lib/columnstore/local/module";
string module;
ifstream moduleFile(fileName.c_str());
if (moduleFile.is_open())
{
getline(moduleFile, module);
}
else
{
{
logging::Message::Args args1;
logging::Message msg(1);
std::ostringstream oss;
oss << "ClientRotator::getModule open status2 =" << strerror(errno);
args1.add(oss.str());
args1.add(fileName);
msg.format(args1);
Logger logger(logid.fSubsysID);
logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
}
}
moduleFile.close();
return module;
}
ostream& operator<<(ostream& output, const ClientRotator& rhs)
{
output << __FILE__ << rhs.fName << "\t" << rhs.fSessionId << endl;
return output;
}
ClientRotator::ClientRotator(uint32_t sid, const std::string& name, bool localQuery)
: fName(name)
, fSessionId(sid)
, fClient(0)
, fClients()
, fCf(Config::makeConfig())
, fDebug(0)
, fLocalQuery(localQuery)
{
if (!fCf)
throw runtime_error((string)__FILE__ + ": No configuration file");
fDebug = static_cast<int>(Config::fromText(fCf->getConfig("CalpontConnector", "DebugLevel")));
}
void ClientRotator::loadClients()
{
// This object is statically allocated somewhere. We need to reload the config file here
// to search the extproc env for changes made after libcalora.so is loaded.
fCf = Config::makeConfig();
string pmWithUMStr = fCf->getConfig("Installation", "PMwithUM");
bool pmWithUM = (pmWithUMStr == "y" || pmWithUMStr == "Y");
// check current module type
if (!fLocalQuery && pmWithUM)
{
string module = getModule();
if (!module.empty() && (module[0] == 'P' || module[0] == 'p'))
fLocalQuery = true;
}
// connect to loopback ExeMgr for local query set up
if (fLocalQuery)
{
fClient = new MessageQueueClient(LOCAL_EXEMGR_IP, LOCAL_EXEMGR_PORT);
return;
}
stringstream ss(fName);
size_t pos = fName.length();
string str;
int i = 1;
do
{
ss.seekp(pos);
ss << i++;
str = fCf->getConfig(ss.str(), "Port");
if (str.length())
{
string moduleStr = fCf->getConfig(ss.str(), "Module");
// "if the system is not running in a 'PM with UM' config, the module type is unspecified, or the
// module is specified as a UM, use it"
if (!pmWithUM || moduleStr.empty() || moduleStr[0] == 'u' || moduleStr[0] == 'U')
fClients.push_back(ss.str());
}
} while (str.length());
if (fClients.empty())
throw runtime_error((string)__FILE__ + ": No configuration tags for " + fName + "\n");
}
void ClientRotator::resetClient()
{
try // one more time...
{
delete fClient;
fClient = 0;
connectList();
// fClient->write(msg);
}
catch (std::exception& e)
{
/* Can't fail silently */
writeToLog(__LINE__, e.what(), true);
#ifdef LOG_TO_CERR
cerr << "ClientRotator::write() failed: " << e.what() << endl;
#endif
throw;
}
}
void ClientRotator::write(const ByteStream& msg)
{
if (!fClient)
connect();
try
{
fClient->write(msg);
return;
}
catch (std::exception& e)
{
resetClient();
string errmsg = "ClientRotator caught exception: " + string(e.what());
cout << errmsg << endl;
throw runtime_error(errmsg);
}
catch (...)
{
resetClient();
string errmsg = "ClientRotator caught unknown exception";
cout << errmsg << endl;
throw runtime_error(errmsg);
}
}
ByteStream ClientRotator::read()
{
boost::mutex::scoped_lock lk(fClientLock);
ByteStream bs;
if (!fClient)
connect();
try
{
bs = fClient->read();
return bs;
}
catch (std::exception& e)
{
resetClient();
string errmsg = "ClientRotator caught exception: " + string(e.what());
cout << errmsg << endl;
throw runtime_error(errmsg);
}
catch (...)
{
resetClient();
string errmsg = "ClientRotator caught unknown exception";
cout << errmsg << endl;
throw runtime_error(errmsg);
}
#if 0
try //one more time...
{
delete fClient;
fClient = 0;
connectList();
bs = fClient->read();
return bs;
}
catch (std::exception& e)
{
/* Can't fail silently */
writeToLog(__LINE__, e.what(), true);
#ifdef LOG_TO_CERR
cerr << "ClientRotator::read() failed: " << e.what() << endl;
#endif
throw;
}
#endif
return bs;
}
void ClientRotator::connect(double timeout)
{
if (fClient)
return;
if (fClients.empty())
loadClients();
if (fClient)
return;
size_t idx = fSessionId % fClients.size();
bool connected = false;
try
{
connected = exeConnect(fClients.at(idx));
}
catch (...)
{
}
if (!connected)
{
if (fLocalQuery)
loadClients();
else
connectList(timeout);
}
}
bool ClientRotator::exeConnect(const string& clientName)
{
fClient = new messageqcpp::MessageQueueClient(clientName, fCf);
if (fDebug > 12)
{
stringstream ss;
ss << fSessionId;
#ifdef LOG_TO_CERR
cerr << "Connecting to " << clientName << " with sessionId " << ss.str() << endl;
#endif
writeToLog(__LINE__, "Connecting to " + clientName + " with sessionId " + ss.str(), 0);
}
try
{
if (!fClient->connect())
{
delete fClient;
fClient = 0;
return false;
}
}
catch (...)
{
delete fClient;
fClient = 0;
return false;
}
return true;
}
void ClientRotator::connectList(double timeout)
{
if (fClient)
return;
if (fLocalQuery || fClients.empty())
loadClients();
if (fLocalQuery)
return;
idbassert(!fClients.empty());
uint16_t idx = fSessionId % fClients.size();
if (++idx >= fClients.size())
idx = 0;
typedef std::chrono::steady_clock clock;
auto start = clock::now();
typedef std::chrono::duration<double> double_secs;
while (std::chrono::duration_cast<double_secs>(clock::now() - start).count() < timeout)
{
try
{
if (exeConnect(fClients.at(idx++)))
return;
if (fClients.size() == idx)
idx = 0;
}
catch (...)
{
}
}
#ifdef LOG_TO_CERR
cerr << "Could not get a " << fName << " connection.\n";
#endif
writeToLog(__LINE__, "Could not get a " + fName + " connection.", 1);
throw runtime_error((string)__FILE__ + ": Could not get a connection to a " + fName);
}
void ClientRotator::writeToLog(int line, const string& msg, bool critical) const
{
LoggingID lid(05);
MessageLog ml(lid);
Message::Args args;
Message m(0);
args.add(__FILE__);
args.add("@");
args.add(line);
args.add(msg);
m.format(args);
if (critical)
ml.logCriticalMessage(m);
else if (fDebug)
ml.logDebugMessage(m);
}
} // namespace execplan