1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
mariadb-columnstore-engine/dbcon/joblist/distributedenginecomm.cpp
drrtuy 6445f4dff3
feat(joblist,runtime): this is the first part of the execution model that produces a workload that can be predicted for a given query.
* feat(joblist,runtime): this is the first part of the execution model that produces a workload that can be predicted for a given query.
  - forces to UM join converter to use a value from a configuration
  - replaces a constant used to control a number of outstanding requests with a value depends on column width
  - modifies related Columnstore.xml values
2024-12-03 22:17:49 +00:00

1307 lines
35 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016-2022 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
//
// $Id: distributedenginecomm.cpp 9655 2013-06-25 23:08:13Z xlou $
//
// C++ Implementation: distributedenginecomm
//
// Description:
//
//
// Author: <pfigg@calpont.com>, (C) 2006
//
// Copyright: See COPYING file that comes with this distribution
//
//
#include <sstream>
#include <stdexcept>
#include <cassert>
#include <ctime>
#include <algorithm>
#include <unistd.h>
#include <chrono>
#include <thread>
#include <ifaddrs.h>
using namespace std;
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
using namespace boost;
#include "distributedenginecomm.h"
#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;
#include "configcpp.h"
using namespace config;
#include "errorids.h"
#include "exceptclasses.h"
#include "messagelog.h"
#include "messageobj.h"
#include "loggingid.h"
using namespace logging;
#include "liboamcpp.h"
using namespace oam;
#include "jobstep.h"
using namespace joblist;
#include "atomicops.h"
#include "threadnaming.h"
namespace
{
// Log `msg` to the system log, tagged with the originating file/line, at the
// requested severity (defaults to informational).
void writeToLog(const char* file, int line, const string& msg, LOG_TYPE logto = LOG_TYPE_INFO)
{
  LoggingID lid(05);
  MessageLog ml(lid);
  Message::Args args;
  Message m(0);
  args.add(file);
  args.add("@");
  args.add(line);
  args.add(msg);
  m.format(args);
  switch (logto)
  {
    case LOG_TYPE_DEBUG: ml.logDebugMessage(m); break;
    case LOG_TYPE_INFO: ml.logInfoMessage(m); break;
    case LOG_TYPE_WARNING: ml.logWarningMessage(m); break;
    // Bug fix: LOG_TYPE_ERROR was previously routed to logWarningMessage(),
    // silently downgrading error-severity entries in the system log.
    case LOG_TYPE_ERROR: ml.logErrorMessage(m); break;
    case LOG_TYPE_CRITICAL: ml.logCriticalMessage(m); break;
  }
}
// @bug 1463. this function is added for PM failover. for dual/more nic PM,
// this function is used to get the module name
// @bug 1463. this function is added for PM failover. for dual/more nic PM,
// this function is used to get the module name.
// Note: when no host matches, this returns the name of the last module
// examined (or "" for an empty list) — historical behavior, preserved here.
string getModuleNameByIPAddr(oam::ModuleTypeConfig moduletypeconfig, string ipAddress)
{
  string lastSeenName = "";
  for (auto& device : moduletypeconfig.ModuleNetworkList)
  {
    lastSeenName = device.DeviceName;
    for (auto& host : device.hostConfigList)
    {
      if (ipAddress == host.IPAddr)
        return lastSeenName;
    }
  }
  return lastSeenName;
}
struct EngineCommRunner
{
EngineCommRunner(joblist::DistributedEngineComm* jl, boost::shared_ptr<MessageQueueClient> cl,
uint32_t connectionIndex)
: jbl(jl), client(cl), connIndex(connectionIndex)
{
}
joblist::DistributedEngineComm* jbl;
boost::shared_ptr<MessageQueueClient> client;
uint32_t connIndex;
void operator()()
{
utils::setThreadName("DECRunner");
// cout << "Listening on client at 0x" << hex << (ptrdiff_t)client << dec << endl;
try
{
jbl->Listen(client, connIndex);
}
catch (std::exception& ex)
{
string what(ex.what());
cerr << "exception caught in EngineCommRunner: " << what << endl;
if (what.find("St9bad_alloc") != string::npos)
{
writeToLog(__FILE__, __LINE__, what, LOG_TYPE_CRITICAL);
// abort();
}
else
writeToLog(__FILE__, __LINE__, what);
}
catch (...)
{
string msg("exception caught in EngineCommRunner.");
writeToLog(__FILE__, __LINE__, msg);
cerr << msg << endl;
}
}
};
// Functor for for_each-style callers: invokes shutdown() on each element.
template <typename T>
struct QueueShutdown
{
  void operator()(T& queue)
  {
    queue.shutdown();
  }
};
} // namespace
/** Debug macro */
#define THROTTLE_DEBUG 0
#if THROTTLE_DEBUG
#define THROTTLEDEBUG std::cout
#else
#define THROTTLEDEBUG \
if (false) \
std::cout
#endif
namespace joblist
{
// True when `connectionId` equals the reserved default local connection id.
bool isDefaultLocalConnectionId(const uint32_t connectionId)
{
  return connectionId == defaultLocalConnectionId();
}
// ExeMgr must additionally write to the local PM unless the id in question is
// already the default local connection id.
bool needToSendToLocalPM(const bool isExeMgr, const uint32_t connectionId)
{
  if (!isExeMgr)
    return false;
  return !isDefaultLocalConnectionId(connectionId);
}
DistributedEngineComm* DistributedEngineComm::fInstance = 0;

/*static*/
// Lazily create the process-wide singleton. Not thread-safe by itself;
// callers are expected to perform the first call before concurrent use.
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
{
  if (fInstance == 0)
  {
    // The constructor already collects the local NIC addresses when isExeMgr
    // is set, so no extra getLocalNetIfacesSins() call is needed here;
    // calling it again duplicated every entry in localNetIfaceSins_.
    fInstance = new DistributedEngineComm(rm, isExeMgr);
  }
  return fInstance;
}
/*static*/
// Tear down the singleton; a later instance() call re-creates it.
void DistributedEngineComm::reset()
{
  delete fInstance;
  fInstance = nullptr;
}
// Builds the PM connection pool immediately via Setup(). When running inside
// ExeMgr, first collect the local NIC addresses so Setup() can recognize PM
// connections that terminate on this same host (see clientAtTheSameHost()).
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
 : fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
{
  if (fIsExeMgr)
  {
    getLocalNetIfacesSins();
  }
  Setup();
}
// Drops all connections/reader threads and clears the singleton pointer so a
// subsequent instance() call can build a fresh object.
DistributedEngineComm::~DistributedEngineComm()
{
  Close();
  fInstance = 0;
}
// (Re)builds the PM connection pool from the current configuration. Called at
// construction and again from Listen()/writeToClient() after a lost
// connection. Returns 0 on success, 1 when any PM could not be connected.
int32_t DistributedEngineComm::Setup()
{
  // This is here to ensure that this function does not get invoked multiple times simultaneously.
  boost::mutex::scoped_lock setupLock(fSetupMutex);
  makeBusy(true);
  // This needs to be here to ensure that whenever Setup function is called, the lists are
  // empty. It's possible junk was left behind if exception.
  ClientList::iterator iter;
  for (iter = newClients.begin(); iter != newClients.end(); ++iter)
  {
    (*iter)->shutdown();
  }
  newClients.clear();
  newLocks.clear();
  // Pull the current tuning knobs from the resource manager.
  uint32_t newPmCount = fRm->getPsCount();
  tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
  fDECConnectionsPerQuery = fRm->getDECConnectionsPerQuery();
  unsigned numConnections = getNumConnections();
  flowControlEnableBytesThresh = fRm->getDECEnableBytesThresh();
  flowControlDisableBytesThresh = fRm->getDECDisableBytesThresh();
  oam::Oam oam;
  ModuleTypeConfig moduletypeconfig;
  try
  {
    oam.getSystemConfig("pm", moduletypeconfig);
  }
  catch (...)
  {
    writeToLog(__FILE__, __LINE__, "oam.getSystemConfig error, unknown exception", LOG_TYPE_ERROR);
    throw runtime_error("Setup failed");
  }
  if (newPmCount == 0)
    writeToLog(__FILE__, __LINE__, "Got a config file with 0 PMs", LOG_TYPE_CRITICAL);
  auto* config = fRm->getConfig();
  // Resolve every PMS<i> node's address/port up front (config nodes are 1-based).
  std::vector<messageqcpp::AddrAndPortPair> pmsAddressesAndPorts;
  for (size_t i = 1; i <= newPmCount; ++i)
  {
    std::string pmConfigNodeName("PMS");
    pmConfigNodeName.append(std::to_string(i));
    // The port returned by getAddressAndPort can be 0 but this will be handled by
    // MessageQueueClient::connect
    pmsAddressesAndPorts.push_back(messageqcpp::getAddressAndPort(config, pmConfigNodeName));
  }
  // numConnections must be calculated as number of PMs * number of connections per PM.
  // This happens earlier in getNumConnections().
  for (size_t i = 0; i < numConnections; ++i)
  {
    // Connections are laid out round-robin over the PMs: PM1, PM2, ..., PMX, PM1, ...
    size_t connectionId = i % newPmCount;
    boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(
        pmsAddressesAndPorts[connectionId].first, pmsAddressesAndPorts[connectionId].second));
    if (clientAtTheSameHost(cl))
    {
      // Remember which connection loops back to this host; it is served via
      // the in-memory EM<->PP exchange queue rather than the network.
      cl->atTheSameHost(true);
      assert(connectionId <= std::numeric_limits<uint32_t>::max());
      localConnectionId_ = connectionId;
    }
    std::shared_ptr<std::mutex> nl(new std::mutex());
    try
    {
      if (cl->connect())
      {
        newClients.push_back(cl);
        // assign the module name
        cl->moduleName(getModuleNameByIPAddr(moduletypeconfig, cl->addr2String()));
        newLocks.push_back(nl);
        StartClientListener(cl, i);
      }
      else
      {
        throw runtime_error("Connection refused from PMS" + std::to_string(connectionId));
      }
    }
    catch (std::exception& ex)
    {
      // A PM is unreachable: shrink the usable PM count (never below 1),
      // log, and give up on this Setup() attempt.
      if (i < newPmCount)
        newPmCount = newPmCount > 1 ? newPmCount - 1 : 1;  // We can't afford to reduce newPmCount to 0
      writeToLog(__FILE__, __LINE__,
                 "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(),
                 LOG_TYPE_ERROR);
      if (newPmCount == 0)
        writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR);
      return 1;
    }
    catch (...)
    {
      if (i < newPmCount)
        newPmCount = newPmCount > 1 ? newPmCount - 1 : 1;  // We can't afford to reduce newPmCount to 0
      writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId),
                 LOG_TYPE_ERROR);
      if (newPmCount == 0)
        writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR);
      return 1;
    }
  }
  // for every entry in newClients up to newPmCount, scan for the same ip in the
  // first pmCount. If there is no match, it's a new node,
  // call the event listeners' newPMOnline() callbacks.
  boost::mutex::scoped_lock lock(eventListenerLock);
  for (uint32_t i = 0; i < newPmCount; i++)
  {
    uint32_t j;
    for (j = 0; j < pmCount; j++)
    {
      if (!fPmConnections.empty() && j < fPmConnections.size() &&
          newClients[i]->isSameAddr(*fPmConnections[j]))
      {
        break;
      }
    }
    if (j == pmCount)
      for (uint32_t k = 0; k < eventListeners.size(); k++)
        eventListeners[k]->newPMOnline(i);
  }
  lock.unlock();
  // Publish the freshly-built connection and lock lists.
  fWlock.swap(newLocks);
  fPmConnections.swap(newClients);
  // memory barrier to prevent the pmCount assignment migrating upward
  atomicops::atomicMb();
  pmCount = newPmCount;
  newLocks.clear();
  newClients.clear();
  return 0;
}
// Stop the listener loops, then release every PM connection and the reader
// thread handles. Always returns 0.
int DistributedEngineComm::Close()
{
  makeBusy(false);
  fPmConnections.clear();
  fPmReader.clear();
  return 0;
}
// Reader loop for one PM connection. Runs in its own thread (see
// EngineCommRunner) and routes every received ByteStream to the right session
// queue via addDataToOutput(). On EOF or error it notifies all sessions and,
// for ExeMgr, tries to re-establish the connection pool.
void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client, uint32_t connIndex)
{
  SBS sbs;
  try
  {
    while (Busy())
    {
      Stats stats;
      // TODO: This call blocks so setting Busy() in another thread doesn't work here...
      sbs = client->read(0, NULL, &stats);
      if (sbs->length() != 0)
      {
        addDataToOutput(sbs, connIndex, &stats);
      }
      else  // got zero bytes on read, nothing more will come
        goto Error;
    }
    return;
  }
  catch (std::exception& e)
  {
    cerr << "DEC Caught EXCEPTION: " << e.what() << endl;
    goto Error;
  }
  catch (...)
  {
    cerr << "DEC Caught UNKNOWN EXCEPT" << endl;
    goto Error;
  }
Error:
  // @bug 488 - error condition! push 0 length bs to messagequeuemap and
  // eventually let jobstep error out.
  std::unique_lock lk(fMlock);
  MessageQueueMap::iterator map_tok;
  sbs.reset(new ByteStream(0));
  // Wake every session with an empty ByteStream so waiting job steps notice
  // the broken connection.
  for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
  {
    map_tok->second->queue.clear();
    (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
    map_tok->second->queue.push(sbs);
  }
  lk.unlock();
  if (fIsExeMgr)
  {
    decltype(pmCount) originalPMCount = pmCount;
    // Re-establish if a remote PM restarted.
    std::this_thread::sleep_for(std::chrono::seconds(3));
    auto rc = Setup();
    if (rc || originalPMCount != pmCount)
    {
      ostringstream os;
      os << "DEC: lost connection to " << client->addr2String();
      writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
    }
  }
  return;
}
// Registers a per-query message queue keyed by the query's unique id.
// Throws runtime_error if a queue with the same id already exists.
void DistributedEngineComm::addQueue(uint32_t key, bool sendACKs)
{
  bool b;
  // NOTE(review): lock/cond are raw-new'ed and handed to StepMsgQueue —
  // presumably the queue takes ownership; confirm against StepMsgQueue.
  boost::mutex* lock = new boost::mutex();
  condition* cond = new condition();
  // Spread each query's starting connection across the connection pool so
  // concurrent queries don't all begin on the same socket.
  // NOTE(review): divides by pmCount and fPmConnections.size() — assumes both
  // are non-zero here (Setup() keeps pmCount >= 1 after a successful run).
  uint32_t firstPMInterleavedConnectionId =
      key % (fPmConnections.size() / pmCount) * fDECConnectionsPerQuery * pmCount % fPmConnections.size();
  boost::shared_ptr<MQE> mqe(new MQE(pmCount, firstPMInterleavedConnectionId, flowControlEnableBytesThresh));
  mqe->queue = StepMsgQueue(lock, cond);
  mqe->sendACKs = sendACKs;
  mqe->throttled = false;
  std::lock_guard lk(fMlock);
  b = fSessionMessages.insert(pair<uint32_t, boost::shared_ptr<MQE> >(key, mqe)).second;
  if (!b)
  {
    ostringstream os;
    os << "DEC: attempt to add a queue with a duplicate ID " << key << endl;
    throw runtime_error(os.str());
  }
}
// Shuts down, clears, and unregisters the session queue for `key`.
// A missing key is silently ignored.
void DistributedEngineComm::removeQueue(uint32_t key)
{
  std::lock_guard lk(fMlock);
  auto entry = fSessionMessages.find(key);
  if (entry == fSessionMessages.end())
    return;
  entry->second->queue.shutdown();
  entry->second->queue.clear();
  fSessionMessages.erase(entry);
}
// Like removeQueue(), but keeps the map entry: the queue is only shut down
// and drained so consumers wake up. A missing key is silently ignored.
void DistributedEngineComm::shutdownQueue(uint32_t key)
{
  std::lock_guard lk(fMlock);
  auto entry = fSessionMessages.find(key);
  if (entry == fSessionMessages.end())
    return;
  entry->second->queue.shutdown();
  entry->second->queue.clear();
}
// Pops one message for session `key`, blocking until one is available. When
// the queue is shut down the popped SBS is null and is replaced by an empty
// ByteStream. Sends flow-control ACKs when the session uses them.
void DistributedEngineComm::read(uint32_t key, SBS& bs)
{
  boost::shared_ptr<MQE> mqe;
  // Find the StepMsgQueueList for this session
  std::unique_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
  if (map_tok == fSessionMessages.end())
  {
    ostringstream os;
    os << "DEC: attempt to read(bs) from a nonexistent queue\n";
    throw runtime_error(os.str());
  }
  mqe = map_tok->second;
  lk.unlock();
  // this method can block: you can't hold any locks here...
  TSQSize_t queueSize = mqe->queue.pop(&bs);
  if (bs && mqe->sendACKs)
  {
    std::unique_lock lk(ackLock);
    // Leave flow-control mode once the queue drained below the disable threshold.
    if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
      setFlowControl(false, key, mqe);
    vector<SBS> v;
    v.push_back(bs);
    sendAcks(key, v, mqe, queueSize.size);
  }
  if (!bs)
    bs.reset(new ByteStream());
}
// Copying variant of read(key, bs): pops one message for session `key` and
// returns it by value (an empty ByteStream on queue shutdown).
const ByteStream DistributedEngineComm::read(uint32_t key)
{
  SBS sbs;
  boost::shared_ptr<MQE> mqe;
  // Find the StepMsgQueueList for this session
  std::unique_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
  if (map_tok == fSessionMessages.end())
  {
    ostringstream os;
    os << "DEC: read(): attempt to read from a nonexistent queue\n";
    throw runtime_error(os.str());
  }
  mqe = map_tok->second;
  lk.unlock();
  // Blocking pop — no locks may be held across it.
  TSQSize_t queueSize = mqe->queue.pop(&sbs);
  if (sbs && mqe->sendACKs)
  {
    std::unique_lock lk(ackLock);
    // Leave flow-control mode once the queue drained below the disable threshold.
    if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
      setFlowControl(false, key, mqe);
    vector<SBS> v;
    v.push_back(sbs);
    sendAcks(key, v, mqe, queueSize.size);
  }
  if (!sbs)
    sbs.reset(new ByteStream());
  return *sbs;
}
// Builds a bare ISM-header message carrying `command`; Interleave doubles as
// the query's unique id for these control messages.
SBS DistributedEngineComm::createBatchPrimitiveCommand(ISMPACKETCOMMAND command, uint32_t uniqueID,
                                                       uint16_t size)
{
  SBS cmdBs(new ByteStream(sizeof(ISMPacketHeader)));
  ISMPacketHeader* hdr = (ISMPacketHeader*)cmdBs->getInputPtr();
  hdr->Interleave = uniqueID;
  hdr->Command = command;
  hdr->Size = size;
  cmdBs->advanceInputPtr(sizeof(ISMPacketHeader));
  return cmdBs;
}
// Drains every queued message for session `key` into `v`, acking them all
// when the session uses flow control. Throws if the queue does not exist.
void DistributedEngineComm::read_all(uint32_t key, vector<SBS>& v)
{
  boost::shared_ptr<MQE> mqe;
  {
    std::unique_lock lk(fMlock);
    auto entry = fSessionMessages.find(key);
    if (entry == fSessionMessages.end())
    {
      ostringstream os;
      os << "DEC: read_all(): attempt to read from a nonexistent queue\n";
      throw runtime_error(os.str());
    }
    mqe = entry->second;
  }
  mqe->queue.pop_all(v);
  if (mqe->sendACKs)
  {
    std::unique_lock lk(ackLock);
    sendAcks(key, v, mqe, 0);
  }
}
// Pops a batch of messages for session `key` (batch size chosen by the
// queue's pop_some using `divisor`, minimum 1) and reports via *flowControlOn
// whether throttling is still active after acking.
void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS>& v, bool* flowControlOn)
{
  boost::shared_ptr<MQE> mqe;
  std::unique_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(key);
  if (map_tok == fSessionMessages.end())
  {
    ostringstream os;
    os << "DEC: read_some(): attempt to read from a nonexistent queue\n";
    throw runtime_error(os.str());
  }
  mqe = map_tok->second;
  lk.unlock();
  TSQSize_t queueSize = mqe->queue.pop_some(divisor, v, 1);  // need to play with the min #
  if (flowControlOn)
    *flowControlOn = false;
  if (mqe->sendACKs)
  {
    std::unique_lock lk(ackLock);
    // Leave flow-control mode once the queue drained below the disable threshold.
    if (mqe->throttled && !mqe->hasBigMsgs && queueSize.size <= flowControlDisableBytesThresh)
      setFlowControl(false, key, mqe);
    sendAcks(key, v, mqe, queueSize.size);
    if (flowControlOn)
      *flowControlOn = mqe->throttled;
  }
}
// Acknowledges consumed messages back to the PMs so they may send more work.
// Caller holds ackLock. `queueSize` is the session queue size at pop time.
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs, boost::shared_ptr<MQE> mqe,
                                     size_t queueSize)
{
  ISMPacketHeader* ism;
  uint32_t l_msgCount = msgs.size();
  /* If the current queue size > target, do nothing.
   * If the original queue size > target, ACK the msgs below the target.
   */
  if (!mqe->throttled || queueSize >= mqe->targetQueueSize)
  {
    /* no acks will be sent, but update unackedwork to keep the #s accurate */
    uint16_t numack = 0;
    uint32_t sockidx = 0;
    while (l_msgCount > 0)
    {
      nextPMToACK(mqe, l_msgCount, &sockidx, &numack);
      idbassert(numack <= l_msgCount);
      l_msgCount -= numack;
    }
    return;
  }
  size_t totalMsgSize = 0;
  for (uint32_t i = 0; i < msgs.size(); i++)
    totalMsgSize += msgs[i]->lengthWithHdrOverhead();
  if (queueSize + totalMsgSize > mqe->targetQueueSize)
  {
    /* update unackedwork for the overage that will never be acked */
    int64_t overage = queueSize + totalMsgSize - mqe->targetQueueSize;
    uint16_t numack = 0;
    uint32_t sockidx = 0;
    uint32_t msgsToIgnore;
    // Walk msgs until the overage is consumed; those leading messages are
    // counted against unackedWork but never acked.
    for (msgsToIgnore = 0; overage >= 0; msgsToIgnore++)
      overage -= msgs[msgsToIgnore]->lengthWithHdrOverhead();
    if (overage < 0)
      msgsToIgnore--;
    l_msgCount = msgs.size() - msgsToIgnore;  // this num gets acked
    while (msgsToIgnore > 0)
    {
      nextPMToACK(mqe, msgsToIgnore, &sockidx, &numack);
      idbassert(numack <= msgsToIgnore);
      msgsToIgnore -= numack;
    }
  }
  if (l_msgCount > 0)
  {
    SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
    uint16_t* toAck;
    vector<bool> pmAcked(pmCount, false);
    ism = (ISMPacketHeader*)msg->getInputPtr();
    // The only var checked by ReadThread is the Command var. The others
    // are wasted space. We hijack the Size, & Flags fields for the
    // params to the ACK msg.
    ism->Interleave = uniqueID;
    ism->Command = BATCH_PRIMITIVE_ACK;
    toAck = &ism->Size;
    msg->advanceInputPtr(sizeof(ISMPacketHeader));
    // There must be only one local connection here.
    bool sendToLocal = false;
    while (l_msgCount > 0)
    {
      /* could have to send up to pmCount ACKs */
      uint32_t sockIndex = 0;
      /* This will reset the ACK field in the Bytestream directly, and nothing
       * else needs to change if multiple msgs are sent. */
      nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
      idbassert(*toAck <= l_msgCount);
      l_msgCount -= *toAck;
      // Defer the local PM: writeToClient() clears the SBS when the target is
      // on the same host, so the local write has to happen last.
      if (sockIndex == localConnectionId_ && needToSendToLocalPM(fIsExeMgr, sockIndex))
      {
        sendToLocal = true;
        continue;
      }
      pmAcked[sockIndex] = true;
      writeToClient(sockIndex, msg);
    }
    if (sendToLocal)
    {
      pmAcked[localConnectionId_] = true;
      writeToClient(localConnectionId_, msg);
    }
    // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
    // This is apply to the big message case only. For small messages, the flow control is
    // disabled when the queue size is below the flowControlDisableBytesThresh.
    if (mqe->hasBigMsgs)
    {
      uint64_t totalUnackedWork = 0;
      for (uint32_t i = 0; i < pmCount; ++i)
        totalUnackedWork += mqe->unackedWork[i];
      if (totalUnackedWork == 0)
      {
        *toAck = 1;
        for (uint32_t i = 0; i < pmCount; ++i)
        {
          if (!pmAcked[i])
          {
            if (i == localConnectionId_ && fIsExeMgr)
            {
              continue;
            }
            // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
            SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
            writeToClient(i, ackCommand);
          }
        }
        if (needToSendToLocalPM(fIsExeMgr, localConnectionId_) && !pmAcked[localConnectionId_])
        {
          writeToClient(localConnectionId_, msg);
        }
      }
    }
  }
}
// Round-robin scheduler: picks the next PM with unacked work and reports via
// *sockIndex/*numToAck how many of its messages (up to maxAck) to acknowledge.
// Decrements the chosen PM's unackedWork counter atomically.
void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t maxAck, uint32_t* sockIndex,
                                        uint16_t* numToAck)
{
  uint32_t& nextIndex = mqe->ackSocketIndex;
  /* Other threads can be touching mqe->unackedWork at the same time, but because of
   * the locking env, mqe->unackedWork can only grow; whatever gets latched in this fcn
   * is a safe minimum at the point of use. */
  if (mqe->unackedWork[nextIndex] >= maxAck)
  {
    (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], maxAck);
    *sockIndex = nextIndex;
    // FIXME: we're going to truncate here from 32 to 16 bits. Hopefully this will always fit...
    *numToAck = maxAck;
    if (pmCount > 0)
      nextIndex = (nextIndex + 1) % pmCount;
    return;
  }
  else
  {
    // Scan every PM, starting at the cursor, for any with unacked work.
    for (int i = pmCount - 1; i >= 0; --i)
    {
      uint32_t curVal = mqe->unackedWork[nextIndex];
      uint32_t unackedWork = (curVal > maxAck ? maxAck : curVal);
      if (unackedWork > 0)
      {
        (void)atomicops::atomicSub(&mqe->unackedWork[nextIndex], unackedWork);
        *sockIndex = nextIndex;
        *numToAck = unackedWork;
        if (pmCount > 0)
          nextIndex = (nextIndex + 1) % pmCount;
        return;
      }
      if (pmCount > 0)
        nextIndex = (nextIndex + 1) % pmCount;
    }
    // No PM had unacked work: log the inconsistency and fall back to a split.
    cerr << "DEC::nextPMToACK(): Couldn't find a PM to ACK! ";
    for (int i = pmCount - 1; i >= 0; --i)
      cerr << mqe->unackedWork[i] << " ";
    cerr << " max: " << maxAck;
    cerr << endl;
    // make sure the returned vars are legitimate
    // NOTE(review): this divides by pmCount without the `pmCount > 0` guard
    // used elsewhere in this function — confirm pmCount cannot be 0 here.
    *sockIndex = nextIndex;
    *numToAck = maxAck / pmCount;
    if (pmCount > 0)
      nextIndex = (nextIndex + 1) % pmCount;
    return;
  }
}
void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
{
mqe->throttled = enabled;
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
ISMPacketHeader* ism = (ISMPacketHeader*)msg->getInputPtr();
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
ism->Size = (enabled ? 0 : -1);
msg->advanceInputPtr(sizeof(ISMPacketHeader));
for (uint32_t i = 0; i < mqe->pmCount; ++i)
{
if (i == localConnectionId_ && fIsExeMgr)
{
continue;
}
writeToClient(i, msg);
}
if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
{
writeToClient(localConnectionId_, msg);
}
}
// Dispatches a job-step message: control commands are broadcast to every PM,
// while work messages (RUN / token scan) are routed to a single interleaved
// connection. Throws IDBExcept(ERR_NO_PRIMPROC) when no connections exist.
int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
{
  ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf();
  uint32_t dest;
  uint32_t numConn = fPmConnections.size();
  if (numConn > 0)
  {
    switch (ism->Command)
    {
      case BATCH_PRIMITIVE_CREATE:
        /* Disable flow control initially */
        *msg << (uint32_t)-1;
        /* FALLTHRU */
      case BATCH_PRIMITIVE_DESTROY:
      case BATCH_PRIMITIVE_ADD_JOINER:
      case BATCH_PRIMITIVE_END_JOINER:
      case BATCH_PRIMITIVE_ABORT:
      case DICT_CREATE_EQUALITY_FILTER:
      case DICT_DESTROY_EQUALITY_FILTER:
        /* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
           entries in the config file point to unique PMs */
        {
          int32_t rc = 0;
          // Broadcast: remote PMs first, local PM last (writeToClient clears
          // the SBS when sending to the same host).
          for (uint32_t i = 0; i < pmCount; ++i)
          {
            if (i == localConnectionId_ && fIsExeMgr)
            {
              continue;
            }
            if ((rc = writeToClient(i, msg, senderID)))
            {
              return rc;
            }
          }
          if (needToSendToLocalPM(fIsExeMgr, localConnectionId_))
          {
            return writeToClient(localConnectionId_, msg);
          }
          return rc;
        }
      case BATCH_PRIMITIVE_RUN:
      case DICT_TOKEN_BY_SCAN_COMPARE:
      {
        // for efficiency, writeToClient() grabs the interleaving factor for the caller,
        // and decides the final connection index because it already grabs the
        // caller's queue information
        dest = ism->Interleave;
        return writeToClient(dest, msg, senderID, true);
      }
      default: idbassert_s(0, "Unknown message type");
    }
  }
  else
  {
    writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG);
    throw IDBExcept(ERR_NO_PRIMPROC);
  }
  return 0;
}
// Writes `msg` directly on the given connection index, attributing the bytes
// to the sending session's stats when the session is registered.
void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection)
{
  ISMPacketHeader* ism = (ISMPacketHeader*)msg.buf();
  PrimitiveHeader* pm = (PrimitiveHeader*)(ism + 1);
  uint32_t senderID = pm->UniqueID;
  MessageQueueMap::iterator it;
  // This keeps mqe's stats from being freed until end of function
  boost::shared_ptr<MQE> mqe;
  Stats* senderStats = NULL;
  std::unique_lock lk(fMlock);
  it = fSessionMessages.find(senderID);
  if (it != fSessionMessages.end())
  {
    mqe = it->second;
    senderStats = &(mqe->stats);
  }
  lk.unlock();
  // Bug fix: this used newClients[connection], but Setup() swaps the freshly
  // connected clients into fPmConnections and then clears newClients, so
  // indexing newClients here dereferenced an empty vector. Use the active
  // connection list instead.
  fPmConnections[connection]->write(msg, NULL, senderStats);
}
// Spawns a reader thread (EngineCommRunner -> Listen()) for the given client.
// NOTE(review): the boost::thread object is only tracked in fPmReader and is
// never joined or deleted in the visible code — confirm cleanup elsewhere.
void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueClient> cl, uint32_t connIndex)
{
  boost::thread* thrd = new boost::thread(EngineCommRunner(this, cl, connIndex));
  fPmReader.push_back(thrd);
}
// Local-PM delivery path: messages read from the in-memory EM<->PP exchange
// queue are attributed to the local connection id (no network Stats).
void DistributedEngineComm::addDataToOutput(SBS sbs)
{
  assert(localConnectionId_ < pmCount);
  return addDataToOutput(sbs, localConnectionId_, nullptr);
}
// Routes one received ByteStream to the session queue identified by the
// PrimitiveHeader's UniqueID, updating unacked-work counters and
// flow-control state on the way.
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
  ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
  PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
  uint32_t uniqueId = p->UniqueID;
  std::unique_lock lk(fMlock);
  MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
  // The message for a session that doesn't exist.
  if (map_tok == fSessionMessages.end())
  {
    // Here gets the dead session ByteStream that is already removed
    // from DEC queue.
    return;
  }
  auto mqe = map_tok->second;
  lk.unlock();
  if (pmCount > 0)
  {
    // Count this message as unacked work for the PM that sent it.
    (void)atomicops::atomicInc(&mqe->unackedWork[connIndex % pmCount]);
  }
  TSQSize_t queueSize = mqe->queue.push(sbs);
  if (mqe->sendACKs)
  {
    std::lock_guard lk(ackLock);
    uint64_t msgSize = sbs->lengthWithHdrOverhead();
    // A single message larger than half the enable threshold marks this as a
    // "big message" session and grows its target queue size accordingly.
    if (!mqe->throttled && msgSize > (flowControlEnableBytesThresh / 2))
      doHasBigMsgs(
          mqe, (bigMessageSize > 3 * msgSize ? bigMessageSize : 3 * msgSize));  // buffer at least 3 big msgs
    if (!mqe->throttled && queueSize.size >= mqe->targetQueueSize)
      setFlowControl(true, uniqueId, mqe);
  }
  if (stats)
    mqe->stats.dataRecvd(stats->dataRecvd());
}
// Latches the big-message flag and grows (never shrinks) the session's
// target queue size.
void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize)
{
  mqe->hasBigMsgs = true;
  mqe->targetQueueSize = std::max<uint64_t>(mqe->targetQueueSize, targetSize);
}
// Blocks until at least one message sits in the local EM<->PP exchange queue,
// then drains the whole queue into `receivedMessages` inside one critical
// section to keep lock traffic low. Returns its argument for chaining.
DistributedEngineComm::SBSVector& DistributedEngineComm::readLocalQueueMessagesOrWait(
    SBSVector& receivedMessages)
{
  std::unique_lock<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
  // Predicate form of wait() — equivalent to the classic wait-in-a-loop.
  inMemoryEM2PPExchCV_.wait(exchangeLock, [this] { return !inMemoryEM2PPExchQueue_.empty(); });
  while (!inMemoryEM2PPExchQueue_.empty())
  {
    receivedMessages.push_back(inMemoryEM2PPExchQueue_.front());
    inMemoryEM2PPExchQueue_.pop();
  }
  return receivedMessages;
}
// Enqueues `bs` on the local EM<->PP exchange queue and wakes one reader.
void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::SBS& bs)
{
  {
    std::lock_guard<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
    inMemoryEM2PPExchQueue_.push(bs);
  }
  // Notify after releasing the lock so the woken reader can acquire it at once.
  inMemoryEM2PPExchCV_.notify_one();
}
// This routine has an unexpected side-effect on its argument's SBS, namely
// SBS is cleared when it is sent to PP at the same host. This fact forces any
// code uses ::writeToClient to send to a local node in the last turn.
// Returns 0 on success (or silently-skipped write), nonzero after a broken
// connection was re-established. Throws/aborts if reconnection fails.
int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
                                         bool doInterleaving)
{
  // NOTE(review): `doInterleaving` is never referenced in this body —
  // interleaving happens whenever the sender session is found; confirm
  // whether the parameter is vestigial.
  MessageQueueMap::iterator it;
  // Keep mqe's stats from being freed early
  boost::shared_ptr<MQE> mqe;
  Stats* senderStats = NULL;
  if (fPmConnections.size() == 0)
    return 0;
  uint32_t connectionId = aPMIndex;
  assert(connectionId < fPmConnections.size());
  // EM-PP exchange via the queue.
  if (fPmConnections[connectionId]->atTheSameHost() && fIsExeMgr)
  {
    pushToTheLocalQueueAndNotifyRecv(bs);
    return 0;
  }
  // senderUniqueID == max() means "no session": skip stats and interleaving.
  if (senderUniqueID != numeric_limits<uint32_t>::max())
  {
    std::lock_guard lk(fMlock);
    it = fSessionMessages.find(senderUniqueID);
    if (it != fSessionMessages.end())
    {
      mqe = it->second;
      senderStats = &(mqe->stats);
      // Spread this query's traffic over its share of the connections.
      size_t pmIndex = aPMIndex % mqe->pmCount;
      connectionId = it->second->getNextConnectionId(pmIndex, fPmConnections.size(), fDECConnectionsPerQuery);
    }
  }
  ClientList::value_type client = fPmConnections[connectionId];
  try
  {
    if (!client->isAvailable())
      return 0;
    std::lock_guard lk(*(fWlock[connectionId]));
    client->write(bs, NULL, senderStats);
  }
  catch (...)
  {
    // @bug 488. error out under such condition instead of re-trying other connection,
    // by pushing 0 size bytestream to messagequeue and throw exception
    SBS sbs;
    std::unique_lock lk(fMlock);
    // std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
    MessageQueueMap::iterator map_tok;
    sbs.reset(new ByteStream(0));
    // Wake every session with an empty ByteStream so waiting steps error out.
    for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
    {
      map_tok->second->queue.clear();
      (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
      map_tok->second->queue.push(sbs);
    }
    int tries = 0;
    // Try to setup connection with PS, it could be a situation that PS is starting.
    // MCOL-5263.
    while (tries < 10 && Setup())
    {
      ++tries;
      std::this_thread::sleep_for(std::chrono::seconds(3));
    }
    lk.unlock();
    if (tries == 10)
    {
      ostringstream os;
      os << "DEC: lost connection to " << client->addr2String();
      writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
      if (!fIsExeMgr)
        abort();
      else
        throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
    }
    // Connection was established.
    return 1;
    /*
    // reconfig the connection array
    ClientList tempConns;
    {
    //cout << "WARNING: DEC WRITE BROKEN PIPE " <<
    fPmConnections[index]->otherEnd()<< endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string
    moduleName = fPmConnections[index]->moduleName();
    //cout << "module name = " << moduleName << endl;
    if (index >= fPmConnections.size()) return 0;
    for (uint32_t i = 0; i < fPmConnections.size(); i++)
    {
    if (moduleName != fPmConnections[i]->moduleName())
    tempConns.push_back(fPmConnections[i]);
    }
    if (tempConns.size() == fPmConnections.size()) return 0;
    fPmConnections.swap(tempConns);
    pmCount = (pmCount == 0 ? 0 : pmCount - 1);
    }
    // send alarm
    ALARMManager alarmMgr;
    string alarmItem("UNKNOWN");
    if (index < fPmConnections.size())
    {
    alarmItem = fPmConnections[index]->addr2String();
    }
    alarmItem.append(" PrimProc");
    alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
    */
  }
  return 0;
}
// Returns the number of messages currently queued for session `key`;
// throws when the queue does not exist.
uint32_t DistributedEngineComm::size(uint32_t key)
{
  boost::shared_ptr<MQE> mqe;
  {
    std::unique_lock lk(fMlock);
    auto entry = fSessionMessages.find(key);
    if (entry == fSessionMessages.end())
      throw runtime_error("DEC::size() attempt to get the size of a nonexistant queue!");
    mqe = entry->second;
  }
  return mqe->queue.size().count;
}
// Registers a listener; Setup() invokes newPMOnline() on all listeners when a
// brand-new PM appears in the configuration.
void DistributedEngineComm::addDECEventListener(DECEventListener* l)
{
  boost::mutex::scoped_lock lk(eventListenerLock);
  eventListeners.push_back(l);
}
// Drops every registration of `l`. Uses the erase-remove idiom in place of
// the old copy-into-a-new-vector-and-swap loop; same result, no temporary
// vector allocation.
void DistributedEngineComm::removeDECEventListener(DECEventListener* l)
{
  boost::mutex::scoped_lock lk(eventListenerLock);
  eventListeners.erase(std::remove(eventListeners.begin(), eventListeners.end(), l),
                       eventListeners.end());
}
// Returns a copy of the session's network stats, or default-constructed
// Stats when the session is unknown.
Stats DistributedEngineComm::getNetworkStats(uint32_t uniqueID)
{
  std::lock_guard lk(fMlock);
  auto entry = fSessionMessages.find(uniqueID);
  if (entry == fSessionMessages.end())
    return Stats();
  return entry->second->stats;
}
// Per-query message-queue entry. Seeds the per-PM interleaving cursors with
// consecutive connection ids starting at the query's initial interleaved id,
// and zeroes the per-PM unacked-work counters.
DistributedEngineComm::MQE::MQE(const uint32_t pCount, const uint32_t initialInterleaverValue,
                                const uint64_t flowControlEnableBytesThresh)
 : ackSocketIndex(0), pmCount(pCount), hasBigMsgs(false), targetQueueSize(flowControlEnableBytesThresh)
{
  // Per-PM counters of not-yet-acknowledged messages; updated with atomic ops.
  unackedWork.reset(new volatile uint32_t[pmCount]);
  // Per-PM next-connection cursor consumed by getNextConnectionId().
  interleaver.reset(new uint32_t[pmCount]);
  memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
  uint32_t interleaverValue = initialInterleaverValue;
  initialConnectionId = initialInterleaverValue;
  for (size_t pmId = 0; pmId < pmCount; ++pmId)
  {
    interleaver.get()[pmId] = interleaverValue++;
  }
}
// Here is the assumed connections distribution schema in a pmConnections vector.
// (PM1-PM2...-PMX)-(PM1-PM2..-PMX)...
// Advances this PM's cursor one "stripe" (pmCount) forward to reach the same
// PM's next connection, wrapping back to the query's initial connection once
// DECConnectionsPerQuery connections have been cycled through.
uint32_t DistributedEngineComm::MQE::getNextConnectionId(const size_t pmIndex,
                                                         const size_t pmConnectionsNumber,
                                                         const uint32_t DECConnectionsPerQuery)
{
  uint32_t nextConnectionId = (interleaver[pmIndex] + pmCount) % pmConnectionsNumber;
  // Wrap around the connection id.
  if ((nextConnectionId - pmIndex) % DECConnectionsPerQuery == 0)
  {
    // The sum must be < connections vector size.
    nextConnectionId = initialConnectionId + pmIndex;
    interleaver[pmIndex] = nextConnectionId;
  }
  else
  {
    interleaver[pmIndex] = nextConnectionId;
  }
  return nextConnectionId;
}
// True when the client's peer address matches any local NIC address gathered
// by getLocalNetIfacesSins().
template <typename T>
bool DistributedEngineComm::clientAtTheSameHost(T& client) const
{
  return std::any_of(localNetIfaceSins_.begin(), localNetIfaceSins_.end(),
                     [&client](const auto& sin) { return client->isSameAddr(sin); });
}
// Collects the IPv4 address of every local network interface; used by
// clientAtTheSameHost() to detect PM connections that loop back to this host.
void DistributedEngineComm::getLocalNetIfacesSins()
{
  struct ifaddrs* netIfacesList = nullptr;
  // retrieve the current interfaces - getifaddrs() returns 0 on success
  if (getifaddrs(&netIfacesList) == 0)
  {
    for (struct ifaddrs* iface = netIfacesList; iface; iface = iface->ifa_next)
    {
      // Skip interfaces without an address and non-IPv4 families.
      if (iface->ifa_addr && iface->ifa_addr->sa_family == AF_INET)
      {
        localNetIfaceSins_.push_back(((struct sockaddr_in*)iface->ifa_addr)->sin_addr);
      }
    }
    // Bug fix: only free on success. The old code called freeifaddrs()
    // unconditionally; POSIX does not guarantee that passing a null/unset
    // list to freeifaddrs() is safe.
    freeifaddrs(netIfacesList);
  }
}
template bool DistributedEngineComm::clientAtTheSameHost<DistributedEngineComm::SharedPtrEMSock>(
SharedPtrEMSock& client) const;
} // namespace joblist