mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
545 lines
14 KiB
C++
545 lines
14 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
* Copyright (C) 2016 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
|
|
//
|
|
/** @file */
|
|
|
|
#include <sstream>
|
|
#include <stdexcept>
|
|
#include <cassert>
|
|
#include <ctime>
|
|
#include <algorithm>
|
|
#include <unistd.h>
|
|
#include <arpa/inet.h>
|
|
#if __FreeBSD__
|
|
#include <sys/socket.h>
|
|
#endif
|
|
using namespace std;
|
|
|
|
#include <boost/thread/mutex.hpp>
|
|
using namespace boost;
|
|
|
|
#include "messagequeue.h"
|
|
#include "bytestream.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "errorids.h"
|
|
#include "exceptclasses.h"
|
|
#include "messagelog.h"
|
|
#include "messageobj.h"
|
|
#include "loggingid.h"
|
|
using namespace logging;
|
|
|
|
#include "liboamcpp.h"
|
|
using namespace oam;
|
|
|
|
#include "we_clients.h"
|
|
#include "we_messages.h"
|
|
using namespace WriteEngine;
|
|
|
|
#include "atomicops.h"
|
|
|
|
namespace
|
|
{
|
|
void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
|
|
{
|
|
LoggingID lid(05);
|
|
MessageLog ml(lid);
|
|
Message::Args args;
|
|
Message m(0);
|
|
args.add(file);
|
|
args.add("@");
|
|
args.add(line);
|
|
args.add(msg);
|
|
m.format(args);
|
|
|
|
switch (logto)
|
|
{
|
|
case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;
|
|
|
|
case LOG_TYPE_INFO: ml.logInfoMessage(m); break;
|
|
|
|
case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;
|
|
|
|
case LOG_TYPE_ERROR: ml.logErrorMessage(m); break;
|
|
|
|
case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
|
|
}
|
|
}
|
|
|
|
/** @brief Look up the module (device) name that owns a given IP address.
 *
 * Walks every device in @p moduletypeconfig and every host entry of each
 * device, returning the device name of the first host whose IPAddr equals
 * @p ipAddress.
 *
 * Both arguments are taken by const reference: the function only reads them,
 * and it is called once per connection with the full module configuration.
 *
 * @param moduletypeconfig module configuration to search
 * @param ipAddress        address to look for
 * @return the matching device name. When nothing matches, the name of the
 *         LAST device in the list is returned, or "pm1" when the list is
 *         empty (this fallback is long-standing behavior and is preserved).
 */
string getModuleNameByIPAddr(const oam::ModuleTypeConfig& moduletypeconfig, const string& ipAddress)
{
  string modulename = "pm1";

  for (DeviceNetworkList::const_iterator device = moduletypeconfig.ModuleNetworkList.begin();
       device != moduletypeconfig.ModuleNetworkList.end(); ++device)
  {
    // Remember the current device; it doubles as the fallback result.
    modulename = device->DeviceName;

    for (HostConfigList::const_iterator host = device->hostConfigList.begin();
         host != device->hostConfigList.end(); ++host)
    {
      if (ipAddress == host->IPAddr)
        return modulename;
    }
  }

  return modulename;
}
|
|
|
|
struct WEClientRunner
|
|
{
|
|
WEClientRunner(WriteEngine::WEClients* jl, boost::shared_ptr<MessageQueueClient> cl,
|
|
uint32_t connectionIndex)
|
|
: jbl(jl), client(cl), connIndex(connectionIndex)
|
|
{
|
|
}
|
|
WriteEngine::WEClients* jbl;
|
|
boost::shared_ptr<MessageQueueClient> client;
|
|
uint32_t connIndex;
|
|
void operator()()
|
|
{
|
|
// cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
|
|
try
|
|
{
|
|
jbl->Listen(client, connIndex);
|
|
// cout << "Listening connIndex " << connIndex << endl;
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
string what(ex.what());
|
|
cerr << "exception caught in WEClient: " << what << endl;
|
|
|
|
if (what.find("St9bad_alloc") != string::npos)
|
|
{
|
|
writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
|
|
// abort();
|
|
}
|
|
else
|
|
writeToLog(__FILE__, __LINE__, what);
|
|
}
|
|
catch (...)
|
|
{
|
|
string msg("exception caught in WEClientRunner.");
|
|
writeToLog(__FILE__, __LINE__, msg);
|
|
cerr << msg << endl;
|
|
}
|
|
}
|
|
};
|
|
|
|
/** @brief Functor that calls shutdown() on whatever object it is handed.
 *
 * @tparam Queue any type providing a shutdown() member function
 */
template <class Queue>
struct QueueShutdown
{
  /** Invoke shutdown() on @p target. */
  void operator()(Queue& target)
  {
    target.shutdown();
  }
};
|
|
} // namespace
|
|
|
|
namespace WriteEngine
|
|
{
|
|
/** @brief Create the client and immediately connect via Setup().
 *
 * @param PrgmID identifies the calling program; Setup() uses it to choose the
 *               keep-alive message sent to each server.
 * @throws std::runtime_error propagated from Setup() when the system
 *         configuration cannot be read.
 */
WEClients::WEClients(int PrgmID) : fPrgmID(PrgmID), pmCount(0)
{
  // Clear the shutdown flag before any listener thread is started by Setup();
  // Listen() consults it when a read returns zero bytes.
  closingConnection = 0;

  Setup();
}
|
|
|
|
/** @brief Close all connections; never lets an exception escape.
 *
 * Close() calls write_to_all(), which throws std::runtime_error when no
 * server is connected (pmCount == 0). An exception leaving a destructor
 * terminates the process, so any failure here is reported on stderr and
 * otherwise ignored.
 */
WEClients::~WEClients()
{
  try
  {
    Close();
  }
  catch (const std::exception& ex)
  {
    cerr << "WEClients: error while closing connections: " << ex.what() << endl;
  }
  catch (...)
  {
    cerr << "WEClients: unknown error while closing connections" << endl;
  }
}
|
|
|
|
void WEClients::Setup()
|
|
{
|
|
makeBusy(true);
|
|
joblist::ResourceManager* rm = joblist::ResourceManager::instance();
|
|
oam::Oam oam;
|
|
string ipAddress;
|
|
ModuleTypeConfig moduletypeconfig;
|
|
|
|
try
|
|
{
|
|
oam.getSystemConfig("pm", moduletypeconfig);
|
|
}
|
|
catch (...)
|
|
{
|
|
writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
|
|
throw runtime_error("Setup failed");
|
|
}
|
|
|
|
uint32_t pmCountConfig = moduletypeconfig.ModuleCount;
|
|
pmCount = 0;
|
|
int moduleID = 1;
|
|
|
|
char buff[32];
|
|
ByteStream bs, bsRead;
|
|
|
|
if (fPrgmID == DDLPROC)
|
|
{
|
|
bs << (ByteStream::byte)WE_SVR_DDL_KEEPALIVE;
|
|
bs << (ByteStream::octbyte)moduleID;
|
|
}
|
|
else if (fPrgmID == DMLPROC)
|
|
{
|
|
bs << (ByteStream::byte)WE_SVR_DML_KEEPALIVE;
|
|
bs << (ByteStream::octbyte)moduleID;
|
|
}
|
|
else if (fPrgmID == SPLITTER)
|
|
{
|
|
bs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
|
|
}
|
|
else if (fPrgmID == BATCHINSERTPROC)
|
|
{
|
|
bs << (ByteStream::byte)WE_SVR_BATCH_KEEPALIVE;
|
|
bs << (ByteStream::octbyte)moduleID;
|
|
}
|
|
|
|
for (unsigned i = 0; i < pmCountConfig; i++)
|
|
{
|
|
// Find the module id
|
|
moduleID = atoi((moduletypeconfig.ModuleNetworkList[i])
|
|
.DeviceName.substr(MAX_MODULE_TYPE_SIZE, MAX_MODULE_ID_SIZE)
|
|
.c_str());
|
|
// cout << "setting connection to moduleid " << moduleID << endl;
|
|
snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID);
|
|
string fServer(buff);
|
|
|
|
boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig()));
|
|
boost::shared_ptr<boost::mutex> nl(new boost::mutex());
|
|
|
|
// Bug 5224. Take out the retrys. If connection fails, we assume the server is down.
|
|
try
|
|
{
|
|
if (cl->connect())
|
|
{
|
|
try
|
|
{
|
|
cl->write(bs);
|
|
}
|
|
catch (std::exception& ex1)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Write to WES during connect failed due to " << ex1.what();
|
|
throw runtime_error(oss.str());
|
|
}
|
|
|
|
try
|
|
{
|
|
bsRead = cl->read();
|
|
|
|
if (bsRead.length() == 0)
|
|
throw runtime_error("Got byte 0 during reading ");
|
|
}
|
|
catch (std::exception& ex2)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Read from WES during connect failed due to " << ex2.what() << " and this = " << this;
|
|
throw runtime_error(oss.str());
|
|
}
|
|
|
|
fPmConnections[moduleID] = cl;
|
|
// cout << "connection is open. this = " << this << endl;
|
|
// cout << "set up connection to mudule " << moduleID << endl;
|
|
// assign the module name
|
|
// ipAddress = sin_addr2String(cl->serv_addr().sin_addr);
|
|
ipAddress = cl->addr2String();
|
|
cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, ipAddress));
|
|
StartClientListener(cl, i);
|
|
pmCount++;
|
|
// ostringstream oss;
|
|
// oss << "WECLIENT: connected to " << fServer + " and this = " << this << " and pmcount is now " <<
|
|
// pmCount; writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
|
|
}
|
|
else
|
|
{
|
|
throw runtime_error("Connection refused");
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR);
|
|
cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
|
|
}
|
|
catch (...)
|
|
{
|
|
writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR);
|
|
}
|
|
}
|
|
}
|
|
|
|
int WEClients::Close()
|
|
{
|
|
makeBusy(false);
|
|
closingConnection = 1;
|
|
ByteStream bs;
|
|
bs << (ByteStream::byte)WE_SVR_CLOSE_CONNECTION;
|
|
write_to_all(bs);
|
|
|
|
// cout << "connection is closed. this = " << this << " and closingConnection = " << closingConnection <<
|
|
// endl;
|
|
for (uint32_t i = 0; i < fWESReader.size(); i++)
|
|
{
|
|
fWESReader[i]->join();
|
|
}
|
|
|
|
fWESReader.clear();
|
|
fPmConnections.clear();
|
|
pmCount = 0;
|
|
// ostringstream oss;
|
|
// oss << "WECLIENT: closed connection to wes and this = " << this << " and pmcount is now " << pmCount;
|
|
// writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
|
|
return 0;
|
|
}
|
|
|
|
/** @brief Listener loop for one server connection (runs in its own thread).
 *
 * While the object is busy, blocks on client->read() and forwards every
 * non-empty message to addDataToOutput(). A zero-length read while
 * closingConnection is set is a normal shutdown and simply returns.
 *
 * Any other zero-length read, or any exception, is treated as a lost
 * connection:
 *  - every session queue is cleared and handed a zero-length ByteStream so
 *    that waiting readers eventually error out;
 *  - every entry of fPmConnections whose module name matches this client's
 *    is reset and pmCount is decremented.
 *
 * @param client    connection to read from
 * @param connIndex index forwarded to addDataToOutput()
 */
