You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-529 Pool DBRM connections
DBRM connections are reused so that we don't have a huge amount of TIME_WAIT sockets when there are large amounts of DML. Also applied to i_s.columnstore_files
This commit is contained in:
@ -40,6 +40,7 @@
|
||||
#include "socketclosed.h"
|
||||
#include "configcpp.h"
|
||||
#include "sessionmanagerserver.h"
|
||||
#include "messagequeuepool.h"
|
||||
#define DBRM_DLLEXPORT
|
||||
#include "dbrm.h"
|
||||
#undef DBRM_DLLEXPORT
|
||||
@ -53,7 +54,7 @@
|
||||
#endif
|
||||
|
||||
#define DO_ERR_NETWORK \
|
||||
delete msgClient; \
|
||||
MessageQueueClientPool::releaseInstance(msgClient); \
|
||||
msgClient = NULL; \
|
||||
mutex.unlock(); \
|
||||
return ERR_NETWORK;
|
||||
@ -97,8 +98,7 @@ DBRM::DBRM(const DBRM& brm)
|
||||
DBRM::~DBRM() throw()
|
||||
{
|
||||
if (msgClient != NULL)
|
||||
msgClient->shutdown();
|
||||
delete msgClient;
|
||||
MessageQueueClientPool::releaseInstance(msgClient);
|
||||
}
|
||||
|
||||
DBRM& DBRM::operator=(const DBRM& brm)
|
||||
@ -742,7 +742,7 @@ reconnect:
|
||||
|
||||
if (msgClient == NULL)
|
||||
try {
|
||||
msgClient = new MessageQueueClient(masterName);
|
||||
msgClient = MessageQueueClientPool::getInstance(masterName);
|
||||
}
|
||||
catch(exception &e) {
|
||||
cerr << "class DBRM failed to create a MessageQueueClient: " <<
|
||||
@ -766,7 +766,7 @@ reconnect:
|
||||
cerr << "DBRM::send_recv caught: " << e.what() << endl;
|
||||
if (firstAttempt) {
|
||||
firstAttempt = false;
|
||||
delete msgClient;
|
||||
MessageQueueClientPool::releaseInstance(msgClient);
|
||||
msgClient = NULL;
|
||||
goto reconnect;
|
||||
}
|
||||
@ -776,7 +776,7 @@ reconnect:
|
||||
cerr << "DBRM::send_recv: controller node closed the connection" << endl;
|
||||
if (firstAttempt) {
|
||||
firstAttempt = false;
|
||||
delete msgClient;
|
||||
MessageQueueClientPool::releaseInstance(msgClient);
|
||||
msgClient = NULL;
|
||||
sleep(10);
|
||||
goto reconnect;
|
||||
@ -3222,7 +3222,7 @@ bool DBRM::isDBRMReady() throw()
|
||||
{
|
||||
if (msgClient == NULL)
|
||||
{
|
||||
msgClient = new MessageQueueClient(masterName);
|
||||
msgClient = MessageQueueClientPool::getInstance(masterName);
|
||||
}
|
||||
if (msgClient->connect())
|
||||
{
|
||||
@ -3232,7 +3232,7 @@ bool DBRM::isDBRMReady() throw()
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
delete msgClient;
|
||||
MessageQueueClientPool::releaseInstance(msgClient);
|
||||
msgClient = NULL;
|
||||
sleep(1);
|
||||
}
|
||||
|
@ -32,7 +32,7 @@
|
||||
#include "liboamcpp.h"
|
||||
#include "stopwatch.h"
|
||||
#include "masterdbrmnode.h"
|
||||
|
||||
#include "messagequeuepool.h"
|
||||
// #define BRM_VERBOSE
|
||||
|
||||
// minor improvement to code clarity...
|
||||
@ -163,7 +163,8 @@ void MasterDBRMNode::initMsgQueues(config::Config *config)
|
||||
serverLock.unlock();
|
||||
for (i = 1; i <= NumWorkers; i++) {
|
||||
snprintf(ctmp, 50, "DBRM_Worker%d", i);
|
||||
slaves.push_back(new MessageQueueClient(ctmp, config));
|
||||
std::string module(ctmp);
|
||||
slaves.push_back(MessageQueueClientPool::getInstance(module));
|
||||
}
|
||||
}
|
||||
|
||||
@ -882,8 +883,8 @@ void MasterDBRMNode::finalCleanup()
|
||||
cerr << "Closing connections" << endl;
|
||||
#endif
|
||||
for (sIt = slaves.begin(); sIt != slaves.end(); sIt++) {
|
||||
(*sIt)->shutdown();
|
||||
delete *sIt;
|
||||
MessageQueueClientPool::releaseInstance(*sIt);
|
||||
*sIt = NULL;
|
||||
}
|
||||
slaves.clear();
|
||||
|
||||
@ -1036,8 +1037,8 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket *sock)
|
||||
}
|
||||
|
||||
for (i = 0; i < (int) slaves.size(); i++) {
|
||||
slaves[i]->shutdown();
|
||||
delete slaves[i];
|
||||
MessageQueueClientPool::releaseInstance(slaves[i]);
|
||||
slaves[i] = NULL;
|
||||
}
|
||||
slaves.clear();
|
||||
|
||||
@ -1045,7 +1046,8 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket *sock)
|
||||
|
||||
for (i = 1; i <= NumWorkers; i++) {
|
||||
snprintf(ctmp, 50, "DBRM_Worker%d", i);
|
||||
slaves.push_back(new MessageQueueClient(ctmp, config));
|
||||
std::string module(ctmp);
|
||||
slaves.push_back(MessageQueueClientPool::getInstance(module));
|
||||
}
|
||||
|
||||
iSlave = slaves.end();
|
||||
|
Reference in New Issue
Block a user