1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(MCOL-6082) Cluster with read-only nodes should correctly work with DML

This patch changes logic from counting all nodes to counting only
read-write nodes when messaging about DML operations.
This commit is contained in:
Serguey Zefirov
2025-06-30 14:27:26 +03:00
parent 37a626eed5
commit 2753743762
3 changed files with 40 additions and 1 deletions

View File

@ -350,7 +350,7 @@ int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID
while (1) while (1)
{ {
if (msgRecived == fWEClient->getPmCount()) if (msgRecived == fWEClient->getRWConnectionsCount())
break; break;
fWEClient->read(uniqueId, bsIn); fWEClient->read(uniqueId, bsIn);

View File

@ -154,6 +154,21 @@ struct QueueShutdown
x.shutdown(); x.shutdown();
} }
}; };
/**
* This function checks if the WriteEngineServer (WES) is configured
* for the specified node in the configuration.
* @param config Pointer to the configuration object
* @param fOtherEnd The name of the node to check
* @return true if WES is configured, false otherwise
*/
bool isWESConfigured(config::Config* config, const std::string& fOtherEnd)
{
// Check if WES IP address record exists in the config (if not, this is a read-only node)
std::string otherEndDnOrIPStr = config->getConfig(fOtherEnd, "IPAddr");
return !(otherEndDnOrIPStr.empty() || otherEndDnOrIPStr == "unassigned");
}
} // namespace } // namespace
namespace WriteEngine namespace WriteEngine
@ -224,6 +239,13 @@ void WEClients::Setup()
snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID); snprintf(buff, sizeof(buff), "pm%u_WriteEngineServer", moduleID);
string fServer(buff); string fServer(buff);
// Check if WES is configured for this module
if (!isWESConfigured(rm->getConfig(), fServer))
{
writeToLog(__FILE__, __LINE__, "Skipping WriteEngineServer client creation for " + fServer + " as the node is read-only", LOG_TYPE_INFO);
continue;
}
boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig())); boost::shared_ptr<MessageQueueClient> cl(new MessageQueueClient(fServer, rm->getConfig()));
boost::shared_ptr<boost::mutex> nl(new boost::mutex()); boost::shared_ptr<boost::mutex> nl(new boost::mutex());
@ -287,6 +309,11 @@ void WEClients::Setup()
} }
} }
bool WEClients::isConnectionReadonly(uint32_t connection)
{
return fPmConnections[connection] == nullptr;
}
int WEClients::Close() int WEClients::Close()
{ {
makeBusy(false); makeBusy(false);

View File

@ -114,6 +114,18 @@ class WEClients
return pmCount; return pmCount;
} }
uint32_t getRWConnectionsCount()
{
uint32_t count = 0;
for (uint32_t i = 0; i < fPmConnections.size(); i++)
{
count += fPmConnections[i] != nullptr;
}
return count;
}
bool isConnectionReadonly(uint32_t connection);
private: private:
WEClients(const WEClients& weClient); WEClients(const WEClients& weClient);
WEClients& operator=(const WEClients& weClient); WEClients& operator=(const WEClients& weClient);