From 13414513e52c28562300b4c2e129885940c7a320 Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Fri, 22 May 2020 12:43:19 +0000 Subject: [PATCH] MCOL-4015 ExeMgr now re-establishes its connections with PrimProcs. --- dbcon/joblist/distributedenginecomm.cpp | 125 +++++++++++++----------- dbcon/joblist/distributedenginecomm.h | 6 ++ exemgr/main.cpp | 6 +- 3 files changed, 78 insertions(+), 59 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 6285b187b..1cc318ef8 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - * Copyright (C) 2016 MariaDB Corporation. + * Copyright (C) 2016-2020 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -36,6 +36,8 @@ #include #include #include +#include <chrono> +#include <thread> using namespace std; #include @@ -240,11 +242,10 @@ void DistributedEngineComm::Setup() newClients.clear(); newLocks.clear(); - throttleThreshold = fRm->getDECThrottleThreshold(); uint32_t newPmCount = fRm->getPsCount(); - int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1); + throttleThreshold = fRm->getDECThrottleThreshold(); tbpsThreadCount = fRm->getJlNumScanReceiveThreads(); - unsigned numConnections = newPmCount * cpp; + unsigned numConnections = getNumConnections(); oam::Oam oam; ModuleTypeConfig moduletypeconfig; @@ -386,51 +387,59 @@ void DistributedEngineComm::Listen(boost::shared_ptr client, Error: // @bug 488 - error condition! push 0 length bs to messagequeuemap and // eventually let jobstep error out. 
- /* boost::mutex::scoped_lock lk(fMlock); - //cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl; + boost::mutex::scoped_lock lk(fMlock); + MessageQueueMap::iterator map_tok; + sbs.reset(new ByteStream(0)); - MessageQueueMap::iterator map_tok; - sbs.reset(new ByteStream(0)); + for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) + { + map_tok->second->queue.clear(); + (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + map_tok->second->queue.push(sbs); + } + lk.unlock(); - for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) - { - map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); - map_tok->second->queue.push(sbs); - } - lk.unlock(); + if (fIsExeMgr) + { + //std::cout << "WARNING: DEC READ 0 LENGTH BS FROM " + // << client->otherEnd()<< " OR GOT AN EXCEPTION READING" << std::endl; + decltype(pmCount) originalPMCount = pmCount; + // Re-establish if a remote PM restarted. 
+ std::this_thread::sleep_for(std::chrono::seconds(3)); + Setup(); + if (originalPMCount != pmCount) + { + ostringstream os; + os << "DEC: lost connection to " << client->addr2String(); + writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR); + } - // reset the pmconnection vector - ClientList tempConns; +/* + // reset the pmconnection vector + ClientList tempConns; + boost::mutex::scoped_lock onErrLock(fOnErrMutex); + string moduleName = client->moduleName(); + //cout << "moduleName=" << moduleName << endl; + for ( uint32_t i = 0; i < fPmConnections.size(); i++) + { + if (moduleName != fPmConnections[i]->moduleName()) + tempConns.push_back(fPmConnections[i]); + //else + //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl; + } - { - boost::mutex::scoped_lock onErrLock(fOnErrMutex); - string moduleName = client->moduleName(); - //cout << "moduleName=" << moduleName << endl; - for ( uint32_t i = 0; i < fPmConnections.size(); i++) - { - if (moduleName != fPmConnections[i]->moduleName()) - tempConns.push_back(fPmConnections[i]); - //else - //cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl; - } + if (tempConns.size() == fPmConnections.size()) return; - if (tempConns.size() == fPmConnections.size()) return; + fPmConnections.swap(tempConns); + pmCount = (pmCount == 0 ? 0 : pmCount - 1); + //cout << "PMCOUNT=" << pmCount << endl; - fPmConnections.swap(tempConns); - pmCount = (pmCount == 0 ? 
0 : pmCount - 1); - //cout << "PMCOUNT=" << pmCount << endl; - */ - // send alarm & log it - ALARMManager alarmMgr; - string alarmItem = client->addr2String(); - alarmItem.append(" PrimProc"); - alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); - - ostringstream os; - os << "DEC: lost connection to " << client->addr2String(); - writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL); -// } + // log it + ostringstream os; + os << "DEC: lost connection to " << client->addr2String(); + writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL); +*/ + } return; } @@ -999,22 +1008,22 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin catch (...) { // @bug 488. error out under such condition instead of re-trying other connection, - // by pushing 0 size bytestream to messagequeue and throw excpetion - /* SBS sbs; - lk.lock(); - //cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl; - MessageQueueMap::iterator map_tok; - sbs.reset(new ByteStream(0)); + // by pushing 0 size bytestream to messagequeue and throw exception + SBS sbs; + lk.lock(); + //std::cout << "WARNING: DEC WRITE BROKEN PIPE. 
PMS index = " << index << std::endl; + MessageQueueMap::iterator map_tok; + sbs.reset(new ByteStream(0)); - for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) - { - map_tok->second->queue.clear(); - (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); - map_tok->second->queue.push(sbs); - } - - lk.unlock(); + for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok) + { + map_tok->second->queue.clear(); + (void)atomicops::atomicInc(&map_tok->second->unackedWork[0]); + map_tok->second->queue.push(sbs); + } + lk.unlock(); + /* // reconfig the connection array ClientList tempConns; { @@ -1033,7 +1042,6 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin fPmConnections.swap(tempConns); pmCount = (pmCount == 0 ? 0 : pmCount - 1); } - */ // send alarm ALARMManager alarmMgr; string alarmItem("UNKNOWN"); @@ -1045,6 +1053,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin alarmItem.append(" PrimProc"); alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); + */ throw runtime_error("DistributedEngineComm::write: Broken Pipe error"); } } diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 2e4faa023..32d15c718 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -197,6 +197,12 @@ public: return pmCount; } + unsigned getNumConnections() const + { + unsigned cpp = (fIsExeMgr ? 
fRm->getPsConnectionsPerPrimProc() : 1); + return fRm->getPsCount() * cpp; + } + messageqcpp::Stats getNetworkStats(uint32_t uniqueID); friend class ::TestDistributedEngineComm; diff --git a/exemgr/main.cpp b/exemgr/main.cpp index 3fa3a1c79..1c6008e7f 100644 --- a/exemgr/main.cpp +++ b/exemgr/main.cpp @@ -642,7 +642,11 @@ new_plan: std::cout << "### For session id " << csep.sessionID() << ", got a CSEP" << std::endl; setRMParms(csep.rmParms()); - + // Re-establish lost PP connections. + if (UNLIKELY(fEc->getNumConnections() != fEc->connectedPmServers())) + { + fEc->Setup(); + } // @bug 1021. try to get schema cache for a come in query. // skip system catalog queries. if (!csep.isInternal())