1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

the begginning

This commit is contained in:
david hill
2016-01-06 14:08:59 -06:00
parent 66a31debcb
commit f6afc42dd0
18251 changed files with 16460679 additions and 2 deletions

View File

@ -0,0 +1,962 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: distributedenginecomm.cpp 9655 2013-06-25 23:08:13Z xlou $
//
// C++ Implementation: distributedenginecomm
//
// Description:
//
//
// Author: <pfigg@calpont.com>, (C) 2006
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <sstream>
#include <stdexcept>
#include <cassert>
#include <ctime>
#include <algorithm>
#include <unistd.h>
using namespace std;
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
using namespace boost;
#include "distributedenginecomm.h"
#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;
#include "configcpp.h"
using namespace config;
#include "errorids.h"
#include "exceptclasses.h"
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;
#include "liboamcpp.h"
#include "snmpmanager.h"
using namespace snmpmanager;
using namespace oam;
#include "jobstep.h"
using namespace joblist;
#include "atomicops.h"
namespace
{
void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
{
LoggingID lid(05);
MessageLog ml(lid);
Message::Args args;
Message m(0);
args.add(file);
args.add("@");
args.add(line);
args.add(msg);
m.format(args);
switch (logto)
{
case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;
case LOG_TYPE_INFO: ml.logInfoMessage(m); break;
case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;
case LOG_TYPE_ERROR: ml.logWarningMessage(m); break;
case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
}
}
// @bug 1463. this function is added for PM failover. for dual/more nic PM,
// this function is used to get the module name
string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig,
string ipAddress)
{
string modulename = "";
DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();
for( ; pt != moduletypeconfig.ModuleNetworkList.end() ; pt++)
{
modulename = (*pt).DeviceName;
HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
for( ; pt1 != (*pt).hostConfigList.end() ; pt1++)
{
if (ipAddress == (*pt1).IPAddr)
return modulename;
}
}
return modulename;
}
struct EngineCommRunner
{
EngineCommRunner(joblist::DistributedEngineComm *jl,
boost::shared_ptr<MessageQueueClient> cl, uint32_t connectionIndex) : jbl(jl), client(cl),
connIndex(connectionIndex) {}
joblist::DistributedEngineComm *jbl;
boost::shared_ptr<MessageQueueClient> client;
uint32_t connIndex;
void operator()()
{
//cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
try
{
jbl->Listen(client, connIndex);
}
catch(std::exception& ex)
{
string what(ex.what());
cerr << "exception caught in EngineCommRunner: " << what << endl;
if (what.find("St9bad_alloc") != string::npos)
{
writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
// abort();
}
else writeToLog(__FILE__, __LINE__, what);
}
catch(...)
{
string msg("exception caught in EngineCommRunner.");
writeToLog(__FILE__, __LINE__, msg);
cerr << msg << endl;
}
}
};
template <typename T>
struct QueueShutdown : public unary_function<T&, void>
{
void operator()(T& x)
{
x.shutdown();
}
};
}
/** Debug macro */
#define THROTTLE_DEBUG 0
#if THROTTLE_DEBUG
#define THROTTLEDEBUG std::cout
#else
#define THROTTLEDEBUG if (false) std::cout
#endif
namespace joblist
{
DistributedEngineComm* DistributedEngineComm::fInstance = 0;
/*static*/
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager& rm, bool isExeMgr)
{
if (fInstance == 0)
fInstance = new DistributedEngineComm(rm, isExeMgr);
return fInstance;
}
/*static*/
void DistributedEngineComm::reset()
{
delete fInstance;
fInstance = 0;
}
DistributedEngineComm::DistributedEngineComm(ResourceManager& rm, bool isExeMgr) :
fRm(rm),
fLBIDShift(fRm.getPsLBID_Shift()),
pmCount(0),
fIsExeMgr(isExeMgr)
{
Setup();
}
DistributedEngineComm::~DistributedEngineComm()
{
Close();
fInstance = 0;
}
void DistributedEngineComm::Setup()
{
// This is here to ensure that this function does not get invoked multiple times simultaneously.
mutex::scoped_lock setupLock(fSetupMutex);
makeBusy(true);
// This needs to be here to ensure that whenever Setup function is called, the lists are
// empty. It's possible junk was left behind if exception.
ClientList::iterator iter;
for (iter = newClients.begin(); iter != newClients.end(); ++iter)
{
(*iter)->shutdown();
}
newClients.clear();
newLocks.clear();
throttleThreshold = fRm.getDECThrottleThreshold();
uint32_t newPmCount = fRm.getPsCount();
int cpp = (fIsExeMgr ? fRm.getPsConnectionsPerPrimProc() : 1);
tbpsThreadCount = fRm.getJlNumScanReceiveThreads();
unsigned numConnections = newPmCount * cpp;
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;
try {
oam.getSystemConfig("pm", moduletypeconfig);
} catch (...) {
writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
throw runtime_error("Setup failed");
}
if (newPmCount == 0)
writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs",
LOG_TYPE_CRITICAL);
//This needs to make sense when compared to the extent size
// fLBIDShift = static_cast<unsigned>(config::Config::uFromText(fConfig->getConfig(section, "LBID_Shift")));
for (unsigned i = 0; i < numConnections; i++) {
ostringstream oss;
oss << "PMS" << (i+1);
string fServer (oss.str());
boost::shared_ptr<MessageQueueClient>
cl(new MessageQueueClient(fServer, fRm.getConfig()));
boost::shared_ptr<boost::mutex> nl(new boost::mutex());
try {
if (cl->connect()) {
newClients.push_back(cl);
// assign the module name
cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, cl->addr2String()));
newLocks.push_back(nl);
StartClientListener(cl, i);
} else {
throw runtime_error("Connection refused");
}
} catch (std::exception& ex) {
if (i < newPmCount)
newPmCount--;
writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR);
cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
} catch (...) {
if (i < newPmCount)
newPmCount--;
writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR);
}
}
// for every entry in newClients up to newPmCount, scan for the same ip in the
// first pmCount. If there is no match, it's a new node,
// call the event listeners' newPMOnline() callbacks.
mutex::scoped_lock lock(eventListenerLock);
for (uint32_t i = 0; i < newPmCount; i++) {
uint32_t j;
for (j = 0; j < pmCount; j++) {
if (newClients[i]->isSameAddr(*fPmConnections[j]))
break;
}
if (j == pmCount)
for (uint32_t k = 0; k < eventListeners.size(); k++)
eventListeners[k]->newPMOnline(i);
}
lock.unlock();
fWlock.swap(newLocks);
fPmConnections.swap(newClients);
// memory barrier to prevent the pmCount assignment migrating upward
atomicops::atomicMb();
pmCount = newPmCount;
newLocks.clear();
newClients.clear();
}
int DistributedEngineComm::Close()
{
//cout << "DistributedEngineComm::Close() called" << endl;
makeBusy(false);
// for each MessageQueueClient in pmConnections delete the MessageQueueClient;
fPmConnections.clear();
fPmReader.clear();
return 0;
}
void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
SBS sbs;
try {
while (Busy())
{
Stats stats;
//TODO: This call blocks so setting Busy() in another thread doesn't work here...
sbs = client->read(0, NULL, &stats);
if (sbs->length() != 0) {
addDataToOutput(sbs, connIndex, &stats);
}
else // got zero bytes on read, nothing more will come
goto Error;
}
return;
} catch (std::exception& e)
{
cerr << "DEC Caught EXCEPTION: " << e.what() << endl;
goto Error;
}
catch (...)
{
cerr << "DEC Caught UNKNOWN EXCEPT" << endl;
goto Error;
}
Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out.
mutex::scoped_lock lk(fMlock);
//cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
// reset the pmconnection vector
ClientList tempConns;
{
mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
if (tempConns.size() == fPmConnections.size()) return;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
// send alarm & log it
SNMPManager alarmMgr;
string alarmItem = client->addr2String();
alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
}
return;
}
void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
{
bool b;
mutex* lock = new mutex();
condition* cond = new condition();
boost::shared_ptr<MQE> mqe(new MQE(pmCount));
mqe->queue = StepMsgQueue(lock, cond);
mqe->sendACKs = sendACKs;
mqe->throttled = false;
mutex::scoped_lock lk ( fMlock );
b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
if (!b) {
ostringstream os;
os << "DEC: attempt to add a queue with a duplicate ID " << key << endl;
throw runtime_error(os.str());
}
}
void DistributedEngineComm::removeQueue(uint32_t key)
{
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
return;
map_tok->second->queue.shutdown();
map_tok->second->queue.clear();
fSessionMessages.erase(map_tok);
}
void DistributedEngineComm::shutdownQueue(uint32_t key)
{
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if (map_tok == fSessionMessages.end())
return;
map_tok->second->queue.shutdown();
map_tok->second->queue.clear();
}
void DistributedEngineComm::read(uint32_t key, SBS &bs)
{
boost::shared_ptr<MQE> mqe;
//Find the StepMsgQueueList for this session
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if(map_tok == fSessionMessages.end())
{
ostringstream os;
os << "DEC: attempt to read(bs) from a nonexistent queue\n";
throw runtime_error(os.str());
}
mqe = map_tok->second;
lk.unlock();
//this method can block: you can't hold any locks here...
TSQSize_t queueSize = mqe->queue.pop(&bs);
if (bs && mqe->sendACKs) {
mutex::scoped_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
vector<SBS> v;
v.push_back(bs);
sendAcks(key, v, mqe, queueSize.size);
}
if (!bs)
bs.reset(new ByteStream());
}
const ByteStream DistributedEngineComm::read(uint32_t key)
{
SBS sbs;
boost::shared_ptr<MQE> mqe;
//Find the StepMsgQueueList for this session
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if(map_tok == fSessionMessages.end())
{
ostringstream os;
os << "DEC: read(): attempt to read from a nonexistent queue\n";
throw runtime_error(os.str());
}
mqe = map_tok->second;
lk.unlock();
TSQSize_t queueSize = mqe->queue.pop(&sbs);
if (sbs && mqe->sendACKs) {
mutex::scoped_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
vector<SBS> v;
v.push_back(sbs);
sendAcks(key, v, mqe, queueSize.size);
}
if (!sbs)
sbs.reset(new ByteStream());
return *sbs;
}
void DistributedEngineComm::read_all(uint32_t key, vector<SBS> &v)
{
boost::shared_ptr<MQE> mqe;
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if(map_tok == fSessionMessages.end())
{
ostringstream os;
os << "DEC: read_all(): attempt to read from a nonexistent queue\n";
throw runtime_error(os.str());
}
mqe = map_tok->second;
lk.unlock();
mqe->queue.pop_all(v);
if (mqe->sendACKs) {
mutex::scoped_lock lk(ackLock);
sendAcks(key, v, mqe, 0);
}
}
void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS> &v,
bool *flowControlOn)
{
boost::shared_ptr<MQE> mqe;
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if(map_tok == fSessionMessages.end())
{
ostringstream os;
os << "DEC: read_some(): attempt to read from a nonexistent queue\n";
throw runtime_error(os.str());
}
mqe = map_tok->second;
lk.unlock();
TSQSize_t queueSize = mqe->queue.pop_some(divisor, v, 1); // need to play with the min #
if (flowControlOn)
*flowControlOn = false;
if (mqe->sendACKs) {
mutex::scoped_lock lk(ackLock);
if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
setFlowControl(false, key, mqe);
sendAcks(key, v, mqe, queueSize.size);
if (flowControlOn)
*flowControlOn = mqe->throttled;
}
}
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS> &msgs,
boost::shared_ptr<MQE> mqe, size_t queueSize)
{
ISMPacketHeader *ism;
uint32_t l_msgCount = msgs.size();
/* If the current queue size > target, do nothing.
* If the original queue size > target, ACK the msgs below the target.
*/
if (!mqe->throttled || queueSize >= mqe->targetQueueSize) {
/* no acks will be sent, but update unackedwork to keep the #s accurate */
uint16_t numack=0;
uint32_t sockidx=0;
while (l_msgCount > 0) {
nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
idbassert(numack <= l_msgCount);
l_msgCount -= numack;
}
return;
}
size_t totalMsgSize = 0;
for (uint32_t i = 0; i < msgs.size(); i++)
totalMsgSize += msgs[i]->lengthWithHdrOverhead();
if (queueSize + totalMsgSize > mqe->targetQueueSize) {
/* update unackedwork for the overage that will never be acked */
int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
uint16_t numack=0;
uint32_t sockidx=0;
uint32_t msgsToIgnore;
for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();
if (overage < 0)
msgsToIgnore--;
l_msgCount = msgs.size() - msgsToIgnore; // this num gets acked
while (msgsToIgnore > 0) {
nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
idbassert(numack <= msgsToIgnore);
msgsToIgnore -= numack;
}
}
if (l_msgCount > 0) {
ByteStream msg(sizeof(ISMPacketHeader));
uint16_t *toAck;
vector<bool> pmAcked(pmCount, false);
ism = (ISMPacketHeader *) msg.getInputPtr();
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
toAck = &ism->Size;
msg.advanceInputPtr(sizeof(ISMPacketHeader));
while (l_msgCount > 0) {
/* could have to send up to pmCount ACKs */
uint32_t sockIndex=0;
/* This will reset the ACK field in the Bytestream directly, and nothing
* else needs to change if multiple msgs are sent. */
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
// This is apply to the big message case only. For small messages, the flow control is
// disabled when the queue size is below the disableThreshold.
if (mqe->hasBigMsgs)
{
uint64_t totalUnackedWork = 0;
for (uint32_t i = 0; i < pmCount; i++)
totalUnackedWork += mqe->unackedWork[i];
if (totalUnackedWork == 0) {
*toAck = 1;
for (uint32_t i = 0; i < pmCount; i++) {
if (!pmAcked[i])
writeToClient(i, msg);
}
}
}
}
}
void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck,
uint32_t *sockIndex, uint16_t *numToAck)
{
uint32_t i;
uint32_t &nextIndex = mqe->ackSocketIndex;
/* Other threads can be touching mqe->unackedWork at the same time, but because of
* the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
* is a safe minimum at the point of use. */
if (mqe->unackedWork[nextIndex] >= maxAck) {
(void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
*sockIndex = nextIndex;
//FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
*numToAck = maxAck;
if (pmCount > 0)
nextIndex = (nextIndex + 1) % pmCount;
return;
}
else {
for (i = 0; i < pmCount; i++) {
uint32_t curVal = mqe->unackedWork[nextIndex];
uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
if (unackedWork > 0) {
(void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
*sockIndex = nextIndex;
*numToAck = unackedWork;
if (pmCount > 0)
nextIndex = (nextIndex + 1) % pmCount;
return;
}
if (pmCount > 0)
nextIndex = (nextIndex + 1) % pmCount;
}
cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
for (i = 0; i < pmCount; i++)
cerr << mqe->unackedWork[i] << " ";
cerr << " max: " << maxAck;
cerr << endl;
//make sure the returned vars are legitimate
*sockIndex = nextIndex;
*numToAck = maxAck/pmCount;
if (pmCount > 0)
nextIndex = (nextIndex + 1) % pmCount;
return;
}
}
void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
{
mqe->throttled = enabled;
ByteStream msg(sizeof(ISMPacketHeader));
ISMPacketHeader *ism = (ISMPacketHeader *) msg.getInputPtr();
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
ism->Size = (enabled ? 0 : -1);
#ifdef VALGRIND
/* XXXPAT: For testing in valgrind, init the vars that don't get used */
ism->Flags = 0;
ism->Type = 0;
ism->MsgCount = 0;
ism->Status = 0;
#endif
msg.advanceInputPtr(sizeof(ISMPacketHeader));
for (uint32_t i = 0; i < mqe->pmCount; i++)
writeToClient(i, msg);
}
void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
{
ISMPacketHeader *ism = (ISMPacketHeader *) msg.buf();
uint32_t dest;
uint32_t numConn = fPmConnections.size();
if (numConn > 0) {
switch (ism->Command) {
case BATCH_PRIMITIVE_CREATE:
/* Disable flow control initially */
msg << (uint32_t) -1;
case BATCH_PRIMITIVE_DESTROY:
case BATCH_PRIMITIVE_ADD_JOINER:
case BATCH_PRIMITIVE_END_JOINER:
case BATCH_PRIMITIVE_ABORT:
case DICT_CREATE_EQUALITY_FILTER:
case DICT_DESTROY_EQUALITY_FILTER:
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
entries in the config file point to unique PMs */
uint32_t i;
for (i = 0; i < pmCount; i++)
writeToClient(i, msg, senderID);
return;
case BATCH_PRIMITIVE_RUN:
case DICT_TOKEN_BY_SCAN_COMPARE:
// for efficiency, writeToClient() grabs the interleaving factor for the caller,
// and decides the final connection index because it already grabs the
// caller's queue information
dest = ism->Interleave;
writeToClient(dest, msg, senderID, true);
break;
default:
idbassert_s(0, "Unknown message type");
}
}
else
{
writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
throw IDBExcept(ERR_NO_PRIMPROC);
}
}
void DistributedEngineComm::write(messageqcpp::ByteStream &msg, uint32_t connection)
{
ISMPacketHeader *ism = (ISMPacketHeader *) msg.buf();
PrimitiveHeader *pm = (PrimitiveHeader *) (ism + 1);
uint32_t senderID = pm->UniqueID;
mutex::scoped_lock lk(fMlock, defer_lock_t());
MessageQueueMap::iterator it;
Stats *senderStats = NULL;
lk.lock();
it = fSessionMessages.find(senderID);
if (it != fSessionMessages.end())
senderStats = &(it->second->stats);
lk.unlock();
newClients[connection]->write(msg, NULL, senderStats);
}
void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
{
boost::thread *thrd = new boost::thread(EngineCommRunner(this, cl, connIndex));
fPmReader.push_back(thrd);
}
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats *stats)
{
ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
if(map_tok == fSessionMessages.end())
{
// For debugging...
//cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " << uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
return;
}
mqe = map_tok->second;
lk.unlock();
if (pmCount > 0) {
(void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
}
TSQSize_t queueSize = mqe->queue.push(sbs);
if (mqe->sendACKs) {
mutex::scoped_lock lk(ackLock);
uint64_t msgSize = sbs->lengthWithHdrOverhead();
if (!mqe->throttled && msgSize > (targetRecvQueueSize/2))
doHasBigMsgs(mqe, (300*1024*1024 > 3*msgSize ?
300*1024*1024 : 3*msgSize)); //buffer at least 3 big msgs
if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
setFlowControl(true, uniqueId, mqe);
}
if (stats)
mqe->stats.dataRecvd(stats->dataRecvd());
}
void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize)
{
mqe->hasBigMsgs = true;
if (mqe->targetQueueSize < targetSize)
mqe->targetQueueSize = targetSize;
}
int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
{
mutex::scoped_lock lk(fMlock, defer_lock_t());
MessageQueueMap::iterator it;
Stats *senderStats = NULL;
uint32_t interleaver = 0;
if (fPmConnections.size() == 0)
return 0;
if (sender != numeric_limits<uint32_t>::max()) {
lk.lock();
it = fSessionMessages.find(sender);
if (it != fSessionMessages.end()) {
senderStats = &(it->second->stats);
if (doInterleaving)
interleaver = it->second->interleaver[index % it->second->pmCount]++;
}
lk.unlock();
}
try
{
if (doInterleaving)
index = (index + (interleaver * pmCount)) % fPmConnections.size();
ClientList::value_type client = fPmConnections[index];
if (!client->isAvailable()) return 0;
mutex::scoped_lock lk(*(fWlock[index]));
client->write(bs, NULL, senderStats);
return 0;
}
catch(...)
{
// @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw excpetion
SBS sbs;
lk.lock();
//cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
// reconfig the connection array
ClientList tempConns;
{
//cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = fPmConnections[index]->moduleName();
//cout << "module name = " << moduleName << endl;
if (index >= fPmConnections.size()) return 0;
for (uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
}
if (tempConns.size() == fPmConnections.size()) return 0;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
}
// send alarm
SNMPManager alarmMgr;
string alarmItem("UNKNOWN");
if (index < fPmConnections.size())
{
alarmItem = fPmConnections[index]->addr2String();
}
alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
}
}
uint32_t DistributedEngineComm::size(uint32_t key)
{
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
if(map_tok == fSessionMessages.end())
throw runtime_error("DEC::size() attempt to get the size of a nonexistant queue!");
boost::shared_ptr<MQE> mqe = map_tok->second;
//TODO: should probably check that this is a valid iter...
lk.unlock();
return mqe->queue.size().count;
}
void DistributedEngineComm::addDECEventListener(DECEventListener *l)
{
mutex::scoped_lock lk(eventListenerLock);
eventListeners.push_back(l);
}
void DistributedEngineComm::removeDECEventListener(DECEventListener *l)
{
mutex::scoped_lock lk(eventListenerLock);
std::vector<DECEventListener *> newListeners;
uint32_t s = eventListeners.size();
for (uint32_t i = 0; i < s; i++)
if (eventListeners[i] != l)
newListeners.push_back(eventListeners[i]);
eventListeners.swap(newListeners);
}
Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
{
mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator it;
Stats empty;
it = fSessionMessages.find(uniqueID);
if (it != fSessionMessages.end())
return it->second->stats;
return empty;
}
DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false),
targetQueueSize(targetRecvQueueSize)
{
unackedWork.reset(new volatile uint32_t[pmCount]);
interleaver.reset(new uint32_t[pmCount]);
memset((void *) unackedWork.get(), 0, pmCount * sizeof(uint32_t));
memset((void *) interleaver.get(), 0, pmCount * sizeof(uint32_t));
}
}
// vim:ts=4 sw=4: