1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5152 This patch enables PP to put ByteStreams into DEC input queue directly for a local PP-EM connection

This commit is contained in:
Roman Nozdrin
2022-06-30 16:52:51 +00:00
parent 7d955a0f85
commit 1624c347f6
12 changed files with 148 additions and 42 deletions

View File

@ -1243,7 +1243,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
* (projection count)x run msgs for projection Commands
*/
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum)
void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC)
{
ISMPacketHeader ism;
uint32_t i;
@ -1276,6 +1276,8 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum)
bs << dbRoot;
bs << count;
uint8_t sentByEM = (isExeMgrDEC) ? 1 : 0;
bs << sentByEM;
if (_hasScan)
idbassert(ridCount == 0);

View File

@ -137,7 +137,7 @@ class BatchPrimitiveProcessorJL
void addElementType(const StringElementType&, uint32_t dbroot);
// void setRowGroupData(const rowgroup::RowGroup &);
void runBPP(messageqcpp::ByteStream&, uint32_t pmNum);
void runBPP(messageqcpp::ByteStream&, uint32_t pmNum, bool isExeMgrDEC);
void abortProcessing(messageqcpp::ByteStream*);
/* After serializing a BPP object, reset it and it's ready for more input */

View File

@ -199,7 +199,7 @@ void DistributedEngineComm::reset()
}
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr)
: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
: fRm(rm), pmCount(0), fIsExeMgr(isExeMgr)
{
Setup();
}
@ -288,7 +288,7 @@ void DistributedEngineComm::Setup()
catch (std::exception& ex)
{
if (i < newPmCount)
newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0
newPmCount = newPmCount > 1 ? newPmCount - 1 : 1; // We can't afford to reduce newPmCount to 0
writeToLog(__FILE__, __LINE__,
"Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(),
@ -302,7 +302,7 @@ void DistributedEngineComm::Setup()
catch (...)
{
if (i < newPmCount)
newPmCount = newPmCount > 1 ? newPmCount-1 : 1; // We can't afford to reduce newPmCount to 0
newPmCount = newPmCount > 1 ? newPmCount - 1 : 1; // We can't afford to reduce newPmCount to 0
writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId),
LOG_TYPE_ERROR);
@ -921,7 +921,7 @@ void DistributedEngineComm::StartClientListener(boost::shared_ptr<MessageQueueCl
fPmReader.push_back(thrd);
}
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
void DistributedEngineComm::addDataToOutput(SBS sbs)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
@ -931,6 +931,39 @@ void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats*
boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
// The message for a session that doesn't exist.
if (map_tok == fSessionMessages.end())
{
// Here gets the dead session ByteStream that is already removed
// from DEC queue.
return;
}
mqe = map_tok->second;
lk.unlock();
if (pmCount > 0)
{
// I hardcoded the unacked Worker id here. ACK isn't important
// for the local exchange b/c there is no need to
// enable flowcontrol localy on PM.
(void)atomicops::atomicInc(&mqe->unackedWork[0]);
}
[[maybe_unused]] TSQSize_t queueSize = mqe->queue.push(sbs);
// There will be no statistics about data transfered
// over the memory.
}
void DistributedEngineComm::addDataToOutput(SBS sbs, uint32_t connIndex, Stats* stats)
{
ISMPacketHeader* hdr = (ISMPacketHeader*)(sbs->buf());
PrimitiveHeader* p = (PrimitiveHeader*)(hdr + 1);
uint32_t uniqueId = p->UniqueID;
boost::shared_ptr<MQE> mqe;
boost::mutex::scoped_lock lk(fMlock);
MessageQueueMap::iterator map_tok = fSessionMessages.find(uniqueId);
if (map_tok == fSessionMessages.end())
{
// For debugging...
@ -1036,9 +1069,9 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const ByteStream& bs,
// reconfig the connection array
ClientList tempConns;
{
//cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<<
endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string moduleName =
fPmConnections[index]->moduleName();
//cout << "WARNING: DEC WRITE BROKEN PIPE " <<
fPmConnections[index]->otherEnd()<< endl; boost::mutex::scoped_lock onErrLock(fOnErrMutex); string
moduleName = fPmConnections[index]->moduleName();
//cout << "module name = " << moduleName << endl;
if (index >= fPmConnections.size()) return 0;

View File

@ -201,7 +201,13 @@ class DistributedEngineComm
return fRm->getPsCount() * cpp;
}
bool isExeMgrDEC() const
{
return fIsExeMgr;
}
messageqcpp::Stats getNetworkStats(uint32_t uniqueID);
void addDataToOutput(messageqcpp::SBS sbs);
friend class ::TestDistributedEngineComm;
@ -251,7 +257,6 @@ class DistributedEngineComm
*
*/
void addDataToOutput(messageqcpp::SBS, uint32_t connIndex, messageqcpp::Stats* statsToAdd);
/** @brief Writes data to the client at the index
*
* Continues trying to write data to the client at the next index until all clients have been tried.

View File

@ -1047,12 +1047,13 @@ void TupleBPS::storeCasualPartitionInfo(const bool estimateRowCounts)
const EMEntry& extent = colCmd->getExtents()[idx];
/* If any column filter eliminates an extent, it doesn't get scanned */
scanFlags[idx] = scanFlags[idx] && (extent.colWid <= utils::MAXCOLUMNWIDTH) && // XXX: change to named constant.
scanFlags[idx] =
scanFlags[idx] && (extent.colWid <= utils::MAXCOLUMNWIDTH) && // XXX: change to named constant.
(ignoreCP || extent.partition.cprange.isValid != BRM::CP_VALID ||
colCmd->getColType().colWidth != extent.colWid ||
lbidListVec[i]->CasualPartitionPredicate(
extent.partition.cprange, &(colCmd->getFilterString()), colCmd->getFilterCount(),
colCmd->getColType(), colCmd->getBOP(), colCmd->getIsDict()));
lbidListVec[i]->CasualPartitionPredicate(extent.partition.cprange, &(colCmd->getFilterString()),
colCmd->getFilterCount(), colCmd->getColType(),
colCmd->getBOP(), colCmd->getIsDict()));
}
}
@ -2032,7 +2033,7 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
#endif
startingLBID = scannedExtents[i].range.start;
bool isExeMgrDEC = fDec->isExeMgrDEC();
while (blocksToScan > 0)
{
uint32_t blocksThisJob = min(blocksToScan, blocksPerJob);
@ -2040,7 +2041,7 @@ void TupleBPS::makeJobs(vector<Job>* jobs)
fBPP->setLBID(startingLBID, scannedExtents[i]);
fBPP->setCount(blocksThisJob);
bs.reset(new ByteStream());
fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot]);
fBPP->runBPP(*bs, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], isExeMgrDEC);
jobs->push_back(
Job(scannedExtents[i].dbRoot, (*dbRootConnectionMap)[scannedExtents[i].dbRoot], blocksThisJob, bs));
blocksToScan -= blocksThisJob;
@ -2373,8 +2374,10 @@ void TupleBPS::receiveMultiPrimitiveMessages()
for (uint32_t z = 0; z < size; z++)
{
if (bsv[z]->length() > 0 && fBPP->countThisMsg(*(bsv[z])))
{
++msgsRecvd;
}
}
//@Bug 1424,1298