void WEClients::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
  SBS sbs;
  bool connectionLost = false;

  try
  {
    while (Busy())
    {
      // TODO: This call blocks so setting Busy() in another thread doesn't work here...
      sbs = client->read();

      if (sbs->length() != 0)
      {
        // cout << "adding data to connIndex " << endl;
        addDataToOutput(sbs, connIndex);
        continue;
      }

      // got zero bytes on read, nothing more will come
      if (closingConnection > 0)
      {
        return;
      }

      cerr << "WEC got 0 byte message for object " << this << endl;
      connectionLost = true;
      break;
    }

    if (!connectionLost)
    {
      return;
    }
  }
  catch (const std::exception& e)
  {
    cerr << "WEC Caught EXCEPTION: " << e.what() << endl;
  }
  catch (...)
  {
    cerr << "WEC Caught UNKNOWN EXCEPT" << endl;
  }

  // error condition! push 0 length bs to messagequeuemap and
  // eventually let jobstep error out.
  {
    boost::mutex::scoped_lock lk(fMlock);
    sbs.reset(new ByteStream(0U));

    for (MessageQueueMap::iterator map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end();
         ++map_tok)
    {
      map_tok->second->queue.clear();
      (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
      map_tok->second->queue.push(sbs);
    }
  }

  // reset the pmconnection map
  {
    boost::mutex::scoped_lock onErrLock(fOnErrMutex);
    const string moduleName = client->moduleName();

    for (ClientList::iterator itor = fPmConnections.begin(); itor != fPmConnections.end(); ++itor)
    {
      // Entries may already be empty: an earlier failure resets them in
      // place. Skip those instead of dereferencing a null pointer.
      if (!itor->second)
      {
        continue;
      }

      if (moduleName == itor->second->moduleName())
      {
        itor->second.reset();

        // Never let the connection count wrap below zero.
        if (pmCount > 0)
        {
          pmCount--;
        }

        // oss << "WECLIENT: connection to is reset and this = " << this << " and pmcount is decremented.";
        // writeToLog(__FILE__, __LINE__, oss.str() , LOG_TYPE_DEBUG);
      }
    }
  }
}
|
|
|
|
void WEClients::addQueue(uint32_t key)
|
|
{
|
|
bool b;
|
|
|
|
boost::mutex* lock = new boost::mutex();
|
|
condition* cond = new condition();
|
|
boost::shared_ptr<MQE> mqe(new MQE(pmCount));
|
|
|
|
mqe->queue = WESMsgQueue(lock, cond);
|
|
|
|
boost::mutex::scoped_lock lk(fMlock);
|
|
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
|
|
|
|
if (!b)
|
|
{
|
|
ostringstream os;
|
|
os << "WEClient: attempt to add a queue with a duplicate ID " << key << endl;
|
|
throw runtime_error(os.str());
|
|
}
|
|
}
|
|
|
|
void WEClients::removeQueue(uint32_t key)
|
|
{
|
|
boost::mutex::scoped_lock lk(fMlock);
|
|
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
|
|
|
|
if (map_tok == fSessionMessages.end())
|
|
return;
|
|
|
|
map_tok->second->queue.shutdown();
|
|
map_tok->second->queue.clear();
|
|
fSessionMessages.erase(map_tok);
|
|
}
|
|
|
|
void WEClients::shutdownQueue(uint32_t key)
|
|
{
|
|
boost::mutex::scoped_lock lk(fMlock);
|
|
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
|
|
|
|
if (map_tok == fSessionMessages.end())
|
|
return;
|
|
|
|
map_tok->second->queue.shutdown();
|
|
map_tok->second->queue.clear();
|
|
}
|
|
|
|
void WEClients::read(uint32_t key, SBS& bs)
|
|
{
|
|
boost::shared_ptr<MQE> mqe;
|
|
|
|
// Find the StepMsgQueueList for this session
|
|
boost::mutex::scoped_lock lk(fMlock);
|
|
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
|
|
|
|
if (map_tok == fSessionMessages.end())
|
|
{
|
|
ostringstream os;
|
|
// cout << " reading for key " << key << " not found" << endl;
|
|
os << "WEClient: attempt to read(bs) from a nonexistent queue\n";
|
|
throw runtime_error(os.str());
|
|
}
|
|
|
|
mqe = map_tok->second;
|
|
lk.unlock();
|
|
|
|
// this method can block: you can't hold any locks here...
|
|
(void)mqe->queue.pop(&bs);
|
|
|
|
if (!bs)
|
|
bs.reset(new ByteStream());
|
|
}
|
|
|
|
/** @brief Send @p msg to the server registered under @p connection.
 *
 * The connection is looked up with find() rather than operator[], so asking
 * for an unknown id no longer inserts an empty entry into fPmConnections.
 * A local copy of the shared pointer is held for the duration of the write.
 *
 * @param msg        message to send
 * @param connection module id the connection was registered under
 * @throws std::runtime_error when there is no connection at all
 *         (pmCount == 0), or when the requested connection is missing or
 *         has been reset
 */
void WEClients::write(const messageqcpp::ByteStream& msg, uint32_t connection)
{
  if (pmCount == 0)
  {
    ostringstream oss;
    oss << "WECLIENT: There is no connection to WES and this = " << this;
    writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
    throw runtime_error("There is no WriteEngineServer to send message to.");
  }

  boost::shared_ptr<MessageQueueClient> target;
  ClientList::iterator found = fPmConnections.find(connection);

  if (found != fPmConnections.end())
  {
    target = found->second;
  }

  if (!target)
  {
    ostringstream os;
    os << "Lost connection to WriteEngineServer on pm" << connection;
    throw runtime_error(os.str());
  }

  target->write(msg);
}
|
|
|
|
/** @brief Send @p msg to every connection that is still open.
 *
 * Entries of fPmConnections holding an empty pointer are skipped.
 *
 * @param msg message to broadcast
 * @throws std::runtime_error when there is no connection at all (pmCount == 0)
 */
void WEClients::write_to_all(const messageqcpp::ByteStream& msg)
{
  if (pmCount == 0)
  {
    ostringstream oss;
    oss << "WECLIENT: There is no connection to WES and this = " << this;
    writeToLog(__FILE__, __LINE__, oss.str(), LOG_TYPE_DEBUG);
    throw runtime_error("There is no WriteEngineServer to send message to.");
  }

  for (ClientList::iterator conn = fPmConnections.begin(); conn != fPmConnections.end(); conn++)
  {
    if (conn->second == NULL)
    {
      continue;
    }

    conn->second->write(msg);
  }
}
|
|
|
|
/** @brief Start a listener thread for one connection and remember it.
 *
 * The thread runs a WEClientRunner bound to this object; the new thread
 * object is appended to fWESReader.
 *
 * @param cl        connection the thread reads from
 * @param connIndex index passed through to the runner
 */
