Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
It is possible for DistributedEngineComm to obtain the Stats object from an MQE object and for the MQE object to be freed before its Stats object is passed to InetStreamSocket. This patch makes DistributedEngineComm hold a reference to the MQE itself rather than only a raw pointer to its Stats object, ensuring that the Stats object still exists when InetStreamSocket uses it.
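The pattern behind the fix, shown as a minimal standalone sketch: hold a reference (here a std::shared_ptr, standing in for the boost::shared_ptr used in the file) to the owning MQE for as long as the raw Stats pointer derived from it is in use, so the Stats member cannot be freed underneath the socket write. The Stats, MQE, and socketWrite names below are simplified, hypothetical stand-ins, not the real ColumnStore classes.

#include <memory>
#include <iostream>

// Hypothetical stand-ins for the real MQE / Stats / InetStreamSocket types.
struct Stats { unsigned long bytesSent = 0; };
struct MQE   { Stats stats; };

// Stand-in for the socket write: it only ever sees a raw Stats*.
void socketWrite(const char* data, Stats* stats)
{
    if (stats) stats->bytesSent++;   // touches memory owned by the MQE
    std::cout << data << '\n';
}

// Risky pattern: keeping only the raw Stats* means that if the owning MQE is
// released (e.g. its map entry erased by another thread) before socketWrite
// runs, the pointer dangles.
//
// Safer pattern (what the patch describes): copy the shared_ptr<MQE> into a
// local first, which pins the MQE -- and therefore its Stats member -- until
// the write returns.
void safeWrite(const std::shared_ptr<MQE>& entry)
{
    std::shared_ptr<MQE> mqe = entry;           // local reference keeps the MQE alive
    Stats* stats = mqe ? &mqe->stats : nullptr;
    socketWrite("msg", stats);                  // Stats is still valid here
}

int main()
{
    auto mqe = std::make_shared<MQE>();
    safeWrite(mqe);
    std::cout << mqe->stats.bytesSent << '\n';  // prints 1
}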
970 lines · 28 KiB · C++
/* Copyright (C) 2014 InfiniDB, Inc.
 * Copyright (C) 2016 MariaDB Corporation.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

//
// $Id: distributedenginecomm.cpp 9655 2013-06-25 23:08:13Z xlou $
//
// C++ Implementation: distributedenginecomm
//
// Description:
//
//
// Author: <pfigg@calpont.com>, (C) 2006
//
// Copyright: See COPYING file that comes with this distribution
//
//

#include <sstream>
#include <stdexcept>
#include <cassert>
#include <ctime>
#include <algorithm>
#include <unistd.h>
using namespace std;

#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
using namespace boost;

#include "distributedenginecomm.h"

#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;

#include "configcpp.h"
using namespace config;

#include "errorids.h"
#include "exceptclasses.h"
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;

#include "liboamcpp.h"
#include "alarmmanager.h"
using namespace alarmmanager;
using namespace oam;

#include "jobstep.h"
using namespace joblist;

#include "atomicops.h"

namespace
{

void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
{
    LoggingID lid(05);
    MessageLog ml(lid);
    Message::Args args;
    Message m(0);
    args.add(file);
    args.add("@");
    args.add(line);
    args.add(msg);
    m.format(args);
    switch (logto)
    {
        case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;
        case LOG_TYPE_INFO: ml.logInfoMessage(m); break;
        case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;
        case LOG_TYPE_ERROR: ml.logWarningMessage(m); break;
        case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
    }
}

// @bug 1463. this function is added for PM failover. for dual/more nic PM,
// this function is used to get the module name
string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig,
                             string ipAddress)
{
    string modulename = "";
    DeviceNetworkList::iterator pt = moduletypeconfig.ModuleNetworkList.begin();
    for( ; pt != moduletypeconfig.ModuleNetworkList.end() ; pt++)
    {
        modulename = (*pt).DeviceName;
        HostConfigList::iterator pt1 = (*pt).hostConfigList.begin();
        for( ; pt1 != (*pt).hostConfigList.end() ; pt1++)
        {
            if (ipAddress == (*pt1).IPAddr)
                return modulename;
        }
    }
    return modulename;
}

struct EngineCommRunner
{
    EngineCommRunner(joblist::DistributedEngineComm *jl,
                     boost::shared_ptr<MessageQueueClient> cl, uint32_t connectionIndex) : jbl(jl), client(cl),
                     connIndex(connectionIndex) {}
    joblist::DistributedEngineComm *jbl;
    boost::shared_ptr<MessageQueueClient> client;
    uint32_t connIndex;
    void operator()()
    {
        //cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
        try
        {
            jbl->Listen(client, connIndex);
        }
        catch(std::exception& ex)
        {
            string what(ex.what());
            cerr << "exception caught in EngineCommRunner: " << what << endl;
            if (what.find("St9bad_alloc") != string::npos)
            {
                writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
                // abort();
            }
            else writeToLog(__FILE__, __LINE__, what);
        }
        catch(...)
        {
            string msg("exception caught in EngineCommRunner.");
            writeToLog(__FILE__, __LINE__, msg);
            cerr << msg << endl;
        }
    }
};

template <typename T>
struct QueueShutdown : public unary_function<T&, void>
{
    void operator()(T& x)
    {
        x.shutdown();
    }
};

}

/** Debug macro */
#define THROTTLE_DEBUG 0
#if THROTTLE_DEBUG
#define THROTTLEDEBUG std::cout
#else
#define THROTTLEDEBUG if (false) std::cout
#endif

namespace joblist
{
DistributedEngineComm* DistributedEngineComm::fInstance = 0;

/*static*/
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
{
    if (fInstance == 0)
        fInstance = new DistributedEngineComm(rm, isExeMgr);

    return fInstance;
}

/*static*/
void DistributedEngineComm::reset()
{
    delete fInstance;
    fInstance = 0;
}

DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) :
    fRm(rm),
    fLBIDShift(fRm->getPsLBID_Shift()),
    pmCount(0),
    fIsExeMgr(isExeMgr)
{
    Setup();
}

DistributedEngineComm::~DistributedEngineComm()
{
    Close();
    fInstance = 0;
}

void DistributedEngineComm::Setup()
{
    // This is here to ensure that this function does not get invoked multiple times simultaneously.
    mutex::scoped_lock setupLock(fSetupMutex);

    makeBusy(true);

    // This needs to be here to ensure that whenever Setup function is called, the lists are
    // empty. It's possible junk was left behind if an exception occurred.
    ClientList::iterator iter;
    for (iter = newClients.begin(); iter != newClients.end(); ++iter)
    {
        (*iter)->shutdown();
    }
    newClients.clear();
    newLocks.clear();

    throttleThreshold = fRm->getDECThrottleThreshold();
    uint32_t newPmCount = fRm->getPsCount();
    int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1);
    tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
    unsigned numConnections = newPmCount * cpp;
    oam::Oam oam;
    ModuleTypeConfig moduletypeconfig;
    try {
        oam.getSystemConfig("pm", moduletypeconfig);
    } catch (...) {
        writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
        throw runtime_error("Setup failed");
    }

    if (newPmCount == 0)
        writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs",
                   LOG_TYPE_CRITICAL);

    //This needs to make sense when compared to the extent size
    // fLBIDShift = static_cast<unsigned>(config::Config::uFromText(fConfig->getConfig(section, "LBID_Shift")));

    for (unsigned i = 0; i < numConnections; i++) {
        ostringstream oss;
        oss << "PMS" << (i+1);
        string fServer (oss.str());

        boost::shared_ptr<MessageQueueClient>
            cl(new MessageQueueClient(fServer, fRm->getConfig()));
        boost::shared_ptr<boost::mutex> nl(new boost::mutex());
        try {
            if (cl->connect()) {
                newClients.push_back(cl);
                // assign the module name
                cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, cl->addr2String()));
                newLocks.push_back(nl);
                StartClientListener(cl, i);
            } else {
                throw runtime_error("Connection refused");
            }
        } catch (std::exception& ex) {
            if (i < newPmCount)
                newPmCount--;
            writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer + ": " + ex.what(), LOG_TYPE_ERROR);
            cerr << "Could not connect to " << fServer << ": " << ex.what() << endl;
        } catch (...) {
            if (i < newPmCount)
                newPmCount--;
            writeToLog(__FILE__, __LINE__, "Could not connect to " + fServer, LOG_TYPE_ERROR);
        }
    }
    // for every entry in newClients up to newPmCount, scan for the same ip in the
    // first pmCount. If there is no match, it's a new node,
    // call the event listeners' newPMOnline() callbacks.
    mutex::scoped_lock lock(eventListenerLock);
    for (uint32_t i = 0; i < newPmCount; i++) {
        uint32_t j;
        for (j = 0; j < pmCount; j++) {
            if (newClients[i]->isSameAddr(*fPmConnections[j]))
                break;
        }
        if (j == pmCount)
            for (uint32_t k = 0; k < eventListeners.size(); k++)
                eventListeners[k]->newPMOnline(i);
    }
    lock.unlock();

    fWlock.swap(newLocks);
    fPmConnections.swap(newClients);
    // memory barrier to prevent the pmCount assignment migrating upward
    atomicops::atomicMb();
    pmCount = newPmCount;

    newLocks.clear();
    newClients.clear();
}

int DistributedEngineComm::Close()
{
    //cout << "DistributedEngineComm::Close() called" << endl;

    makeBusy(false);
    // for each MessageQueueClient in pmConnections delete the MessageQueueClient;
    fPmConnections.clear();
    fPmReader.clear();
    return 0;
}

void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
    SBS sbs;

    try {
        while (Busy())
        {
            Stats stats;
            //TODO: This call blocks so setting Busy() in another thread doesn't work here...
            sbs = client->read(0, NULL, &stats);
            if (sbs->length() != 0) {
                addDataToOutput(sbs, connIndex, &stats);
            }
            else // got zero bytes on read, nothing more will come
                goto Error;
        }
        return;
    } catch (std::exception& e)
    {
        cerr << "DEC Caught EXCEPTION: " << e.what() << endl;
        goto Error;
    }
    catch (...)
    {
        cerr << "DEC Caught UNKNOWN EXCEPT" << endl;
        goto Error;
    }
Error:
    // @bug 488 - error condition! push 0 length bs to messagequeuemap and
    // eventually let jobstep error out.
/*  mutex::scoped_lock lk(fMlock);
    //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;

    MessageQueueMap::iterator map_tok;
    sbs.reset(new ByteStream(0));

    for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
    {
        map_tok->second->queue.clear();
        (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
        map_tok->second->queue.push(sbs);
    }
    lk.unlock();

    // reset the pmconnection vector
    ClientList tempConns;

    {
        mutex::scoped_lock onErrLock(fOnErrMutex);
        string moduleName = client->moduleName();
        //cout << "moduleName=" << moduleName << endl;
        for ( uint32_t i = 0; i < fPmConnections.size(); i++)
        {
            if (moduleName != fPmConnections[i]->moduleName())
                tempConns.push_back(fPmConnections[i]);
            //else
            //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
        }

        if (tempConns.size() == fPmConnections.size()) return;

        fPmConnections.swap(tempConns);
        pmCount = (pmCount == 0 ? 0 : pmCount - 1);
        //cout << "PMCOUNT=" << pmCount << endl;
*/
    // send alarm & log it
    ALARMManager alarmMgr;
    string alarmItem = client->addr2String();
    alarmItem.append(" PrimProc");
    alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);

    ostringstream os;
    os << "DEC: lost connection to " << client->addr2String();
    writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
    // }
    return;
}

void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
{
    bool b;

    mutex* lock = new mutex();
    condition* cond = new condition();
    boost::shared_ptr<MQE> mqe(new MQE(pmCount));

    mqe->queue = StepMsgQueue(lock, cond);
    mqe->sendACKs = sendACKs;
    mqe->throttled = false;

    mutex::scoped_lock lk ( fMlock );
    b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
    if (!b) {
        ostringstream os;
        os << "DEC: attempt to add a queue with a duplicate ID " << key << endl;
        throw runtime_error(os.str());
    }
}

void DistributedEngineComm::removeQueue(uint32_t key)
{
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if (map_tok == fSessionMessages.end())
        return;

    map_tok->second->queue.shutdown();
    map_tok->second->queue.clear();
    fSessionMessages.erase(map_tok);
}

void DistributedEngineComm::shutdownQueue(uint32_t key)
{
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if (map_tok == fSessionMessages.end())
        return;
    map_tok->second->queue.shutdown();
    map_tok->second->queue.clear();
}

void DistributedEngineComm::read(uint32_t key, SBS &bs)
{
    boost::shared_ptr<MQE> mqe;

    //Find the StepMsgQueueList for this session
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if(map_tok == fSessionMessages.end())
    {
        ostringstream os;

        os << "DEC: attempt to read(bs) from a nonexistent queue\n";
        throw runtime_error(os.str());
    }

    mqe = map_tok->second;
    lk.unlock();

    //this method can block: you can't hold any locks here...
    TSQSize_t queueSize = mqe->queue.pop(&bs);

    if (bs && mqe->sendACKs) {
        mutex::scoped_lock lk(ackLock);
        if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
            setFlowControl(false, key, mqe);
        vector<SBS> v;
        v.push_back(bs);
        sendAcks(key, v, mqe, queueSize.size);
    }
    if (!bs)
        bs.reset(new ByteStream());
}

const ByteStream DistributedEngineComm::read(uint32_t key)
{
    SBS sbs;
    boost::shared_ptr<MQE> mqe;

    //Find the StepMsgQueueList for this session
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if(map_tok == fSessionMessages.end())
    {
        ostringstream os;

        os << "DEC: read(): attempt to read from a nonexistent queue\n";
        throw runtime_error(os.str());
    }

    mqe = map_tok->second;
    lk.unlock();

    TSQSize_t queueSize = mqe->queue.pop(&sbs);

    if (sbs && mqe->sendACKs) {
        mutex::scoped_lock lk(ackLock);
        if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
            setFlowControl(false, key, mqe);
        vector<SBS> v;
        v.push_back(sbs);
        sendAcks(key, v, mqe, queueSize.size);
    }
    if (!sbs)
        sbs.reset(new ByteStream());
    return *sbs;
}

void DistributedEngineComm::read_all(uint32_t key, vector<SBS> &v)
{
    boost::shared_ptr<MQE> mqe;

    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if(map_tok == fSessionMessages.end())
    {
        ostringstream os;
        os << "DEC: read_all(): attempt to read from a nonexistent queue\n";
        throw runtime_error(os.str());
    }

    mqe = map_tok->second;
    lk.unlock();

    mqe->queue.pop_all(v);

    if (mqe->sendACKs) {
        mutex::scoped_lock lk(ackLock);
        sendAcks(key, v, mqe, 0);
    }
}

void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS> &v,
    bool *flowControlOn)
{
    boost::shared_ptr<MQE> mqe;

    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if(map_tok == fSessionMessages.end())
    {
        ostringstream os;

        os << "DEC: read_some(): attempt to read from a nonexistent queue\n";
        throw runtime_error(os.str());
    }

    mqe = map_tok->second;
    lk.unlock();

    TSQSize_t queueSize = mqe->queue.pop_some(divisor, v, 1); // need to play with the min #

    if (flowControlOn)
        *flowControlOn = false;

    if (mqe->sendACKs) {
        mutex::scoped_lock lk(ackLock);
        if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= disableThreshold)
            setFlowControl(false, key, mqe);
        sendAcks(key, v, mqe, queueSize.size);
        if (flowControlOn)
            *flowControlOn = mqe->throttled;
    }
}

void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS> &msgs,
    boost::shared_ptr<MQE> mqe, size_t queueSize)
{
    ISMPacketHeader *ism;
    uint32_t l_msgCount = msgs.size();

    /* If the current queue size > target, do nothing.
     * If the original queue size > target, ACK the msgs below the target.
     */
    if (!mqe->throttled || queueSize >= mqe->targetQueueSize) {
        /* no acks will be sent, but update unackedwork to keep the #s accurate */
        uint16_t numack=0;
        uint32_t sockidx=0;
        while (l_msgCount > 0) {
            nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
            idbassert(numack <= l_msgCount);
            l_msgCount -= numack;
        }
        return;
    }

    size_t totalMsgSize = 0;
    for (uint32_t i = 0; i < msgs.size(); i++)
        totalMsgSize += msgs[i]->lengthWithHdrOverhead();

    if (queueSize + totalMsgSize > mqe->targetQueueSize) {
        /* update unackedwork for the overage that will never be acked */
        int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
        uint16_t numack=0;
        uint32_t sockidx=0;
        uint32_t msgsToIgnore;
        for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
            overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();
        if (overage < 0)
            msgsToIgnore--;
        l_msgCount = msgs.size() - msgsToIgnore;  // this num gets acked
        while (msgsToIgnore > 0) {
            nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
            idbassert(numack <= msgsToIgnore);
            msgsToIgnore -= numack;
        }
    }

    if (l_msgCount > 0) {
        ByteStream msg(sizeof(ISMPacketHeader));
        uint16_t *toAck;
        vector<bool> pmAcked(pmCount, false);

        ism = (ISMPacketHeader *) msg.getInputPtr();
        // The only var checked by ReadThread is the Command var. The others
        // are wasted space. We hijack the Size, & Flags fields for the
        // params to the ACK msg.

        ism->Interleave = uniqueID;
        ism->Command = BATCH_PRIMITIVE_ACK;
        toAck = &ism->Size;

        msg.advanceInputPtr(sizeof(ISMPacketHeader));

        while (l_msgCount > 0) {
            /* could have to send up to pmCount ACKs */
            uint32_t sockIndex=0;

            /* This will reset the ACK field in the Bytestream directly, and nothing
             * else needs to change if multiple msgs are sent. */
            nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
            idbassert(*toAck <= l_msgCount);
            l_msgCount -= *toAck;
            pmAcked[sockIndex] = true;
            writeToClient(sockIndex, msg);
        }

        // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
        // This applies to the big message case only. For small messages, the flow control is
        // disabled when the queue size is below the disableThreshold.
        if (mqe->hasBigMsgs)
        {
            uint64_t totalUnackedWork = 0;
            for (uint32_t i = 0; i < pmCount; i++)
                totalUnackedWork += mqe->unackedWork[i];

            if (totalUnackedWork == 0) {
                *toAck = 1;
                for (uint32_t i = 0; i < pmCount; i++) {
                    if (!pmAcked[i])
                        writeToClient(i, msg);
                }
            }
        }
    }
}

void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck,
    uint32_t *sockIndex, uint16_t *numToAck)
{
    uint32_t i;
    uint32_t &nextIndex = mqe->ackSocketIndex;

    /* Other threads can be touching mqe->unackedWork at the same time, but because of
     * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
     * is a safe minimum at the point of use. */

    if (mqe->unackedWork[nextIndex] >= maxAck) {
        (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
        *sockIndex = nextIndex;
        //FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
        *numToAck = maxAck;
        if (pmCount > 0)
            nextIndex = (nextIndex + 1) % pmCount;
        return;
    }
    else {
        for (i = 0; i < pmCount; i++) {
            uint32_t curVal = mqe->unackedWork[nextIndex];
            uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
            if (unackedWork > 0) {
                (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
                *sockIndex = nextIndex;
                *numToAck = unackedWork;
                if (pmCount > 0)
                    nextIndex = (nextIndex + 1) % pmCount;
                return;
            }
            if (pmCount > 0)
                nextIndex = (nextIndex + 1) % pmCount;
        }
        cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
        for (i = 0; i < pmCount; i++)
            cerr << mqe->unackedWork[i] << " ";
        cerr << " max: " << maxAck;
        cerr << endl;
        //make sure the returned vars are legitimate
        *sockIndex = nextIndex;
        *numToAck = maxAck/pmCount;
        if (pmCount > 0)
            nextIndex = (nextIndex + 1) % pmCount;
        return;
    }
}

void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
{
    mqe->throttled = enabled;
    ByteStream msg(sizeof(ISMPacketHeader));
    ISMPacketHeader *ism = (ISMPacketHeader *) msg.getInputPtr();

    ism->Interleave = uniqueID;
    ism->Command = BATCH_PRIMITIVE_ACK;
    ism->Size = (enabled ? 0 : -1);

#ifdef VALGRIND
    /* XXXPAT: For testing in valgrind, init the vars that don't get used */
    ism->Flags = 0;
    ism->Type = 0;
    ism->MsgCount = 0;
    ism->Status = 0;
#endif

    msg.advanceInputPtr(sizeof(ISMPacketHeader));

    for (uint32_t i = 0; i < mqe->pmCount; i++)
        writeToClient(i, msg);
}

void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
{
    ISMPacketHeader *ism = (ISMPacketHeader *) msg.buf();
    uint32_t dest;
    uint32_t numConn = fPmConnections.size();

    if (numConn > 0) {
        switch (ism->Command) {
            case BATCH_PRIMITIVE_CREATE:
                /* Disable flow control initially */
                msg << (uint32_t) -1;
            case BATCH_PRIMITIVE_DESTROY:
            case BATCH_PRIMITIVE_ADD_JOINER:
            case BATCH_PRIMITIVE_END_JOINER:
            case BATCH_PRIMITIVE_ABORT:
            case DICT_CREATE_EQUALITY_FILTER:
            case DICT_DESTROY_EQUALITY_FILTER:
                /* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
                   entries in the config file point to unique PMs */
                uint32_t i;
                for (i = 0; i < pmCount; i++)
                    writeToClient(i, msg, senderID);
                return;
            case BATCH_PRIMITIVE_RUN:
            case DICT_TOKEN_BY_SCAN_COMPARE:
                // for efficiency, writeToClient() grabs the interleaving factor for the caller,
                // and decides the final connection index because it already grabs the
                // caller's queue information
                dest = ism->Interleave;
                writeToClient(dest, msg, senderID, true);
                break;
            default:
                idbassert_s(0, "Unknown message type");
        }
    }
    else
    {
        writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
        throw IDBExcept(ERR_NO_PRIMPROC);
    }
}

void DistributedEngineComm::write(messageqcpp::ByteStream &msg, uint32_t connection)
{
    ISMPacketHeader *ism = (ISMPacketHeader *) msg.buf();
    PrimitiveHeader *pm = (PrimitiveHeader *) (ism + 1);
    uint32_t senderID = pm->UniqueID;

    mutex::scoped_lock lk(fMlock, defer_lock_t());
    MessageQueueMap::iterator it;
    // This keeps mqe's stats from being freed until end of function
    boost::shared_ptr<MQE> mqe;
    Stats *senderStats = NULL;

    lk.lock();
    it = fSessionMessages.find(senderID);
    if (it != fSessionMessages.end())
    {
        mqe = it->second;
        senderStats = &(mqe->stats);
    }
    lk.unlock();

    newClients[connection]->write(msg, NULL, senderStats);
}

void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
{
    boost::thread *thrd = new boost::thread(EngineCommRunner(this, cl, connIndex));
    fPmReader.push_back(thrd);
}

void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats *stats)
{
    ISMPacketHeader *hdr = (ISMPacketHeader*)(sbs->buf());
    PrimitiveHeader *p = (PrimitiveHeader *)(hdr+1);
    uint32_t uniqueId = p->UniqueID;
    boost::shared_ptr<MQE> mqe;

    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
    if(map_tok == fSessionMessages.end())
    {
        // For debugging...
        //cerr << "DistributedEngineComm::AddDataToOutput: tried to add a message to a dead session: " << uniqueId << ", size " << sbs->length() << ", step id " << p->StepID << endl;
        return;
    }
    mqe = map_tok->second;
    lk.unlock();

    if (pmCount > 0) {
        (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
    }
    TSQSize_t queueSize = mqe->queue.push(sbs);

    if (mqe->sendACKs) {
        mutex::scoped_lock lk(ackLock);
        uint64_t msgSize = sbs->lengthWithHdrOverhead();
        if (!mqe->throttled && msgSize > (targetRecvQueueSize/2))
            doHasBigMsgs(mqe, (300*1024*1024 > 3*msgSize ?
                300*1024*1024 : 3*msgSize)); //buffer at least 3 big msgs
        if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
            setFlowControl(true, uniqueId, mqe);
    }
    if (stats)
        mqe->stats.dataRecvd(stats->dataRecvd());
}

void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize)
{
    mqe->hasBigMsgs = true;
    if (mqe->targetQueueSize < targetSize)
        mqe->targetQueueSize = targetSize;
}

int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uint32_t sender, bool doInterleaving)
{
    mutex::scoped_lock lk(fMlock, defer_lock_t());
    MessageQueueMap::iterator it;
    // Keep mqe's stats from being freed early
    boost::shared_ptr<MQE> mqe;
    Stats *senderStats = NULL;
    uint32_t interleaver = 0;

    if (fPmConnections.size() == 0)
        return 0;

    if (sender != numeric_limits<uint32_t>::max()) {
        lk.lock();
        it = fSessionMessages.find(sender);
        if (it != fSessionMessages.end()) {
            mqe = it->second;
            senderStats = &(mqe->stats);
            if (doInterleaving)
                interleaver = it->second->interleaver[index % it->second->pmCount]++;
        }
        lk.unlock();
    }

    try
    {
        if (doInterleaving)
            index = (index + (interleaver * pmCount)) % fPmConnections.size();
        ClientList::value_type client = fPmConnections[index];
        if (!client->isAvailable()) return 0;

        mutex::scoped_lock lk(*(fWlock[index]));
        client->write(bs, NULL, senderStats);
        return 0;
    }
    catch(...)
    {
        // @bug 488. error out under such condition instead of re-trying other connection,
        // by pushing a 0 size bytestream to the messagequeue and throwing an exception
/*      SBS sbs;
        lk.lock();
        //cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
        MessageQueueMap::iterator map_tok;
        sbs.reset(new ByteStream(0));

        for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
        {
            map_tok->second->queue.clear();
            (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
            map_tok->second->queue.push(sbs);
        }

        lk.unlock();

        // reconfig the connection array
        ClientList tempConns;
        {
            //cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
            mutex::scoped_lock onErrLock(fOnErrMutex);
            string moduleName = fPmConnections[index]->moduleName();
            //cout << "module name = " << moduleName << endl;
            if (index >= fPmConnections.size()) return 0;

            for (uint32_t i = 0; i < fPmConnections.size(); i++)
            {
                if (moduleName != fPmConnections[i]->moduleName())
                    tempConns.push_back(fPmConnections[i]);
            }
            if (tempConns.size() == fPmConnections.size()) return 0;
            fPmConnections.swap(tempConns);
            pmCount = (pmCount == 0 ? 0 : pmCount - 1);
        }
*/
        // send alarm
        ALARMManager alarmMgr;
        string alarmItem("UNKNOWN");
        if (index < fPmConnections.size())
        {
            alarmItem = fPmConnections[index]->addr2String();
        }
        alarmItem.append(" PrimProc");
        alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
        throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
    }
}

uint32_t DistributedEngineComm::size(uint32_t key)
{
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
    if(map_tok == fSessionMessages.end())
        throw runtime_error("DEC::size() attempt to get the size of a nonexistent queue!");
    boost::shared_ptr<MQE> mqe = map_tok->second;
    //TODO: should probably check that this is a valid iter...
    lk.unlock();
    return mqe->queue.size().count;
}

void DistributedEngineComm::addDECEventListener(DECEventListener *l)
{
    mutex::scoped_lock lk(eventListenerLock);
    eventListeners.push_back(l);
}

void DistributedEngineComm::removeDECEventListener(DECEventListener *l)
{
    mutex::scoped_lock lk(eventListenerLock);
    std::vector<DECEventListener *> newListeners;
    uint32_t s = eventListeners.size();

    for (uint32_t i = 0; i < s; i++)
        if (eventListeners[i] != l)
            newListeners.push_back(eventListeners[i]);
    eventListeners.swap(newListeners);
}

Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
{
    mutex::scoped_lock lk(fMlock);
    MessageQueueMap::iterator it;
    Stats empty;

    it = fSessionMessages.find(uniqueID);
    if (it != fSessionMessages.end())
        return it->second->stats;
    return empty;
}

DistributedEngineComm::MQE::MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false),
    targetQueueSize(targetRecvQueueSize)
{
    unackedWork.reset(new volatile uint32_t[pmCount]);
    interleaver.reset(new uint32_t[pmCount]);
    memset((void *) unackedWork.get(), 0, pmCount * sizeof(uint32_t));
    memset((void *) interleaver.get(), 0, pmCount * sizeof(uint32_t));
}

}
// vim:ts=4 sw=4: