diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp index 307ba1d39..5b9dcfa25 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp @@ -350,7 +350,7 @@ int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID while (1) { - if (msgRecived == fWEClient->getPmCount()) + if (msgRecived == fWEClient->getRWConnectionsCount()) break; fWEClient->read(uniqueId, bsIn); diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index 1badf9ab4..073d13744 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -154,6 +154,21 @@ struct QueueShutdown x.shutdown(); } }; + +/** + * This function checks if the WriteEngineServer (WES) is configured + * for the specified node in the configuration. + * @param config Pointer to the configuration object + * @param fOtherEnd The name of the node to check + * @return true if WES is configured, false otherwise + */ +bool isWESConfigured(config::Config* config, const std::string& fOtherEnd) +{ + // Check if WES IP address record exists in the config (if not, this is a read-only node) + std::string otherEndDnOrIPStr = config->getConfig(fOtherEnd, "IPAddr"); + return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned"); +} + } // namespace namespace WriteEngine @@ -224,6 +239,13 @@ void WEClients::Setup() snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); string fServer(buff); + // Check if WES is configured for this module + if (!isWESConfigured(rm->getConfig(), fServer)) + { + writeToLog(__FILE__, __LINE__, "Skipping WriteEngineServer client creation for " + fServer + " as the node is read-only", LOG_TYPE_INFO); + continue; + } + boost::shared_ptr cl(new MessageQueueClient(fServer, rm->getConfig())); boost::shared_ptr nl(new boost::mutex()); @@ -287,6 +309,11 @@ void WEClients::Setup() } } +bool WEClients::isConnectionReadonly(uint32_t connection) +{ + return fPmConnections[connection] == nullptr; +} + int WEClients::Close() { makeBusy(false); diff --git a/writeengine/client/we_clients.h b/writeengine/client/we_clients.h index 8a512f732..da315c6de 100644 --- a/writeengine/client/we_clients.h +++ b/writeengine/client/we_clients.h @@ -114,6 +114,18 @@ class WEClients return pmCount; } + uint32_t getRWConnectionsCount() + { + uint32_t count = 0; + for (uint32_t i = 0; i < fPmConnections.size(); i++) + { + count += fPmConnections[i] != nullptr; + } + return count; + } + + bool isConnectionReadonly(uint32_t connection); + private: WEClients(const WEClients& weClient); WEClients& operator=(const WEClients& weClient);