void WEClients::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
{
  WEClientRunner runner(this, cl, connIndex);
  boost::thread* listener = new boost::thread(runner);
  fWESReader.push_back(listener);
}
|
|
|
|
/** @brief Route a message received from a server to its session queue.
 *
 * Extracts the leading 64-bit unique id from @p sbs and looks up the
 * matching session queue under fMlock. Messages for an unknown id are
 * dropped. Otherwise the per-connection unacked-work counter is incremented
 * and the message is pushed onto the queue.
 *
 * @param sbs       received message; its leading unique id is consumed
 * @param connIndex index of the connection the message arrived on
 */
void WEClients::addDataToOutput(SBS sbs, uint32_t connIndex)
{
  // ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf());
  // PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1);
  // uint32_t uniqueId = p->UniqueID;
  uint64_t uniqueId = 0;
  *sbs >> uniqueId;
  boost::shared_ptr<MQE> mqe;

  {
    boost::mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);

    if (map_tok == fSessionMessages.end())
    {
      return;
    }

    mqe = map_tok->second;
  }

  // Read pmCount once: listener threads decrement it when a connection is
  // lost, so testing it and then re-reading it for the modulo could divide
  // by zero. This narrows the window; it is not full synchronization.
  const uint32_t activePmCount = pmCount;

  if (activePmCount > 0)
  {
    atomicops::atomicInc(&mqe->unackedWork[connIndex % activePmCount]);
  }

  (void)mqe->queue.push(sbs);
}
|
|
|
|
} // namespace WriteEngine
|
|
|