1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

Merge pull request #1220 from drrtuy/exemgr-pp-connections

MCOL-4015 ExeMgr now re-establishes its connections with PrimProcs.
This commit is contained in:
Patrick LeBlanc
2020-05-27 09:19:01 -05:00
committed by GitHub
3 changed files with 78 additions and 59 deletions

View File

@@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc. /* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016 MariaDB Corporation. * Copyright (C) 2016-2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License modify it under the terms of the GNU General Public License
@@ -36,6 +36,8 @@
#include <ctime> #include <ctime>
#include <algorithm> #include <algorithm>
#include <unistd.h> #include <unistd.h>
#include <chrono>
#include <thread>
using namespace std; using namespace std;
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
@@ -240,11 +242,10 @@ void DistributedEngineComm::Setup()
newClients.clear(); newClients.clear();
newLocks.clear(); newLocks.clear();
throttleThreshold = fRm->getDECThrottleThreshold();
uint32_t newPmCount = fRm->getPsCount(); uint32_t newPmCount = fRm->getPsCount();
int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1); throttleThreshold = fRm->getDECThrottleThreshold();
tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
unsigned numConnections = newPmCount * cpp; unsigned numConnections = getNumConnections();
oam::Oam oam; oam::Oam oam;
ModuleTypeConfig moduletypeconfig; ModuleTypeConfig moduletypeconfig;
@@ -386,51 +387,59 @@ void DistributedEngineComm::Listen(boost::shared_ptr<MessageQueueClient> client,
Error: Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and // @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out. // eventually let jobstep error out.
/* boost::mutex::scoped_lock lk(fMlock); boost::mutex::scoped_lock lk(fMlock);
//cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
MessageQueueMap::iterator map_tok; for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
sbs.reset(new ByteStream(0)); {
map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs);
}
lk.unlock();
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) if (fIsExeMgr)
{ {
map_tok->second->queue.clear(); //std::cout << "WARNING: DEC READ 0 LENGTH BS FROM "
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); // << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl;
map_tok->second->queue.push(sbs); decltype(pmCount) originalPMCount = pmCount;
} // Re-establish if a remote PM restarted.
lk.unlock(); std::this_thread::sleep_for(std::chrono::seconds(3));
Setup();
if (originalPMCount != pmCount)
{
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR);
}
// reset the pmconnection vector /*
ClientList tempConns; // reset the pmconnection vector
ClientList tempConns;
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
{ if (tempConns.size() == fPmConnections.size()) return;
boost::mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
if (tempConns.size() == fPmConnections.size()) return; fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
fPmConnections.swap(tempConns); // log it
pmCount = (pmCount == 0 ? 0 : pmCount - 1); ostringstream os;
//cout << "PMCOUNT=" << pmCount << endl; os << "DEC: lost connection to " << client->addr2String();
*/ writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
// send alarm & log it */
ALARMManager alarmMgr; }
string alarmItem = client->addr2String();
alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
// }
return; return;
} }
@@ -999,22 +1008,22 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
catch (...) catch (...)
{ {
// @bug 488. error out under such condition instead of re-trying other connection, // @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw excpetion // by pushing 0 size bytestream to messagequeue and throw exception
/* SBS sbs; SBS sbs;
lk.lock(); lk.lock();
//cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl; //std::cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << std::endl;
MessageQueueMap::iterator map_tok; MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0)); sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
{ {
map_tok->second->queue.clear(); map_tok->second->queue.clear();
(void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]);
map_tok->second->queue.push(sbs); map_tok->second->queue.push(sbs);
} }
lk.unlock();
lk.unlock();
/*
// reconfig the connection array // reconfig the connection array
ClientList tempConns; ClientList tempConns;
{ {
@@ -1033,7 +1042,6 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
fPmConnections.swap(tempConns); fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1); pmCount = (pmCount == 0 ? 0 : pmCount - 1);
} }
*/
// send alarm // send alarm
ALARMManager alarmMgr; ALARMManager alarmMgr;
string alarmItem("UNKNOWN"); string alarmItem("UNKNOWN");
@@ -1045,6 +1053,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
alarmItem.append(" PrimProc"); alarmItem.append(" PrimProc");
alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET);
*/
throw runtime_error("DistributedEngineComm::write: Broken Pipe error"); throw runtime_error("DistributedEngineComm::write: Broken Pipe error");
} }
} }

View File

@@ -197,6 +197,12 @@ public:
return pmCount; return pmCount;
} }
/** @brief Number of PrimProc connections the current configuration calls for.
 *
 * Computed from the resource manager on every call: the PrimProc server
 * count (getPsCount()) multiplied by a per-PrimProc connection factor.
 * The factor is getPsConnectionsPerPrimProc() when this instance is flagged
 * as ExeMgr (fIsExeMgr), and 1 otherwise.
 *
 * @return the product described above.
 */
unsigned getNumConnections() const
{
    // Non-ExeMgr instances use a single connection per PrimProc.
    unsigned connectionsPerPrimProc = 1;

    if (fIsExeMgr)
        connectionsPerPrimProc = fRm->getPsConnectionsPerPrimProc();

    return fRm->getPsCount() * connectionsPerPrimProc;
}
messageqcpp::Stats getNetworkStats(uint32_t uniqueID); messageqcpp::Stats getNetworkStats(uint32_t uniqueID);
friend class ::TestDistributedEngineComm; friend class ::TestDistributedEngineComm;

View File

@@ -642,7 +642,11 @@ new_plan:
std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl;
setRMParms(csep.rmParms()); setRMParms(csep.rmParms());
// Re-establish lost PP connections.
if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers()))
{
fEc->Setup();
}
// @bug 1021. try to get schema cache for a come in query. // @bug 1021. try to get schema cache for a come in query.
// skip system catalog queries. // skip system catalog queries.
if (!csep.isInternal()) if (!csep.isInternal())