1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

MCOL-4337 Controllernode now tries to establish connections to

all workernodes when initialising
There is a new corresponding XML config options
DBRM_Controller.WorkerConnectionTimeout that controls how long
controllernode waits for wns before it proceeds
This commit is contained in:
Roman Nozdrin
2020-10-08 12:09:35 +00:00
parent d85fc579ba
commit cfce821513
5 changed files with 96 additions and 371 deletions

View File

@ -140,22 +140,9 @@ MasterDBRMNode& MasterDBRMNode::operator=(const MasterDBRMNode& m)
void MasterDBRMNode::initMsgQueues(config::Config* config)
{
string stmp;
int ltmp;
char ctmp[50];
int i;
stmp = config->getConfig("DBRM_Controller", "NumWorkers");
if (stmp.length() == 0)
throw runtime_error("MasterDBRMNode::initMsgQueues(): config file error looking for <DBRM_Controller><NumWorkers>");
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp < 1)
throw runtime_error("MasterDBRMNode::initMsgQueues(): Bad NumWorkers value");
NumWorkers = ltmp;
std::string methodName("MasterDBRMNode::initMsgQueues()");
size_t connectTimeoutSecs = 0;
getNumWorkersAndTimeout(connectTimeoutSecs, methodName, config);
serverLock.lock();
@ -171,11 +158,80 @@ void MasterDBRMNode::initMsgQueues(config::Config* config)
serverLock.unlock();
for (i = 1; i <= NumWorkers; i++)
connectToWorkers(connectTimeoutSecs);
}
void MasterDBRMNode::getNumWorkersAndTimeout(size_t& connectTimeoutSecs,
const std::string& methodName,
config::Config* config)
{
string stmp;
int ltmp;
connectTimeoutSecs = 30; // default
stmp = config->getConfig("DBRM_Controller", "NumWorkers");
if (stmp.length() == 0)
throw runtime_error(methodName +
": config file error looking for <DBRM_Controller><NumWorkers>");
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp < 1)
throw runtime_error(methodName + ": Bad NumWorkers value");
NumWorkers = ltmp;
stmp = config->getConfig("DBRM_Controller", "WorkerConnectionTimeout");
if (stmp.length() > 0)
{
snprintf(ctmp, 50, "DBRM_Worker%d", i);
std::string module(ctmp);
slaves.push_back(MessageQueueClientPool::getInstance(module));
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp > 1)
connectTimeoutSecs = ltmp;
}
}
void MasterDBRMNode::connectToWorkers(const size_t connectTimeoutSecs)
{
size_t timeoutMsecs = connectTimeoutSecs * 1000000;
size_t timeSpent = 0;
int workersOnline = 0;
bool initialRun = true;
while (timeoutMsecs > timeSpent && workersOnline < NumWorkers)
{
char ctmp[50];
for (int i = 0; i < NumWorkers; i++)
{
snprintf(ctmp, sizeof(ctmp), "DBRM_Worker%d", i+1);
std::string module(ctmp);
if (static_cast<int>(slaves.size()) < NumWorkers)
{
slaves.push_back(MessageQueueClientPool::getInstance(module));
}
if (!slaves[i]->isConnected())
{
if (!slaves[i]->connect())
{
// first iteration
if (initialRun)
log("DBRM Controller: Warning: could not connect to " + module,
logging::LOG_TYPE_WARNING);
}
else
{
log("DBRM Controller: Connected to " + module,
logging::LOG_TYPE_DEBUG);
workersOnline++;
}
}
}
if (initialRun)
initialRun = false;
if (workersOnline < NumWorkers)
{
usleep(connectTimeoutStep);
timeSpent += connectTimeoutStep;
}
}
}
@ -1238,17 +1294,17 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
*/
ByteStream reply;
string stmp;
int ltmp;
char ctmp[50];
int i;
config::Config* config = config::Config::makeConfig();
log("Reloading", LOG_TYPE_INFO);
stmp = config->getConfig("DBRM_Controller", "NumWorkers");
if (stmp.length() == 0)
std::string methodName("MasterDBRMNode::doReload()");
size_t connectTimeoutSecs = 0;
try
{
getNumWorkersAndTimeout(connectTimeoutSecs, methodName, config);
}
catch (std::exception&)
{
reply << (uint8_t) ERR_FAILURE;
@ -1258,40 +1314,18 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
}
catch (exception&) { }
throw runtime_error("MasterDBRMNode::doReload(): config file error looking for <DBRM_Controller><NumWorkers>");
throw;
}
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp < 1)
{
reply << (uint8_t) ERR_FAILURE;
try
{
sock->write(reply);
}
catch (exception&) { }
throw runtime_error("MasterDBRMNode::doReload(): Bad NumWorkers value");
}
for (i = 0; i < (int) slaves.size(); i++)
for (int i = 0; i < (int) slaves.size(); i++)
{
MessageQueueClientPool::deleteInstance(slaves[i]);
slaves[i] = NULL;
slaves[i] = nullptr;
}
slaves.clear();
NumWorkers = ltmp;
for (i = 1; i <= NumWorkers; i++)
{
snprintf(ctmp, 50, "DBRM_Worker%d", i);
std::string module(ctmp);
slaves.push_back(MessageQueueClientPool::getInstance(module));
}
connectToWorkers(connectTimeoutSecs);
iSlave = slaves.end();
undo();
@ -1306,18 +1340,6 @@ void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
}
catch (exception&) { }
/* Asynchronous version
ByteStream reply;
reply << (uint8_t) ERR_OK;
try {
sock->write(reply);
}
catch (exception&) { }
// sock->close();
reloadCmd = true;
*/
}
void MasterDBRMNode::doVerID(ByteStream& msg, ThreadParams* p)