View File

@ -37,6 +37,7 @@
#include <string>
#include <sstream>
#include <set>
#include "serviceexemgr.h"
#include <stdlib.h>
using namespace std;
@ -117,7 +118,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor()
, validCPData(false)
, minVal(MAX64)
, maxVal(MIN64)
, cpDataFromDictScan(false)
, cpDataFromDictScan(false)
, lbidForCP(0)
, hasWideColumnOut(false)
, busyLoaderCount(0)
@ -140,6 +141,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor()
, ptMask(0)
, firstInstance(false)
, valuesLBID(0)
, initiatedByEM_(false)
{
pp.setLogicalBlockMode(true);
pp.setBlockPtr((int*)blockData);
@ -193,6 +195,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
// ptMask(processorThreads - 1),
, firstInstance(true)
, valuesLBID(0)
, initiatedByEM_(false)
{
// promote processorThreads to next power of 2. also need to change the name to bucketCount or similar
processorThreads = nextPowOf2(processorThreads);
@ -544,6 +547,10 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w, con
bs.advance(sizeof(ISMPacketHeader) + 16);
bs >> dbRoot;
bs >> count;
uint8_t u8 = 0;
bs >> u8;
initiatedByEM_ = u8;
bs >> ridCount;
if (gotAbsRids)
@ -1647,8 +1654,9 @@ void BatchPrimitiveProcessor::execute()
}
// else
// cout << " no target found for OID " << projectSteps[j]->getOID() <<
//endl;
// cout << " no target found for OID " <<
// projectSteps[j]->getOID()
//<< endl;
}
if (fe2)
{
@ -2134,6 +2142,17 @@ void BatchPrimitiveProcessor::serializeStrings()
void BatchPrimitiveProcessor::sendResponse()
{
bool isLocalNodeConnection = exemgr::globServiceExeMgr->isLocalNodeSock(sock);
// Here is the fast path for local EM to PM interacction. PM puts into the
// input EM DEC queue directly.
if (initiatedByEM_ && isLocalNodeConnection)
{
joblist::DistributedEngineComm* exeMgrDecPtr = exemgr::globServiceExeMgr->getDec();
exeMgrDecPtr->addDataToOutput(serialized);
serialized.reset();
return;
}
if (sendThread->flowControlEnabled())
{
// newConnection should be set only for the first result of a batch job

View File

@ -53,7 +53,6 @@
#include "bppsendthread.h"
#include "columnwidth.h"
//#define PRIMPROC_STOPWATCH
#ifdef PRIMPROC_STOPWATCH
#include "stopwatch.h"
#endif
@ -433,6 +432,7 @@ class BatchPrimitiveProcessor
uint ptMask;
bool firstInstance;
uint64_t valuesLBID;
bool initiatedByEM_;
static const uint64_t maxResultCount = 1048576; // 2^20

View File

@ -20,7 +20,9 @@
#include <iostream>
#include <cstdint>
#include <csignal>
#include <ifaddrs.h>
#include <sys/resource.h>
#include <sys/types.h>
#undef root_name
#include <boost/filesystem.hpp>
@ -60,6 +62,7 @@
namespace exemgr
{
using SharedPtrEMSock = boost::shared_ptr<messageqcpp::IOSocket>;
class Opt
{
public:
@ -168,7 +171,6 @@ namespace exemgr
}
void initMaxMemPct(uint32_t sessionId)
{
// WIP
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
@ -187,7 +189,6 @@ namespace exemgr
uint64_t getMaxMemPct(const uint32_t sessionId)
{
uint64_t maxMemoryPct = 0;
// WIP
if (sessionId < 0x80000000)
{
std::lock_guard<std::mutex> lk(sessionMemMapMutex_);
@ -290,6 +291,17 @@ namespace exemgr
{
return *rm_;
}
bool isLocalNodeSock(SharedPtrEMSock& sock) const
{
for (auto& sin : localNetIfaceSins_)
{
if (sock->isSameAddr(sin))
{
return true;
}
}
return false;
}
private:
void setupSignalHandlers();
int8_t setupCwd()
@ -326,7 +338,27 @@ namespace exemgr
}
return 0;
}
void getLocalNetIfacesSins()
{
string ipAddress = "Unable to get IP Address";
struct ifaddrs* netIfacesList = nullptr;
struct ifaddrs* ifaceListMembPtr = nullptr;
int success = 0;
// retrieve the current interfaces - returns 0 on success
success = getifaddrs(&netIfacesList);
if (success == 0)
{
ifaceListMembPtr = netIfacesList;
for (; ifaceListMembPtr; ifaceListMembPtr = ifaceListMembPtr->ifa_next)
{
if (ifaceListMembPtr->ifa_addr->sa_family == AF_INET)
{
localNetIfaceSins_.push_back(((struct sockaddr_in*)ifaceListMembPtr->ifa_addr)->sin_addr);
}
}
}
freeifaddrs(netIfacesList);
}
logging::Logger msgLog_;
SessionMemMap_t sessionMemMap_; // track memory% usage during a query
std::mutex sessionMemMapMutex_;
@ -343,6 +375,7 @@ namespace exemgr
joblist::ResourceManager* rm_;
// Its attributes are set in Child()
querytele::QueryTeleServerParms teleServerParms_;
std::vector<struct in_addr> localNetIfaceSins_;
};
extern ServiceExeMgr* globServiceExeMgr;
}

View File

@ -1075,6 +1075,10 @@ bool InetStreamSocket::isSameAddr(const Socket* rhs) const
return (fSa.sin_addr.s_addr == issp->fSa.sin_addr.s_addr);
}
bool InetStreamSocket::isSameAddr(const struct in_addr& ipv4Addr) const
{
return (fSa.sin_addr.s_addr == ipv4Addr.s_addr);
}
/*static*/
int InetStreamSocket::ping(const std::string& ipaddr, const struct timespec* timeout)

View File

@ -201,6 +201,7 @@ class InetStreamSocket : public Socket
*
*/
virtual bool isSameAddr(const Socket* rhs) const;
virtual bool isSameAddr(const struct in_addr& ipv4Addr) const;
/** ping an ip address
*

View File

@ -179,6 +179,11 @@ class IOSocket
{
return fSocket->isSameAddr(rhs->fSocket);
}
virtual bool isSameAddr(const struct in_addr& ipv4Addr) const
{
return fSocket->isSameAddr(ipv4Addr);
}
/** connect() forwarder for inherited classes
*

View File

@ -174,6 +174,7 @@ class Socket
*
*/
virtual bool isSameAddr(const Socket* rhs) const = 0;
virtual bool isSameAddr(const struct in_addr& ipv4Addr) const = 0;
virtual bool isConnected() const = 0;
virtual bool hasData() const = 0;