
MCOL-5166 This patch adds support for in-memory communication b/w EM and PP via a shared queue in the DEC class

JobList low-level code related to primitive jobs now uses shared pointers instead of ByteStream refs when talking to DEC,
b/c same-node EM-PP communication now goes over a queue in DEC instead of a network hop.
PP now has a separate thread that processes the primitive job messages from that DEC queue.
This commit is contained in:
Roman Nozdrin
2022-07-21 19:37:18 +00:00
committed by Leonid Fedorov
parent 9ef16c6ded
commit a9d8924683
11 changed files with 418 additions and 232 deletions
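
At a glance, the patch replaces the same-host network round trip with a producer/consumer handoff guarded by a mutex and a condition variable. Below is a minimal, self-contained sketch of that pattern; the names and types are illustrative stand-ins, not the patch's own classes.

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

using SBS = std::shared_ptr<std::string>;  // stand-in for messageqcpp::SBS

class LocalExchange
{
 public:
  // Producer side, cf. pushToTheLocalQueueAndNotifyRecv(): enqueue under the
  // lock, then notify the waiting consumer.
  void push(const SBS& msg)
  {
    {
      std::lock_guard<std::mutex> lk(mtx_);
      queue_.push(msg);
    }
    cv_.notify_one();
  }

  // Consumer side, cf. readLocalQueueMessagesOrWait(): block until at least
  // one message is present, then drain everything in one critical section.
  std::vector<SBS> drainOrWait()
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_.wait(lk, [this] { return !queue_.empty(); });
    std::vector<SBS> out;
    while (!queue_.empty())
    {
      out.push_back(queue_.front());
      queue_.pop();
    }
    return out;
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::queue<SBS> queue_;
};

int main()
{
  LocalExchange ex;
  std::thread consumer([&ex] {
    for (const auto& m : ex.drainOrWait())
      std::cout << *m << '\n';
  });
  ex.push(std::make_shared<std::string>("primitive job"));
  consumer.join();
}

The batch drain in drainOrWait() mirrors readLocalQueueMessagesOrWait() in the diff below: taking everything available under one lock keeps the critical section short when the producer is bursty.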

View File

@ -38,6 +38,7 @@
#include <unistd.h>
#include <chrono>
#include <thread>
#include <ifaddrs.h>
using namespace std;
#include <boost/scoped_array.hpp>
@ -186,7 +187,13 @@ DistributedEngineComm* DistributedEngineComm::fInstance = 0;
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
{
if (fInstance == 0)
{
fInstance = new DistributedEngineComm(rm, isExeMgr);
if (isExeMgr && fInstance)
{
fInstance->getLocalNetIfacesSins();
}
}
return fInstance;
}
@ -201,6 +208,10 @@ void DistributedEngineComm::reset()
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
{
if (fIsExeMgr)
{
getLocalNetIfacesSins();
}
Setup();
}
@ -268,6 +279,10 @@ void DistributedEngineComm::Setup()
size_t connectionId = i % newPmCount;
boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(
pmsAddressesAndPorts[connectionId].first, pmsAddressesAndPorts[connectionId].second));
if (clientAtTheSameHost(cl))
{
cl->atTheSameHost(true);
}
boost::shared_ptr<boost::mutex> nl(new boost::mutex());
try
@ -410,8 +425,6 @@ Error:
if (fIsExeMgr)
{
// std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
// << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl;
decltype(pmCount) originalPMCount = pmCount;
// Re-establish if a remote PM restarted.
std::this_thread::sleep_for(std::chrono::seconds(3));
@ -700,11 +713,11 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
if (l_msgCount > 0)
{
ByteStream msg(sizeof(ISMPacketHeader));
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
uint16_t* toAck;
vector<bool> pmAcked(pmCount, false);
ism = (ISMPacketHeader*)msg.getInputPtr();
ism = (ISMPacketHeader*)msg->getInputPtr();
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.
@ -713,7 +726,7 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
ism->Command = BATCH_PRIMITIVE_ACK;
toAck = &ism->Size;
msg.advanceInputPtr(sizeof(ISMPacketHeader));
msg->advanceInputPtr(sizeof(ISMPacketHeader));
while (l_msgCount > 0)
{
@ -819,8 +832,8 @@ void DistributedEngineComm::nextPMToACK(boost::shared_ptr<MQE> mqe, uint32_t max
void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boost::shared_ptr<MQE> mqe)
{
mqe->throttled = enabled;
ByteStream msg(sizeof(ISMPacketHeader));
ISMPacketHeader* ism = (ISMPacketHeader*)msg.getInputPtr();
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
ISMPacketHeader* ism = (ISMPacketHeader*)msg->getInputPtr();
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
@ -834,15 +847,15 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
ism->Status = 0;
#endif
msg.advanceInputPtr(sizeof(ISMPacketHeader));
msg->advanceInputPtr(sizeof(ISMPacketHeader));
for (uint32_t i = 0; i < mqe->pmCount; i++)
writeToClient(i, msg);
}
void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
{
ISMPacketHeader* ism = (ISMPacketHeader*)msg.buf();
ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf();
uint32_t dest;
uint32_t numConn = fPmConnections.size();
@ -852,7 +865,7 @@ void DistributedEngineComm::write(uint32_t senderID, ByteStream& msg)
{
case BATCH_PRIMITIVE_CREATE:
/* Disable flow control initially */
msg << (uint32_t)-1;
*msg << (uint32_t)-1;
/* FALLTHRU */
case BATCH_PRIMITIVE_DESTROY:
@ -1007,7 +1020,40 @@ void DistributedEngineComm::doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t ta
mqe->targetQueueSize = targetSize;
}
int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs, uint32_t senderUniqueID,
DistributedEngineComm::SBSVector& DistributedEngineComm::readLocalQueueMessagesOrWait(
SBSVector& receivedMessages)
{
for (;;)
{
std::unique_lock<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
if (inMemoryEM2PPExchQueue_.empty())
{
inMemoryEM2PPExchCV_.wait(exchangeLock);
continue;
}
// Batch processing to shorten the critical section
while (!inMemoryEM2PPExchQueue_.empty())
{
receivedMessages.push_back(inMemoryEM2PPExchQueue_.front());
inMemoryEM2PPExchQueue_.pop();
}
exchangeLock.unlock();
break;
}
return receivedMessages;
}
void DistributedEngineComm::pushToTheLocalQueueAndNotifyRecv(const messageqcpp::SBS& bs)
{
std::unique_lock<std::mutex> exchangeLock(inMemoryEM2PPExchMutex_);
inMemoryEM2PPExchQueue_.push(bs);
exchangeLock.unlock();
inMemoryEM2PPExchCV_.notify_one();
}
int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_t senderUniqueID,
bool doInterleaving)
{
boost::mutex::scoped_lock lk(fMlock, boost::defer_lock_t());
@ -1020,6 +1066,14 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs,
return 0;
uint32_t connectionId = aPMIndex;
assert(connectionId < fPmConnections.size());
// EM-PP exchange via the queue.
if (fPmConnections[connectionId]->atTheSameHost() && fIsExeMgr)
{
pushToTheLocalQueueAndNotifyRecv(bs);
return 0;
}
if (senderUniqueID != numeric_limits<uint32_t>::max())
{
lk.lock();
@ -1183,4 +1237,39 @@ uint32_t DistributedEngineComm::MQE::getNextConnectionId(const size_t pmIndex,
return nextConnectionId;
}
template <typename T>
bool DistributedEngineComm::clientAtTheSameHost(T& client) const
{
for (auto& sin : localNetIfaceSins_)
{
if (client->isSameAddr(sin))
{
return true;
}
}
return false;
}
void DistributedEngineComm::getLocalNetIfacesSins()
{
struct ifaddrs* netIfacesList = nullptr;
struct ifaddrs* ifaceListMembPtr = nullptr;
int success = 0;
// retrieve the current interfaces - returns 0 on success
success = getifaddrs(&netIfacesList);
if (success == 0)
{
ifaceListMembPtr = netIfacesList;
for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
{
if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
{
localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
}
}
}
freeifaddrs(netIfacesList);
}
template bool DistributedEngineComm::clientAtTheSameHost<DistributedEngineComm::SharedPtrEMSock>(
SharedPtrEMSock& client) const;
} // namespace joblist
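
As an aside, getLocalNetIfacesSins() above follows the standard getifaddrs() enumeration pattern. Here is a self-contained sketch of the same idea, including the ifa_addr null check that defensive callers usually add, since getifaddrs() can return entries that carry no address:

#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <iostream>
#include <vector>

int main()
{
  std::vector<struct in_addr> localSins;
  struct ifaddrs* list = nullptr;

  if (getifaddrs(&list) == 0)  // returns 0 on success
  {
    for (struct ifaddrs* p = list; p != nullptr; p = p->ifa_next)
    {
      // Skip entries without an address before touching sa_family.
      if (p->ifa_addr && p->ifa_addr->sa_family == AF_INET)
        localSins.push_back(((struct sockaddr_in*)p->ifa_addr)->sin_addr);
    }
    freeifaddrs(list);
  }

  for (const auto& sin : localSins)
    std::cout << inet_ntoa(sin) << '\n';  // e.g. 127.0.0.1, 192.168.x.x
  return 0;
}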

View File

@ -32,6 +32,8 @@
#pragma once
#include <ifaddrs.h>
#include <condition_variable>
#include <iostream>
#include <vector>
#include <queue>
@ -84,7 +86,11 @@ class DECEventListener
*/
class DistributedEngineComm
{
using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
public:
using SBSVector = std::vector<messageqcpp::SBS>;
/**
* Constructors
*/
@ -139,7 +145,7 @@ class DistributedEngineComm
* Writes a primitive message to a primitive server. Msg needs to contain an ISMPacketHeader. The
* LBID is extracted from the ISMPacketHeader and used to determine the actual P/M to send to.
*/
EXPORT void write(uint32_t key, messageqcpp::ByteStream& msg);
EXPORT void write(uint32_t key, const messageqcpp::SBS& msg);
// EXPORT void throttledWrite(const messageqcpp::ByteStream& msg);
@ -206,8 +212,13 @@ class DistributedEngineComm
return fIsExeMgr;
}
template <typename T>
bool clientAtTheSameHost(T& client) const;
void getLocalNetIfacesSins();
messageqcpp::Stats getNetworkStats(uint32_t uniqueID);
void addDataToOutput(messageqcpp::SBS sbs);
SBSVector& readLocalQueueMessagesOrWait(SBSVector&);
friend class ::TestDistributedEngineComm;
@ -261,9 +272,10 @@ class DistributedEngineComm
*
* Continues trying to write data to the client at the next index until all clients have been tried.
*/
int writeToClient(size_t index, const messageqcpp::ByteStream& bs,
int writeToClient(size_t index, const messageqcpp::SBS& bs,
uint32_t senderID = std::numeric_limits<uint32_t>::max(), bool doInterleaving = false);
void pushToTheLocalQueueAndNotifyRecv(const messageqcpp::SBS& bs);
static DistributedEngineComm* fInstance;
ResourceManager* fRm;
@ -300,6 +312,11 @@ class DistributedEngineComm
void setFlowControl(bool enable, uint32_t uniqueID, boost::shared_ptr<MQE> mqe);
void doHasBigMsgs(boost::shared_ptr<MQE> mqe, uint64_t targetSize);
boost::mutex ackLock;
std::vector<struct in_addr> localNetIfaceSins_;
std::mutex inMemoryEM2PPExchMutex_;
std::condition_variable inMemoryEM2PPExchCV_;
std::queue<messageqcpp::SBS> inMemoryEM2PPExchQueue_;
};
} // namespace joblist

View File

@ -325,7 +325,6 @@ void pDictionaryScan::sendPrimitiveMessages()
LBIDRange_v::iterator it;
HWM_t hwm;
uint32_t fbo;
ByteStream primMsg(65536);
DBRM dbrm;
uint16_t dbroot;
uint32_t partNum;
@ -391,8 +390,7 @@ void pDictionaryScan::sendPrimitiveMessages()
}
}
sendAPrimitiveMessage(primMsg, msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
primMsg.restart();
sendAPrimitiveMessage(msgLbidStart, msgLbidCount, (*dbRootConnectionMap)[dbroot]);
mutex.lock();
msgsSent += msgLbidCount;
@ -453,8 +451,7 @@ void pDictionaryScan::sendError(uint16_t s)
//------------------------------------------------------------------------------
// Construct and send a single primitive message to primproc
//------------------------------------------------------------------------------
void pDictionaryScan::sendAPrimitiveMessage(ByteStream& primMsg, BRM::LBID_t msgLbidStart,
uint32_t msgLbidCount, uint16_t pm)
void pDictionaryScan::sendAPrimitiveMessage(BRM::LBID_t msgLbidStart, uint32_t msgLbidCount, uint16_t pm)
{
DictTokenByScanRequestHeader hdr;
void* hdrp = static_cast<void*>(&hdr);
@ -500,11 +497,11 @@ void pDictionaryScan::sendAPrimitiveMessage(ByteStream& primMsg, BRM::LBID_t msg
* than putting it in the middle or at the end in terms of simplicity & memory usage,
* given the current code.
*/
primMsg.load((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
primMsg << fVerId;
primMsg.append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
primMsg += fFilterString;
SBS primMsg(new ByteStream(hdr.ism.Size));
primMsg->load((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
*primMsg << fVerId;
primMsg->append((const uint8_t*)&hdr, sizeof(DictTokenByScanRequestHeader));
*primMsg += fFilterString;
// cout << "Sending rqst LBIDS " << msgLbidStart
// << " hdr.Count " << hdr.Count
@ -852,7 +849,7 @@ void pDictionaryScan::appendFilter(const messageqcpp::ByteStream& filter, unsign
void pDictionaryScan::serializeEqualityFilter()
{
ByteStream msg;
SBS msg(new ByteStream());
ISMPacketHeader ism;
uint32_t i;
vector<string> empty;
@ -860,13 +857,13 @@ void pDictionaryScan::serializeEqualityFilter()
void* ismp = static_cast<void*>(&ism);
memset(ismp, 0, sizeof(ISMPacketHeader));
ism.Command = DICT_CREATE_EQUALITY_FILTER;
msg.load((uint8_t*)&ism, sizeof(ISMPacketHeader));
msg << uniqueID;
msg << (uint32_t)colType().charsetNumber;
msg << (uint32_t)equalityFilter.size();
msg->load((uint8_t*)&ism, sizeof(ISMPacketHeader));
*msg << uniqueID;
*msg << (uint32_t)colType().charsetNumber;
*msg << (uint32_t)equalityFilter.size();
for (i = 0; i < equalityFilter.size(); i++)
msg << equalityFilter[i];
*msg << equalityFilter[i];
try
{
@ -884,14 +881,14 @@ void pDictionaryScan::serializeEqualityFilter()
void pDictionaryScan::destroyEqualityFilter()
{
ByteStream msg;
SBS msg(new ByteStream());
ISMPacketHeader ism;
void* ismp = static_cast<void*>(&ism);
memset(ismp, 0, sizeof(ISMPacketHeader));
ism.Command = DICT_DESTROY_EQUALITY_FILTER;
msg.load((uint8_t*)&ism, sizeof(ISMPacketHeader));
msg << uniqueID;
msg->load((uint8_t*)&ism, sizeof(ISMPacketHeader));
*msg << uniqueID;
try
{

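A consequence of passing messages by shared pointer shows up above: the old pattern of reusing one ByteStream via restart() is gone, because a same-host receiver may still hold a reference to the buffer after write() returns. A toy illustration of why each send now allocates a fresh ByteStream (stand-in types, not the real messageqcpp classes):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

using ByteStream = std::string;          // stand-in for messageqcpp::ByteStream
using SBS = std::shared_ptr<ByteStream>; // stand-in for messageqcpp::SBS

std::vector<SBS> localQueue;             // the same-host queue keeps references

void send(const SBS& msg) { localQueue.push_back(msg); }

int main()
{
  for (int i = 0; i < 2; ++i)
  {
    SBS msg(new ByteStream("payload " + std::to_string(i)));
    send(msg);
    // Recycling msg here (the old ByteStream::restart() pattern) would
    // mutate a buffer the queue still references; hence one SBS per send.
  }
  for (const auto& m : localQueue)
    std::cout << *m << '\n';
}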
View File

@ -93,7 +93,6 @@ enum PrimitiveStepType
AGGRFILTERSTEP
};
class pColScanStep;
class pColStep : public JobStep
{
@ -345,19 +344,25 @@ class pColScanStep : public JobStep
const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo);
pColScanStep(const pColStep& rhs);
~pColScanStep(){}
~pColScanStep()
{
}
/** @brief Starts processing.
*
* Starts processing.
*/
virtual void run(){}
virtual void run()
{
}
/** @brief Sync's the caller with the end of execution.
*
* Does nothing. Returns when this instance is finished.
*/
virtual void join(){}
virtual void join()
{
}
virtual bool isDictCol() const
{
@ -548,12 +553,18 @@ class pDictionaryStep : public JobStep
pDictionaryStep(execplan::CalpontSystemCatalog::OID oid, execplan::CalpontSystemCatalog::OID tabelOid,
const execplan::CalpontSystemCatalog::ColType& ct, const JobInfo& jobInfo);
virtual ~pDictionaryStep(){}
virtual ~pDictionaryStep()
{
}
/** @brief virtual void Run method
*/
virtual void run(){}
virtual void join(){}
virtual void run()
{
}
virtual void join()
{
}
// void setOutList(StringDataList* rids);
void setInputList(DataList_t* rids)
{
@ -794,8 +805,7 @@ class pDictionaryScan : public JobStep
void startPrimitiveThread();
void startAggregationThread();
void initializeConfigParms();
void sendAPrimitiveMessage(messageqcpp::ByteStream& primMsg, BRM::LBID_t msgLbidStart,
uint32_t msgLbidCount, uint16_t dbroot);
void sendAPrimitiveMessage(BRM::LBID_t msgLbidStart, uint32_t msgLbidCount, uint16_t dbroot);
void formatMiniStats();
DistributedEngineComm* fDec;

View File

@ -737,12 +737,12 @@ TupleBPS::~TupleBPS()
if (BPPIsAllocated)
{
ByteStream bs;
fBPP->destroyBPP(bs);
SBS sbs{new ByteStream()};
fBPP->destroyBPP(*sbs);
try
{
fDec->write(uniqueID, bs);
fDec->write(uniqueID, sbs);
}
catch (const std::exception& e)
{
@ -1093,8 +1093,8 @@ void TupleBPS::startProcessingThread(TupleBPS* tbps, vector<boost::shared_ptr<me
void TupleBPS::serializeJoiner()
{
ByteStream bs;
bool more = true;
SBS sbs(new ByteStream());
/* false from nextJoinerMsg means it's the last msg,
it's not exactly the exit condition*/
@ -1103,16 +1103,17 @@ void TupleBPS::serializeJoiner()
{
// code block to release the lock immediately
boost::mutex::scoped_lock lk(serializeJoinerMutex);
more = fBPP->nextTupleJoinerMsg(bs);
more = fBPP->nextTupleJoinerMsg(*sbs);
}
#ifdef JLF_DEBUG
cout << "serializing joiner into " << bs.length() << " bytes" << endl;
#endif
fDec->write(uniqueID, bs);
bs.restart();
fDec->write(uniqueID, sbs);
sbs.reset(new ByteStream());
}
}
// Outdated method
void TupleBPS::serializeJoiner(uint32_t conn)
{
// We need this lock for TupleBPS::serializeJoiner()
@ -1376,7 +1377,7 @@ void TupleBPS::run()
std::string("TupleBPS")); // step name
}
ByteStream bs;
SBS sbs{new ByteStream()};
if (fDelivery)
{
@ -1406,8 +1407,8 @@ void TupleBPS::run()
{
fDec->addDECEventListener(this);
fBPP->priority(priority());
fBPP->createBPP(bs);
fDec->write(uniqueID, bs);
fBPP->createBPP(*sbs);
fDec->write(uniqueID, sbs);
BPPIsAllocated = true;
if (doJoin && tjoiners[0]->inPM())
@ -1453,13 +1454,13 @@ void TupleBPS::join()
if (BPPIsAllocated)
{
ByteStream bs;
SBS sbs{new ByteStream()};
fDec->removeDECEventListener(this);
fBPP->destroyBPP(bs);
fBPP->destroyBPP(*sbs);
try
{
fDec->write(uniqueID, bs);
fDec->write(uniqueID, sbs);
}
catch (...)
{
@ -1476,10 +1477,10 @@ void TupleBPS::join()
void TupleBPS::sendError(uint16_t status)
{
ByteStream msgBpp;
SBS msgBpp;
fBPP->setCount(1);
fBPP->setStatus(status);
fBPP->runErrorBPP(msgBpp);
fBPP->runErrorBPP(*msgBpp);
try
{
@ -1602,7 +1603,7 @@ void TupleBPS::sendJobs(const vector<Job>& jobs)
for (i = 0; i < jobs.size() && !cancelled(); i++)
{
fDec->write(uniqueID, *(jobs[i].msg));
fDec->write(uniqueID, jobs[i].msg);
tplLock.lock();
msgsSent += jobs[i].expectedResponses;
@ -2613,15 +2614,15 @@ void TupleBPS::receiveMultiPrimitiveMessages()
dlTimes.setEndOfInputTime();
}
ByteStream bs;
SBS sbs{new ByteStream()};
try
{
if (BPPIsAllocated)
{
fDec->removeDECEventListener(this);
fBPP->destroyBPP(bs);
fDec->write(uniqueID, bs);
fBPP->destroyBPP(*sbs);
fDec->write(uniqueID, sbs);
BPPIsAllocated = false;
}
}
@ -3302,12 +3303,12 @@ void TupleBPS::abort_nolock()
if (fDec && BPPIsAllocated)
{
ByteStream bs;
fBPP->abortProcessing(&bs);
SBS sbs{new ByteStream()};
fBPP->abortProcessing(sbs.get());
try
{
fDec->write(uniqueID, bs);
fDec->write(uniqueID, sbs);
}
catch (...)
{

View File

@ -29,6 +29,7 @@
//
//
#include <mutex>
#include <stdexcept>
#include <unistd.h>
#include <cstring>
@ -1888,7 +1889,7 @@ void BatchPrimitiveProcessor::execute()
}
else
{
// We hae no more use for this allocation
// We have no more use for this allocation
for (i = 0; i < joinerCount; i++)
for (j = 0; j < ridCount; ++j)
tSmallSideMatches[i][j].clear();
@ -2145,12 +2146,12 @@ void BatchPrimitiveProcessor::serializeStrings()
void BatchPrimitiveProcessor::sendResponse()
{
bool isLocalNodeConnection = exemgr::globServiceExeMgr->isLocalNodeSock(sock);
// Here is the fast path for local EM to PM interacction. PM puts into the
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
// Here is the fast path for local EM to PM interaction. PM puts into the
// input EM DEC queue directly.
if (initiatedByEM_ && isLocalNodeConnection)
// !sock has 'same-host connection' semantics here.
if (initiatedByEM_ && (!sock || exeMgrDecPtr->clientAtTheSameHost(sock)))
{
joblist::DistributedEngineComm* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
exeMgrDecPtr->addDataToOutput(serialized);
serialized.reset();
return;

View File

@ -21,13 +21,16 @@
*
*
***********************************************************************/
#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mutex>
#include <stdexcept>
//#define NDEBUG
#include <cassert>
#include <boost/thread.hpp>
@ -49,7 +52,8 @@ using namespace std;
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
using namespace boost;
#include "distributedenginecomm.h"
#include "serviceexemgr.h"
#include "primproc.h"
#include "primitiveserver.h"
#include "primitivemsg.h"
@ -1055,7 +1059,7 @@ class DictScanJob : public threadpool::FairThreadPool::Functor
DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
virtual ~DictScanJob();
void write(const ByteStream&);
void write(const SBS&);
int operator()();
void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
void sendErrorMsg(uint32_t id, uint16_t code);
@ -1077,17 +1081,27 @@ DictScanJob::~DictScanJob()
{
}
void DictScanJob::write(const ByteStream& bs)
void DictScanJob::write(const SBS& sbs)
{
// Here is the fast path for local EM to PM interaction. PM puts into the
// input EM DEC queue directly.
// !sock has 'same-host connection' semantics here.
if (!fIos)
{
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
exeMgrDecPtr->addDataToOutput(sbs);
return;
}
boost::mutex::scoped_lock lk(*fWriteLock);
fIos->write(bs);
fIos->write(*sbs);
}
int DictScanJob::operator()()
{
utils::setThreadName("PPDictScanJob");
uint8_t data[DATA_BLOCK_SIZE];
uint32_t output_buf_size = MAX_BUFFER_SIZE;
// Reducing this buffer size may cause unrelated issues in DictScanStep.
const uint32_t output_buf_size = MAX_BUFFER_SIZE;
uint32_t session;
uint32_t uniqueId = 0;
bool wasBlockInCache = false;
@ -1095,7 +1109,6 @@ int DictScanJob::operator()()
uint16_t runCount;
boost::shared_ptr<DictEqualityFilter> eqFilter;
ByteStream results(output_buf_size);
TokenByScanRequestHeader* cmd;
PrimitiveProcessor pproc(gDebugLevel);
TokenByScanResultHeader* output;
@ -1114,7 +1127,6 @@ int DictScanJob::operator()()
session = cmd->Hdr.SessionID;
uniqueId = cmd->Hdr.UniqueID;
runCount = cmd->Count;
output = (TokenByScanResultHeader*)results.getInputPtr();
#ifdef VALGRIND
memset(output, 0, sizeof(TokenByScanResultHeader));
#endif
@ -1145,6 +1157,9 @@ int DictScanJob::operator()()
for (uint16_t i = 0; i < runCount; ++i)
{
SBS results(new ByteStream(output_buf_size));
output = (TokenByScanResultHeader*)results->getInputPtr();
loadBlock(cmd->LBID, verInfo, cmd->Hdr.TransactionID, cmd->CompType, data, &wasBlockInCache,
&blocksRead, fLBIDTraceOn, session);
pproc.setBlockPtr((int*)data);
@ -1155,9 +1170,8 @@ int DictScanJob::operator()()
else
output->PhysicalIO += blocksRead;
results.advanceInputPtr(output->NBYTES);
results->advanceInputPtr(output->NBYTES);
write(results);
results.restart();
cmd->LBID++;
}
@ -1199,9 +1213,9 @@ void DictScanJob::sendErrorMsg(uint32_t id, uint16_t code)
ism.Status = code;
ph.UniqueID = id;
ByteStream msg(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader));
msg.append((uint8_t*)&ism, sizeof(ism));
msg.append((uint8_t*)&ph, sizeof(ph));
SBS msg(new ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)));
msg->append((uint8_t*)&ism, sizeof(ism));
msg->append((uint8_t*)&ph, sizeof(ph));
write(msg);
}
@ -1940,6 +1954,128 @@ struct ReadThread
ios->write(buildCacheOpResp(0));
}
static void dispatchPrimitive(SBS sbs, boost::shared_ptr<BPPHandler>& fBPPHandler,
boost::shared_ptr<threadpool::FairThreadPool>& procPoolPtr,
SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads,
const bool ptTrace)
{
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(sbs->buf());
switch (ismHdr->Command)
{
case DICT_CREATE_EQUALITY_FILTER:
case DICT_DESTROY_EQUALITY_FILTER:
case BATCH_PRIMITIVE_CREATE:
case BATCH_PRIMITIVE_ADD_JOINER:
case BATCH_PRIMITIVE_END_JOINER:
case BATCH_PRIMITIVE_DESTROY:
case BATCH_PRIMITIVE_ABORT:
{
const uint8_t* buf = sbs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
const uint32_t weight = 1;
const uint32_t priority = 0;
uint32_t id = 0;
boost::shared_ptr<FairThreadPool::Functor> functor;
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
{
functor.reset(new CreateEqualityFilter(sbs));
}
else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
{
functor.reset(new DestroyEqualityFilter(sbs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
{
functor.reset(new BPPHandler::Create(fBPPHandler, sbs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
{
functor.reset(new BPPHandler::AddJoiner(fBPPHandler, sbs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
{
id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
functor.reset(new BPPHandler::LastJoiner(fBPPHandler, sbs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
{
id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
functor.reset(new BPPHandler::Destroy(fBPPHandler, sbs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
{
id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
functor.reset(new BPPHandler::Abort(fBPPHandler, sbs));
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, weight, priority, id);
procPoolPtr->addJob(job);
break;
}
case DICT_TOKEN_BY_SCAN_COMPARE:
case BATCH_PRIMITIVE_RUN:
{
TokenByScanRequestHeader* hdr = nullptr;
boost::shared_ptr<FairThreadPool::Functor> functor;
uint32_t id = 0;
uint32_t weight = 0;
uint32_t priority = 0;
uint32_t txnId = 0;
uint32_t stepID = 0;
uint32_t uniqueID = 0;
if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
{
idbassert(sbs->length() >= sizeof(TokenByScanRequestHeader));
hdr = (TokenByScanRequestHeader*)ismHdr;
functor.reset(new DictScanJob(outIos, sbs, writeLock));
id = hdr->Hdr.UniqueID;
weight = LOGICAL_BLOCK_RIDS;
priority = hdr->Hdr.Priority;
const uint8_t* buf = sbs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
}
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
{
functor.reset(new BPPSeeder(sbs, writeLock, outIos, processorThreads, ptTrace));
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
id = bpps->getID();
priority = bpps->priority();
const uint8_t* buf = sbs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
procPoolPtr->addJob(job);
break;
}
case BATCH_PRIMITIVE_ACK:
{
fBPPHandler->doAck(*sbs);
break;
}
default:
{
std::ostringstream os;
Logger log;
os << "unknown primitive cmd: " << ismHdr->Command;
log.logMessage(os.str());
break;
}
} // the switch stmt
}
void operator()()
{
utils::setThreadName("PPReadThread");
@ -1994,9 +2130,6 @@ struct ReadThread
idbassert(bs->length() >= sizeof(ISMPacketHeader));
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
// uint64_t someVal = ismHdr->Command;
// std::cout << " PP read thread Command " << someVal << std::endl;
/* This switch is for the OOB commands */
switch (ismHdr->Command)
{
@ -2037,139 +2170,8 @@ struct ReadThread
default: break;
}
switch (ismHdr->Command)
{
case DICT_CREATE_EQUALITY_FILTER:
case DICT_DESTROY_EQUALITY_FILTER:
case BATCH_PRIMITIVE_CREATE:
case BATCH_PRIMITIVE_ADD_JOINER:
case BATCH_PRIMITIVE_END_JOINER:
case BATCH_PRIMITIVE_DESTROY:
case BATCH_PRIMITIVE_ABORT:
{
const uint8_t* buf = bs->buf();
uint32_t pos = sizeof(ISMPacketHeader) - 2;
const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
const uint32_t weight = 1;
const uint32_t priority = 0;
uint32_t id = 0;
boost::shared_ptr<FairThreadPool::Functor> functor;
if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
{
functor.reset(new CreateEqualityFilter(bs));
}
else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
{
functor.reset(new DestroyEqualityFilter(bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
{
functor.reset(new BPPHandler::Create(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
{
functor.reset(new BPPHandler::AddJoiner(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
{
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
functor.reset(new BPPHandler::LastJoiner(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
{
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
functor.reset(new BPPHandler::Destroy(fBPPHandler, bs));
}
else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
{
id = fBPPHandler->getUniqueID(bs, ismHdr->Command);
functor.reset(new BPPHandler::Abort(fBPPHandler, bs));
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
procPoolPtr->addJob(job);
break;
}
case DICT_TOKEN_BY_SCAN_COMPARE:
case BATCH_PRIMITIVE_RUN:
{
TokenByScanRequestHeader* hdr = nullptr;
boost::shared_ptr<FairThreadPool::Functor> functor;
uint32_t id = 0;
uint32_t weight = 0;
uint32_t priority = 0;
uint32_t txnId = 0;
uint32_t stepID = 0;
uint32_t uniqueID = 0;
if (bRotateDest)
{
if (!pUmSocketSelector->nextIOSocket(fIos, outIos, writeLock))
{
// If we ever fall into this part of the
// code we have a "bug" of some sort.
// See handleUmSockSelErr() for more info.
// We reset ios and mutex to defaults.
handleUmSockSelErr(string("default cmd"));
outIos = outIosDefault;
writeLock = writeLockDefault;
pUmSocketSelector->delConnection(fIos);
bRotateDest = false;
}
}
if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
{
idbassert(bs->length() >= sizeof(TokenByScanRequestHeader));
hdr = (TokenByScanRequestHeader*)ismHdr;
functor.reset(new DictScanJob(outIos, bs, writeLock));
id = hdr->Hdr.UniqueID;
weight = LOGICAL_BLOCK_RIDS;
priority = hdr->Hdr.Priority;
const uint8_t* buf = bs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
}
else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
{
functor.reset(new BPPSeeder(bs, writeLock, outIos,
fPrimitiveServerPtr->ProcessorThreads(),
fPrimitiveServerPtr->PTTrace()));
BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
id = bpps->getID();
priority = bpps->priority();
const uint8_t* buf = bs->buf();
const uint32_t pos = sizeof(ISMPacketHeader) - 2;
txnId = *((uint32_t*)&buf[pos + 2]);
stepID = *((uint32_t*)&buf[pos + 6]);
uniqueID = *((uint32_t*)&buf[pos + 10]);
weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]);
}
FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
procPoolPtr->addJob(job);
break;
}
case BATCH_PRIMITIVE_ACK:
{
fBPPHandler->doAck(*bs);
break;
}
default:
{
std::ostringstream os;
Logger log;
os << "unknown primitive cmd: " << ismHdr->Command;
log.logMessage(os.str());
break;
}
} // the switch stmt
dispatchPrimitive(bs, fBPPHandler, procPoolPtr, outIos, writeLock,
fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace());
}
else // bs.length() == 0
{
@ -2213,8 +2215,6 @@ struct ReadThread
boost::shared_ptr<BPPHandler> fBPPHandler;
};
/** @brief accept a primitive command from the user module
*/
struct ServerThread
{
ServerThread(string serverName, PrimitiveServer* ps) : fServerName(serverName), fPrimitiveServerPtr(ps)
@ -2351,9 +2351,43 @@ void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRace
fServerpool.invoke(ServerThread(oss.str(), this));
}
startupRaceLock.release();
service->NotifyServiceStarted();
std::thread sameHostServerThread(
[this]()
{
utils::setThreadName("PPSHServerThr");
auto* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
while (!exeMgrDecPtr)
{
sleep(1);
exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
}
// These empty SPs have "same-host" messaging semantics.
SP_UM_IOSOCK outIos(nullptr);
SP_UM_MUTEX writeLock(nullptr);
auto procPoolPtr = this->getProcessorThreadPool();
boost::shared_ptr<BPPHandler> fBPPHandler(new BPPHandler(this));
for (;;)
{
joblist::DistributedEngineComm::SBSVector primitiveMsgs;
for (auto& sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
{
if (sbs->length() == 0)
{
std::cout << "PPSHServerThr got an empty ByteStream." << std::endl;
continue;
}
idbassert(sbs->length() >= sizeof(ISMPacketHeader));
ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPoolPtr, outIos, writeLock,
this->ProcessorThreads(), this->PTTrace());
}
}
});
fServerpool.wait();
cerr << "PrimitiveServer::start() exiting!" << endl;
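
Note how the extraction of dispatchPrimitive() above lets the socket ReadThread and the new same-host server thread funnel messages through a single routing function. A toy of that shape, purely illustrative:

#include <iostream>
#include <queue>
#include <string>

static void dispatchPrimitive(const std::string& msg)
{
  // In the real code this switches on ISMPacketHeader::Command and
  // enqueues the matching functor into the FairThreadPool.
  std::cout << "dispatching: " << msg << '\n';
}

int main()
{
  // Feeder 1: messages drained from the in-memory same-host queue.
  std::queue<std::string> localQueue;
  localQueue.push("BATCH_PRIMITIVE_RUN");
  while (!localQueue.empty())
  {
    dispatchPrimitive(localQueue.front());
    localQueue.pop();
  }

  // Feeder 2: a message as if read from a socket by ReadThread.
  dispatchPrimitive("BATCH_PRIMITIVE_ACK");
  return 0;
}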

View File

@ -15,8 +15,9 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <iostream>
#include <gtest/gtest.h>
#include <iostream>
#include <mutex>
#include <vector>
#include "utils/threadpool/fair_threadpool.h"
@ -27,6 +28,7 @@ using namespace threadpool;
using ResultsType = std::vector<int>;
static ResultsType results;
static std::mutex globMutex;
class FairThreadPoolTest : public testing::Test
{
@ -50,6 +52,7 @@ class TestFunctor : public FairThreadPool::Functor
int operator()() override
{
usleep(delay_);
std::lock_guard<std::mutex> gl(globMutex);
results.push_back(id_);
return 0;
}
@ -74,6 +77,7 @@ class TestRescheduleFunctor : public FairThreadPool::Functor
return 1; // re-schedule the Job
}
usleep(delay_);
std::lock_guard<std::mutex> gl(globMutex);
results.push_back(id_);
return 0;
}

View File

@ -218,13 +218,17 @@ void MessageQueueClient::setup(bool syncProto)
}
MessageQueueClient::MessageQueueClient(const string& otherEnd, const string& config, bool syncProto)
: fOtherEnd(otherEnd), fConfig(Config::makeConfig(config)), fLogger(31), fIsAvailable(true)
: fOtherEnd(otherEnd)
, fConfig(Config::makeConfig(config))
, fLogger(31)
, fIsAvailable(true)
, atTheSameHost_(false)
{
setup(syncProto);
}
MessageQueueClient::MessageQueueClient(const string& otherEnd, Config* config, bool syncProto)
: fOtherEnd(otherEnd), fConfig(config), fLogger(31), fIsAvailable(true)
: fOtherEnd(otherEnd), fConfig(config), fLogger(31), fIsAvailable(true), atTheSameHost_(false)
{
if (fConfig == 0)
fConfig = Config::makeConfig();
@ -233,7 +237,7 @@ MessageQueueClient::MessageQueueClient(const string& otherEnd, Config* config, b
}
MessageQueueClient::MessageQueueClient(const string& dnOrIp, uint16_t port, bool syncProto)
: fLogger(31), fIsAvailable(true)
: fLogger(31), fIsAvailable(true), atTheSameHost_(false)
{
#ifdef SKIP_IDB_COMPRESSION
fClientSock.setSocketImpl(new InetStreamSocket());

View File

@ -275,6 +275,7 @@ class MessageQueueClient
* @brief compare the addresses of 2 MessageQueueClient
*/
inline bool isSameAddr(const MessageQueueClient& rhs) const;
inline bool isSameAddr(const struct in_addr& ipv4Addr) const;
bool isConnected()
{
@ -285,6 +286,17 @@ class MessageQueueClient
{
return fClientSock.hasData();
}
// This client's flag is set during the DEC::Setup() call
bool atTheSameHost() const
{
return atTheSameHost_;
}
void atTheSameHost(const bool atTheSameHost)
{
atTheSameHost_ = atTheSameHost;
}
/*
* allow test suite access to private data for OOB test
*/
@ -312,6 +324,7 @@ class MessageQueueClient
mutable IOSocket fClientSock; /// the socket to communicate with the server
mutable logging::Logger fLogger;
bool fIsAvailable;
bool atTheSameHost_;
std::string fModuleName;
};
@ -327,6 +340,10 @@ inline bool MessageQueueClient::isSameAddr(const MessageQueueClient& rhs) const
{
return fClientSock.isSameAddr(&rhs.fClientSock);
}
inline bool MessageQueueClient::isSameAddr(const struct in_addr& ipv4Addr) const
{
return fClientSock.isSameAddr(ipv4Addr);
}
inline void MessageQueueClient::syncProto(bool use)
{
fClientSock.syncProto(use);
@ -335,4 +352,3 @@ inline void MessageQueueClient::syncProto(bool use)
} // namespace messageqcpp
#undef EXPORT
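
Tying the messagequeue pieces together: DEC::Setup() classifies each client once against the local interface list, and writeToClient() then branches on the cached flag per message. A condensed, self-contained sketch with simplified stand-ins for MessageQueueClient and the DEC state:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <iostream>
#include <vector>

struct Client
{
  struct in_addr peer{};       // address of the other end
  bool atTheSameHost = false;  // cached once, cf. MessageQueueClient::atTheSameHost()
};

// cf. DistributedEngineComm::clientAtTheSameHost() / isSameAddr(in_addr)
static bool clientAtTheSameHost(const Client& c, const std::vector<in_addr>& localSins)
{
  for (const auto& sin : localSins)
    if (sin.s_addr == c.peer.s_addr)
      return true;
  return false;
}

int main()
{
  std::vector<in_addr> localSins(1);
  localSins[0].s_addr = htonl(INADDR_LOOPBACK);  // pretend 127.0.0.1 is local

  Client c;
  c.peer.s_addr = htonl(INADDR_LOOPBACK);
  c.atTheSameHost = clientAtTheSameHost(c, localSins);  // done once in Setup()

  // Per message (cf. writeToClient()): queue locally or send over the wire.
  std::cout << (c.atTheSameHost ? "push to local queue" : "write to socket") << '\n';
  return 0;
}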

View File

@ -70,6 +70,19 @@ class FairThreadPool
, id_(id)
{
}
// sock_ is nullptr here. This is somewhat dangerous.
Job(const uint32_t uniqueID, const uint32_t stepID, const TransactionIdxT txnIdx,
const boost::shared_ptr<Functor>& functor, const uint32_t weight = 1, const uint32_t priority = 0,
const uint32_t id = 0)
: uniqueID_(uniqueID)
, stepID_(stepID)
, txnIdx_(txnIdx)
, functor_(functor)
, weight_(weight)
, priority_(priority)
, id_(id)
{
}
uint32_t uniqueID_;
uint32_t stepID_;
TransactionIdxT txnIdx_;