1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2024-09-10 17:44:25 +03:00

2958 lines
61 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*****************************************************************************
* $Id: masterdbrmnode.cpp 1899 2013-06-12 13:32:30Z dcathey $
*
****************************************************************************/
#include <unistd.h>
#include <sys/types.h>
#include <sstream>
#include <thread>
#include "sessionmanager.h"
#include "socketclosed.h"
#include "liboamcpp.h"
#include "stopwatch.h"
#include "masterdbrmnode.h"
#include "messagequeuepool.h"
// #define BRM_VERBOSE
// CHECK_ERROR1(x): error-handling tail used inside msgProcessor() after a
// distribute/gather step.  It is a macro (not a function) because it jumps to
// the `retrycmd` and `out` labels and uses the locals `it`, `responses` and `p`
// of the enclosing function.
//  - If `halting` is set: free the gathered responses, undo() the command on
//    the workers, release slaveLock, sleep 50 ms and retry the whole command.
//  - Otherwise, if x is non-zero: undo(), release slaveLock, send x to the
//    client as the error code and skip to the per-command cleanup (`out`).
// Notes:
//  - x is expanded twice, so pass a plain variable, not an expression with
//    side effects.
//  - The expansion ends in an empty `else {}` block, so it is used without a
//    trailing semicolon.
//  - Do not put `//` comments on the continued lines below: the trailing
//    backslash would splice the next line into the comment.
#define CHECK_ERROR1(x) \
if (halting) \
{ \
for (it = responses.begin(); it != responses.end(); it++) \
delete *it; \
responses.clear(); \
undo(); \
slaveLock.unlock(); \
usleep(50000); \
goto retrycmd; \
} \
if (x) \
{ \
undo(); \
slaveLock.unlock(); \
sendError(p->sock, x); \
goto out; \
} \
else \
{ \
}
// THREAD_EXIT: session teardown used by msgProcessor() before it returns or
// rethrows.  Relies on the enclosing function's locals `p`, `it` and
// `responses`.
//  1. Under `mutex`, removes this session's socket from activeSessions.
//  2. Closes and deletes the socket, deletes the thread object and the
//     ThreadParams itself.
//  3. Deletes any ByteStreams still held in `responses` (the vector is not
//     cleared, so it must not be used afterwards).
#define THREAD_EXIT \
{ \
mutex.lock(); \
for (vector<IOSocket*>::iterator _it = activeSessions.begin(); _it != activeSessions.end(); ++_it) \
if (p->sock == *_it) \
{ \
activeSessions.erase(_it); \
break; \
} \
mutex.unlock(); \
p->sock->close(); \
delete p->sock; \
delete p->t; \
delete p; \
for (it = responses.begin(); it != responses.end(); it++) \
delete *it; \
}
using namespace std;
using namespace messageqcpp;
using namespace logging;
namespace BRM
{
// Default constructor.
// Loads the configuration, resets the control flags, creates the table lock
// server, opens the controller's message queue, connects to the workers and
// creates the LBID resource graph.  Finally reads SystemConfig/DBRMTimeOut
// into MSG_TIMEOUT; a value that is not positive falls back to 20 seconds.
// Throws std::invalid_argument when no configuration object is available.
MasterDBRMNode::MasterDBRMNode()
{
  config::Config* cfg = config::Config::makeConfig();

  if (cfg == NULL)
    throw invalid_argument("MasterDBRMNode: Configuration error.");

  runners = 0;
  die = false;
  reloadCmd = false;
  readOnly = false;
  halting = false;

  tableLockServer.reset(new TableLockServer(&sm));
  initMsgQueues(cfg);
  rg = new LBIDResourceGraph();

  //@Bug 2325: the read timeout comes from SystemConfig/DBRMTimeOut
  const std::string timeoutText = cfg->getConfig("SystemConfig", "DBRMTimeOut");
  const int secondsToWait = cfg->fromText(timeoutText);

  MSG_TIMEOUT.tv_nsec = 0;
  MSG_TIMEOUT.tv_sec = (secondsToWait > 0) ? secondsToWait : 20;
}
// Destructor: raises the `die` flag so the accept loop and the session
// threads stop looping, then runs finalCleanup().
MasterDBRMNode::~MasterDBRMNode()
{
  die = true;

  finalCleanup();
}
// Copy construction is not supported; any attempt throws std::logic_error.
MasterDBRMNode::MasterDBRMNode(const MasterDBRMNode& m)
{
  throw logic_error(
      "Don't use the MasterDBRMNode copy constructor");
}
// Copy assignment is not supported; any attempt throws std::logic_error
// (the function therefore never returns a value).
MasterDBRMNode& MasterDBRMNode::operator=(const MasterDBRMNode& m)
{
  throw logic_error(
      "Don't use the MasterDBRMNode = operator");
}
// Reads the worker count / connect timeout from `config`, creates the
// "DBRM_Controller" server queue while holding serverLock, then connects to
// the workers.  Exceptions from the configuration lookup or from creating
// the server queue propagate to the caller (serverLock is released first).
void MasterDBRMNode::initMsgQueues(config::Config* config)
{
  const std::string caller("MasterDBRMNode::initMsgQueues()");
  size_t workerConnectTimeout = 0;

  getNumWorkersAndTimeout(workerConnectTimeout, caller, config);

  serverLock.lock();

  try
  {
    dbrmServer = new MessageQueueServer("DBRM_Controller", config);
  }
  catch (...)
  {
    // never leave serverLock held when construction fails
    serverLock.unlock();
    throw;
  }

  serverLock.unlock();

  connectToWorkers(workerConnectTimeout);
}
void MasterDBRMNode::getNumWorkersAndTimeout(size_t& connectTimeoutSecs, const std::string& methodName,
config::Config* config)
{
string stmp;
int ltmp;
connectTimeoutSecs = 30; // default
stmp = config->getConfig("DBRM_Controller", "NumWorkers");
if (stmp.length() == 0)
throw runtime_error(methodName + ": config file error looking for <DBRM_Controller><NumWorkers>");
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp < 1)
throw runtime_error(methodName + ": Bad NumWorkers value");
NumWorkers = ltmp;
stmp = config->getConfig("DBRM_Controller", "WorkerConnectionTimeout");
if (stmp.length() > 0)
{
ltmp = static_cast<int>(config::Config::fromText(stmp));
if (ltmp > 1)
connectTimeoutSecs = ltmp;
}
}
// Connects to DBRM_Worker1 .. DBRM_Worker<NumWorkers>, retrying until every
// worker is online or roughly connectTimeoutSecs seconds have been spent.
//   connectTimeoutSecs - overall budget in seconds (converted to the
//                        microseconds used by usleep()).
// Client objects are fetched from MessageQueueClientPool on the first pass
// and stored in `slaves`.  Failed connections are only logged on the first
// pass to avoid flooding the log while retrying.
//
// The number of online workers is recomputed on every pass.  Previously a
// worker that was already connected when this function looked at it was
// never counted, so the loop could only end by timing out; recounting also
// keeps the total accurate if a connection state changes between passes.
void MasterDBRMNode::connectToWorkers(const size_t connectTimeoutSecs)
{
  const size_t timeoutUsecs = connectTimeoutSecs * 1000000;
  size_t timeSpent = 0;
  int workersOnline = 0;
  bool initialRun = true;

  while (timeoutUsecs > timeSpent && workersOnline < NumWorkers)
  {
    char ctmp[50];
    workersOnline = 0;

    for (int i = 0; i < NumWorkers; i++)
    {
      snprintf(ctmp, sizeof(ctmp), "DBRM_Worker%d", i + 1);
      std::string module(ctmp);

      if (static_cast<int>(slaves.size()) < NumWorkers)
      {
        slaves.push_back(MessageQueueClientPool::getInstance(module));
      }

      if (slaves[i]->isConnected())
      {
        // already online (connected earlier or handed out connected)
        workersOnline++;
        continue;
      }

      if (slaves[i]->connect())
      {
        log("DBRM Controller: Connected to " + module, logging::LOG_TYPE_DEBUG);
        workersOnline++;
      }
      else if (initialRun)
      {
        // only warn on the first pass
        log("DBRM Controller: Warning: could not connect to " + module, logging::LOG_TYPE_WARNING);
      }
    }

    initialRun = false;

    if (workersOnline < NumWorkers)
    {
      usleep(connectTimeoutStep);
      timeSpent += connectTimeoutStep;
    }
  }
}
// Requests shutdown by setting the `die` flag, which the accept loop in
// run() and the loop in msgProcessor() test on every iteration.
void MasterDBRMNode::stop()
{
  die = true;
}
// Acquires slaveLock; pairs with unlock().
void MasterDBRMNode::lock()
{
  slaveLock.lock();
}
// Releases slaveLock; pairs with lock().
void MasterDBRMNode::unlock()
{
  slaveLock.unlock();
}
void MasterDBRMNode::reload()
{
config::Config* config;
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: reloading the config file." << endl;
#endif
reloadCmd = false;
config = config::Config::makeConfig();
if (config == NULL)
throw runtime_error("DBRM Controller: Configuration error. Reload aborted.");
die = true;
finalCleanup();
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: reinitializing..." << endl;
#endif
try
{
initMsgQueues(config);
}
catch (exception&)
{
throw runtime_error("DBRM Controller: Configuration error. Reload aborted.");
// masterdbrm.run() will exit after this
}
die = false;
rg = new LBIDResourceGraph();
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: reload successful." << endl;
#endif
readOnly = false;
}
// Sets the read-only flag to `ro` while holding slaveLock, so the change
// cannot interleave with a command that currently owns that lock.
void MasterDBRMNode::setReadOnly(bool ro)
{
  slaveLock.lock();

  readOnly = ro;

  slaveLock.unlock();
}
// Accept loop of the controller.
// Until `die` is set: accept a client connection on dbrmServer (waiting at
// most MSG_TIMEOUT), register the socket in activeSessions and start a
// msgProcessor thread for it.  A pending reloadCmd triggers reload() instead.
// When the loop ends the server queue is deleted.
//
// Lock handoff (order matters):
//  - `mutex` is taken here before the session is registered.  Outside of
//    FreeBSD it is released by the new thread in msgProcessor(); on FreeBSD
//    (and on the thread-creation failure path) it is released here.
//  - `mutex2` is held while the thread is created and `params` is filled in,
//    so msgProcessor(), which locks mutex2 before reading `params`, only
//    sees the completed ThreadParams.
void MasterDBRMNode::run()
{
ByteStream msg;
IOSocket* s;
boost::thread* reader;
while (!die)
{
s = new IOSocket();
#ifdef BRM_VERBOSE
cerr << "DBRM Controller waiting..." << endl;
#endif
serverLock.lock();
// When dbrmServer is NULL nothing is accepted and `s` stays unopened; the
// `!s->isOpen()` check further down then discards it.
if (dbrmServer != NULL)
try
{
*s = dbrmServer->accept(&MSG_TIMEOUT);
}
catch (runtime_error& e)
{
cerr << e.what() << " continuing...\n";
serverLock.unlock();
delete s;
continue;
}
serverLock.unlock();
// A reload request takes priority over whatever was just accepted.
if (reloadCmd)
{
if (s->isOpen())
s->close();
delete s;
reload();
continue;
}
// Shutting down, or nothing was accepted: drop the socket.
if (die || !s->isOpen())
{
if (s->isOpen())
s->close();
delete s;
continue;
}
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: got a connection..." << endl;
#endif
mutex.lock();
activeSessions.push_back(s);
params = new ThreadParams();
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: starting another thread" << endl;
#endif
mutex2.lock();
try
{
reader = new boost::thread(MsgProcessor(this));
}
catch (boost::thread_resource_error&)
{
log("DBRM Controller: WARNING!! Got thread resource error! Increase system stack size or decrease "
"the # of active sessions.");
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: WARNING!! Got thread resource error! Increase system stack size or "
"decrease the # of active sessions.\n";
#endif
// No thread was started: unregister the session, tell the client, and
// release both locks ourselves.
activeSessions.pop_back();
sendError(s, ERR_NETWORK);
sleep(1); // don't close right away to avoid broken pipe on the client
s->close();
delete s;
delete params;
mutex2.unlock();
mutex.unlock();
continue;
}
// Publish the thread object and socket to the new thread.
params->t = reader;
params->sock = s;
mutex2.unlock();
#ifdef __FreeBSD__
mutex.unlock();
#endif
}
// Loop finished: tear down the server queue under serverLock.
serverLock.lock();
delete dbrmServer;
dbrmServer = NULL;
serverLock.unlock();
}
// Per-connection worker loop; runs on the thread created by run().
// Picks up its ThreadParams (socket + thread object) from the shared `params`
// member, then repeatedly reads a command from the client socket and either
//  - handles it locally (the switch statements below; each case `continue`s
//    to the next read), or
//  - forwards it to every worker: distribute() -> gatherResponses() ->
//    compareResponses() -> confirm(), and writes the first gathered response
//    back to the client.
// The loop ends when `die` is set; a failed socket read rethrows after
// cleanup.  THREAD_EXIT releases the session's resources in both cases.
// Labels used by the goto/macro-based error handling:
//   retrycmd   - restart processing of the current command
//   no_confirm - skip the confirm step, but still reply to the client
//   out        - per-command cleanup of `responses`
void MasterDBRMNode::msgProcessor()
{
struct ThreadParams* p;
ByteStream msg;
vector<ByteStream*> responses;
vector<ByteStream*>::iterator it;
int err;
uint8_t cmd;
StopWatch timer;
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: msgProcessor()" << endl;
#endif
// run() fills in `params` while holding mutex2, so locking it here ensures
// the ThreadParams is complete before it is copied.
mutex2.lock();
p = params;
mutex2.unlock();
// Outside FreeBSD this thread releases the `mutex` that run() locked before
// registering the session.
#ifndef __FreeBSD__
mutex.unlock();
#endif
while (!die)
{
try
{
msg = p->sock->read(&MSG_TIMEOUT);
}
catch (...)
{
// Read failed: release the session, then let the exception propagate.
THREAD_EXIT
throw;
}
if (die) // || msg.length() == 0)
break;
else if (msg.length() == 0)
continue;
/* Check for an command for the master */
// peek() reads the command byte without consuming it; the handlers (and the
// workers) receive the full message.
msg.peek(cmd);
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: recv'd message " << (int)cmd << " length " << msg.length() << endl;
#endif
switch (cmd)
{
case HALT: doHalt(p->sock); continue;
case RESUME: doResume(p->sock); continue;
case RELOAD:
try
{
doReload(p->sock);
}
catch (exception& e)
{
cerr << e.what() << endl;
}
continue;
case SETREADONLY: doSetReadOnly(p->sock, true); continue;
case SETREADWRITE: doSetReadOnly(p->sock, false); continue;
case GETREADONLY: doGetReadOnly(p->sock); continue;
case GET_SYSTEM_CATALOG: doGetSystemCatalog(msg, p); continue;
}
/* Process SessionManager calls */
switch (cmd)
{
case VER_ID: doVerID(msg, p); continue;
case SYSCAT_VER_ID: doSysCatVerID(msg, p); continue;
case NEW_TXN_ID: doNewTxnID(msg, p); continue;
case COMMITTED: doCommitted(msg, p); continue;
case ROLLED_BACK: doRolledBack(msg, p); continue;
case GET_TXN_ID: doGetTxnID(msg, p); continue;
case SID_TID_MAP: doSIDTIDMap(msg, p); continue;
case GET_UNIQUE_UINT32: doGetUniqueUint32(msg, p); continue;
case GET_UNIQUE_UINT64: doGetUniqueUint64(msg, p); continue;
case GET_SYSTEM_STATE: doGetSystemState(msg, p); continue;
case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue;
case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue;
case SM_RESET: doSessionManagerReset(msg, p); continue;
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
}
/* Process cpimport job / read-only transition calls */
switch (cmd)
{
case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue;
case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue;
case START_READONLY: doStartReadOnly(p->sock); continue;
case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue;
}
/* Process TableLock calls */
switch (cmd)
{
case GET_TABLE_LOCK: doGetTableLock(msg, p); continue;
case RELEASE_TABLE_LOCK: doReleaseTableLock(msg, p); continue;
case CHANGE_TABLE_LOCK_STATE: doChangeTableLockState(msg, p); continue;
case CHANGE_TABLE_LOCK_OWNER: doChangeTableLockOwner(msg, p); continue;
case GET_ALL_TABLE_LOCKS: doGetAllTableLocks(msg, p); continue;
case RELEASE_ALL_TABLE_LOCKS: doReleaseAllTableLocks(msg, p); continue;
case GET_TABLE_LOCK_INFO: doGetTableLockInfo(msg, p); continue;
case OWNER_CHECK: doOwnerCheck(msg, p); continue;
}
/* Process OIDManager calls */
switch (cmd)
{
case ALLOC_OIDS: doAllocOIDs(msg, p); continue;
case RETURN_OIDS: doReturnOIDs(msg, p); continue;
case OIDM_SIZE: doOidmSize(msg, p); continue;
case ALLOC_VBOID: doAllocVBOID(msg, p); continue;
case GETDBROOTOFVBOID: doGetDBRootOfVBOID(msg, p); continue;
case GETVBOIDTODBROOTMAP: doGetVBOIDToDBRootMap(msg, p); continue;
}
/* Process Autoincrement calls */
switch (cmd)
{
case START_AI_SEQUENCE: doStartAISequence(msg, p); continue;
case GET_AI_RANGE: doGetAIRange(msg, p); continue;
case RESET_AI_SEQUENCE: doResetAISequence(msg, p); continue;
case GET_AI_LOCK: doGetAILock(msg, p); continue;
case RELEASE_AI_LOCK: doReleaseAILock(msg, p); continue;
case DELETE_AI_SEQUENCE: doDeleteAISequence(msg, p); continue;
}
// Everything below handles commands that are forwarded to the workers.
retrycmd:
// While a halt is in progress, wait (one second per loop) for it to end
// before taking slaveLock; give up waiting once haltloops reaches
// FIVE_MIN_TIMEOUT.tv_sec.
uint32_t haltloops = 0;
while (halting && ++haltloops < static_cast<uint32_t>(FIVE_MIN_TIMEOUT.tv_sec))
sleep(1);
slaveLock.lock();
// The wait above ran out: stop halting and fall back to read-only mode.
if (haltloops == FIVE_MIN_TIMEOUT.tv_sec)
{
ostringstream os;
os << "A node is unresponsive for cmd = " << (uint32_t)cmd << ", no reconfigure in at least "
<< FIVE_MIN_TIMEOUT.tv_sec << " seconds. Setting read-only mode.";
log(os.str());
readOnly = true;
halting = false;
}
// In read-only mode forwarded commands are rejected with ERR_READONLY.
if (readOnly)
{
slaveLock.unlock();
sendError(p->sock, ERR_READONLY);
goto out;
}
/* TODO: Separate these out-of-band items into separate functions */
/* Need to get the dbroot, convert to vbOID */
if (cmd == BEGIN_VB_COPY)
{
try
{
boost::mutex::scoped_lock lk(oidsMutex);
uint8_t* buf = msg.buf();
// dbroot is currently after the cmd and transid
// (1 byte + 4 bytes into the buffer); it is rewritten in place below
uint16_t* dbRoot = (uint16_t*)&buf[1 + 4];
// If that dbroot has no vboid, create one
int16_t err;
err = oids.getVBOIDOfDBRoot(*dbRoot);
// cout << "dbRoot " << *dbRoot << " -> vbOID " << err << endl;
if (err < 0)
{
err = oids.allocVBOID(*dbRoot);
// cout << " - allocated oid " << err << endl;
}
// replace the dbroot in the message with the vbOID
*dbRoot = err;
}
catch (exception& ex)
{
ostringstream os;
os << "DBRM Controller: Begin VBCopy failure. " << ex.what();
log(os.str());
sendError(p->sock, -1);
goto out;
}
}
/* Check for deadlock on beginVBCopy */
/* if (cmd == BEGIN_VB_COPY) {
ByteStream params(msg);
VER_t txn;
uint8_t tmp8;
uint32_t tmp32;
LBIDRange_v ranges;
LBIDRange_v::iterator it;
LBID_t end;
params >> tmp8; //throw away the cmd
params >> tmp32;
txn = tmp32;
deserializeVector(params, ranges);
for (it = ranges.begin(); it != ranges.end(); it++) {
end = (*it).start + (*it).size - 1;
err = rg->reserveRange((*it).start, end, txn, slaveLock);
if (err != ERR_OK) {
pthread_mutex_unlock(&slaveLock);
sendError(p->sock, err);
goto out;
}
}
}
*/
/* Release all blocks of lbids on vbRollback or vbCommit */
if (cmd == VB_ROLLBACK1 || cmd == VB_ROLLBACK2 || cmd == VB_COMMIT)
{
// Work on a copy so the original message stays intact for distribute().
ByteStream params(msg);
VER_t txn;
uint8_t tmp8;
uint32_t tmp32;
params >> tmp8;
params >> tmp32;
txn = tmp32;
rg->releaseResources(txn);
}
/* XXXPAT: If beginVBCopy, vbRollback, or vbCommit fail for some reason,
the resource graph will be out of sync with the copylocks and/or vss.
There are 2 reasons they might fail:
1) logical inconsistency between the resource graph and the
copylocks & vss
2) "out of band" failures, like a network problem
*/
/* Need to atomically do the safety check and the clear. */
if (cmd == BRM_CLEAR)
{
uint32_t txnCount = sm.getTxnCount();
// do nothing if there's an active transaction
if (txnCount != 0)
{
ByteStream* reply = new ByteStream();
*reply << (uint8_t)ERR_FAILURE;
responses.push_back(reply);
goto no_confirm;
}
}
// Distribute the command and gather the answers.  When the workers'
// responses disagree the step is repeated; once `retry` exceeds 1 the node
// switches to read-only mode and stops retrying.
for (int retry = 0;; retry++)
{
try
{
distribute(&msg);
}
catch (...)
{
// While halting the exception is ignored here; CHECK_ERROR1 below then
// retries the command.
if (!halting)
{
undo();
readOnly = true;
slaveLock.unlock();
ostringstream ostr;
ostr << "DBRM Controller: Caught network error. "
"Sending command "
<< (uint32_t)cmd << ", length " << msg.length() << ". Setting read-only mode.";
log(ostr.str());
sendError(p->sock, ERR_NETWORK);
goto out;
}
}
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: distributed msg" << endl;
#endif
bool readErrFlag; // ignore this flag in this case
err = gatherResponses(cmd, msg.length(), &responses, readErrFlag);
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: got responses" << endl;
#endif
CHECK_ERROR1(err)
err = compareResponses(cmd, msg.length(), responses);
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: compared responses" << endl;
#endif
if (err != ERR_SLAVE_INCONSISTENCY)
{
break;
}
else
{
if (retry > 1)
{
readOnly = true;
ostringstream ostr;
ostr << "DBRM Controller: image inconsistency detected multi times. Setting read-only mode.";
log(ostr.str());
break;
}
}
}
#ifdef BRM_DEBUG
if ((cmd == BEGIN_VB_COPY || cmd == VB_ROLLBACK1 || cmd == VB_ROLLBACK2 || cmd == VB_COMMIT) && err == -1)
cerr << "DBRM Controller: inconsistency detected between the resource graph and the VSS or CopyLocks "
"logic."
<< endl;
#endif
// these command will have error message carried in the response
// (the listed partition error codes do not trigger an undo; the response is
// still confirmed and sent to the client below)
if (!responses.empty() &&
(cmd == DELETE_PARTITION || cmd == MARK_PARTITION_FOR_DELETION || cmd == RESTORE_PARTITION) && err)
{
if (err != ERR_PARTITION_DISABLED && err != ERR_PARTITION_ENABLED &&
err != ERR_INVALID_OP_LAST_PARTITION && err != ERR_NOT_EXIST_PARTITION &&
err != ERR_NO_PARTITION_PERFORMED)
undo();
// goto no_confirm;
}
else
{
CHECK_ERROR1(err)
}
// these cmds don't need the 2-phase commit
if (cmd == FLUSH_INODE_CACHES || cmd == BRM_CLEAR || cmd == TAKE_SNAPSHOT)
goto no_confirm;
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: sending confirmation" << endl;
#endif
try
{
confirm();
}
catch (...)
{
if (!halting)
{
ostringstream ostr;
ostr << "DBRM Controller: Caught network error. "
"Confirming command "
<< (uint32_t)cmd << ", length " << msg.length() << ". Setting read-only mode.";
log(ostr.str());
readOnly = true;
}
}
no_confirm:
slaveLock.unlock();
// Reply to the client with the first response in the list.
try
{
p->sock->write(*(responses.front()));
}
catch (...)
{
p->sock->close();
log("DBRM Controller: Warning: could not send the reply to a command", logging::LOG_TYPE_WARNING);
}
out:
// Per-command cleanup: free every gathered response.
for (it = responses.begin(); it != responses.end(); it++)
delete *it;
responses.clear();
}
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: closing connection" << endl;
#endif
THREAD_EXIT
return;
}
// Writes `msg` to every worker in order, advancing the member iterator
// iSlave as it goes (undo() later uses iSlave to know how far the command
// got).  Stops early when `halting` becomes set.
// A write failure is logged and rethrown unless a halt is in progress, in
// which case it is ignored and the loop moves on.
void MasterDBRMNode::distribute(ByteStream* msg)
{
  uint32_t workerIdx = 0;
  iSlave = slaves.begin();

  while (iSlave != slaves.end() && !halting)
  {
    try
    {
      (*iSlave)->write(*msg);
    }
    catch (exception&)
    {
      if (!halting)
      {
        ostringstream os;
        os << "DBRM Controller: network error distributing command to worker " << workerIdx + 1 << endl;
        log(os.str());
        throw;
      }
    }

    iSlave++;
    workerIdx++;
  }
}
// readErrFlag is a separate return flag used by doChangeTableLockOwner()
// (or any subsequent function) which calls gatherResponses() outside the
// scope of msgProcessor() which instead uses the halting flag for error
// handling.
int MasterDBRMNode::gatherResponses(uint8_t cmd, uint32_t cmdMsgLength, vector<ByteStream*>* responses,
bool& readErrFlag) throw()
{
int i;
ByteStream* tmp = 0;
readErrFlag = false;
// Bug 2258 gather all responses
int error = 0;
for (i = 0, iSlave = slaves.begin(); iSlave != slaves.end() && !halting; iSlave++, i++)
{
tmp = new ByteStream();
try
{
// can't just block for 5 mins
timespec newtimeout = {10, 0};
uint32_t ntRetries = FIVE_MIN_TIMEOUT.tv_sec / newtimeout.tv_sec;
uint32_t retries = 0;
while (++retries < ntRetries && tmp->length() == 0 && !halting)
*tmp = (*iSlave)->read(&newtimeout);
//*tmp = (*iSlave)->read();
}
catch (...)
{
/* 2/21/12 - instead of setting readonly here, to support a peaceful failover
we will wait for a configuration change to come, then report the error
after a long timeout.
*/
ostringstream os;
os << "DBRM Controller: Network error reading from node " << i + 1 << ". Reading response to command "
<< (uint32_t)cmd << ", length " << cmdMsgLength << ". Will see if retry is possible.";
log(os.str());
halting = true;
readErrFlag = true;
delete tmp;
return ERR_OK;
/*
ostringstream os;
if (!halting) {
SEND_ALARM
readOnly = true;
os << "DBRM Controller: Network error reading from node " << i + 1 <<
". Reading response to command " << (uint32_t)cmd <<
", length " << cmdMsgLength << ". Setting read-only mode.";
log(os.str());
error++;
}
*/
}
if (tmp->length() == 0 && !halting)
{
/* See the comment above */
ostringstream os;
os << "DBRM Controller: Network error reading from node " << i + 1 << ". Reading response to command "
<< (uint32_t)cmd << ", length " << cmdMsgLength
<< ". 0 length response, possible time-out"
". Will see if retry is possible.";
log(os.str());
halting = true;
readErrFlag = true;
delete tmp;
return ERR_OK;
/*
ostringstream os;
SEND_ALARM;
readOnly = true;
os << "DBRM Controller: Network error reading from node " << i + 1<<
". Reading response to command " << (uint32_t)cmd <<
", length " << cmdMsgLength <<
". 0 length response, possible time-out"
". Setting read-only mode.";
log(os.str());
error++;
*/
}
if (error == 0)
responses->push_back(tmp);
else
delete tmp;
}
if (error > 0)
return ERR_NETWORK;
else
return ERR_OK;
}
int MasterDBRMNode::compareResponses(uint8_t cmd, uint32_t cmdMsgLength,
const vector<ByteStream*>& responses) const
{
vector<ByteStream*>::const_iterator it, it2;
uint8_t errCode;
ByteStream* first;
int i;
first = responses.front();
try
{
first->peek(errCode);
}
catch (exception&)
{
readOnly = true;
ostringstream os;
os << "DBRM Controller: Network error on node 1. "
"Verifying response to command "
<< (uint32_t)cmd << ", length " << cmdMsgLength << ". Reverting to read-only mode.";
log(os.str());
return ERR_NETWORK;
}
/*if (errCode != ERR_OK) {
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: first response has error code " << errCode << endl;
#endif
return errCode;
}*/
for (it = responses.begin(), it2 = it + 1, i = 2; it2 != responses.end(); it++, it2++, i++)
if (**it != **it2 && !halting)
{
ostringstream ostr;
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: error: response from node " << i << " is different" << endl;
#endif
// readOnly = true;
ostr << "DBRM Controller: image inconsistency detected at node " << i
<< ". Verifying response to command " << (uint32_t)cmd << ", length " << cmdMsgLength;
log(ostr.str());
return ERR_SLAVE_INCONSISTENCY;
}
// return ERR_OK;
return errCode;
}
void MasterDBRMNode::undo() throw()
{
vector<MessageQueueClient*>::iterator it, lastSlave;
ByteStream undoMsg;
int i;
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: sending undo()" << endl;
#endif
undoMsg << (uint8_t)BRM_UNDO;
if (iSlave != slaves.end())
lastSlave = iSlave + 1; //@Bug 2258
else
lastSlave = iSlave;
for (it = slaves.begin(), i = 1; it != lastSlave; it++, i++)
{
try
{
#ifdef BRM_VERBOSE
cerr << "DBRM Controller: sending undo() to worker number " << i << endl;
#endif
(*it)->write(undoMsg);
}
catch (...)
{
ostringstream ostr;
ostr << "DBRM Controller: undo(): warning, could not contact worker number " << i << endl;
log(ostr.str());
}
}
}
// Sends a CONFIRM message to the workers through distribute(); any exception
// distribute() raises propagates to the caller.
void MasterDBRMNode::confirm()
{
  ByteStream confirmMsg;

  confirmMsg << CONFIRM;

  distribute(&confirmMsg);
}
void MasterDBRMNode::finalCleanup()
{
vector<MessageQueueClient*>::iterator sIt;
int retry = 0;
cerr << "DBRM Controller: Waiting for threads to finish..." << endl;
delete rg; // unblocks any waiting transactions
rg = NULL;
// XXXPAT: assumption here: join_all() blocks until all threads are joined
// which implies the case where all threads are removed from the group.
// We're relying on that second condition for synchronization here.
// Problem: it looks as if join_all holds a mutex which prevents other calls
// on dbrmSessions, so threads can't be removed from the group.
// dbrmSessions.join_all();
// blah: busy wait for now, max 15 seconds, then assume everything's dead.
// @bug 1381: change retry from 15 to 5 more than the socket read() timeout.
while (runners > 0 && retry++ < (MSG_TIMEOUT.tv_sec + 5))
sleep(1);
#ifdef BRM_VERBOSE
cerr << "Closing connections" << endl;
#endif
for (sIt = slaves.begin(); sIt != slaves.end(); sIt++)
{
MessageQueueClientPool::releaseInstance(*sIt);
*sIt = NULL;
}
slaves.clear();
/* Close any connections that lost their thread somehow. This should "never" happen.
*/
#if 0 // if we see instances of blockage here flip this switch
int tmp;
tmp = pthread_mutex_trylock(&mutex);
if (tmp != 0) // try one more time then steal the lock TODO: why is this necessary?
{
sleep(1);
tmp = pthread_mutex_trylock(&mutex);
cerr << "DBRM Controller: warning! Trying to recover mutex!\n";
if (tmp != 0)
{
pthread_mutex_unlock(&mutex);
sleep(5); // let the other threads finish
tmp = pthread_mutex_trylock(&mutex);
if (tmp != 0)
{
cerr << "DBRM Controller: error! Bad mutex state, going down!\n";
exit(1);
}
}
}
#else
mutex.lock();
#endif
#ifdef BRM_VERBOSE
if (activeSessions.size() > 0)
cerr << "There are still live sessions\n";
#endif
for (uint32_t i = 0; i < activeSessions.size(); ++i)
{
activeSessions[i]->close();
delete activeSessions[i];
}
activeSessions.clear();
mutex.unlock();
serverLock.lock();
delete dbrmServer;
dbrmServer = NULL;
serverLock.unlock();
}
// Sends the single byte `err` to `caller` as the reply to a command.
// A failure to write is logged as a warning and not propagated.
void MasterDBRMNode::sendError(IOSocket* caller, uint8_t err) const throw()
{
#ifdef BRM_VERBOSE
  cerr << "DBRM Controller: returning " << (int)err << " to the caller" << endl;
#endif

  ByteStream errorReply;
  errorReply << err;

  try
  {
    caller->write(errorReply);
  }
  catch (exception&)
  {
    log("DBRM Controller: failed to send return code", logging::LOG_TYPE_WARNING);
#ifdef BRM_VERBOSE
    cerr << "DBRM Controller: failed to send return code" << endl;
#endif
  }
}
// HALT handler: raises `halting`, then blocks until slaveLock can be taken
// (the lock stays held; doResume() releases it) and acknowledges with
// ERR_OK.  A failure to send the acknowledgement is ignored.
void MasterDBRMNode::doHalt(messageqcpp::IOSocket* sock)
{
  log("Halting...", LOG_TYPE_INFO);
  halting = true;
  lock();
  log("Halted", LOG_TYPE_INFO);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// RESUME handler: releases slaveLock, clears `halting` and acknowledges with
// ERR_OK.  A failure to send the acknowledgement is ignored.
void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock)
{
  unlock();
  halting = false;
  log("Resuming", LOG_TYPE_INFO);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// FORCE_CLEAR_CPIMPORT_JOBS handler: under cpimportMutex, asks the session
// manager to drop all registered cpimport jobs, then acknowledges with
// ERR_OK.  A failure to send the acknowledgement is ignored.
void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock)
{
  // The guard used to be spelled BMR_VERBOSE, so this trace was never
  // compiled in, even in verbose builds.
#ifdef BRM_VERBOSE
  std::cout << "doForceClearAllCpimportJobs" << std::endl;
#endif
  ByteStream reply;
  std::unique_lock lk(cpimportMutex);
  sm.clearAllCpimportJobs();

  reply << (uint8_t)ERR_OK;

  try
  {
    sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// START_READONLY handler: switches the node to read-only mode once no
// cpimport job is registered any more.  The wait happens on a detached
// helper thread so msgProcessor() is not blocked; the client is acknowledged
// with ERR_OK right away, before the switch has necessarily happened.
// While the helper waits, waitToFinishJobs is set so that
// doNewCpimportJob() refuses new jobs.
void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock)
{
#ifdef BRM_VERBOSE
  std::cout << "doStartReadonly" << std::endl;
#endif

  // Spawn a new thread and detach it, we cannot block `msgProcessor`.
  std::thread readonlyAdjuster(
      [this]()
      {
        std::unique_lock lk(cpimportMutex);
        const bool jobsPending = (sm.getCpimportJobsCount() != 0);

        if (jobsPending)
        {
          waitToFinishJobs = true;
          // Wait until all cpimport jobs are done.
          cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; });
        }

        setReadOnly(true);

        if (jobsPending)
          waitToFinishJobs = false;
      });
  readonlyAdjuster.detach();

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// SETREADONLY / SETREADWRITE handler: applies `b` through setReadOnly() and
// acknowledges with ERR_OK.  A failure to send the acknowledgement is
// ignored.
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
{
  setReadOnly(b);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// GETREADONLY handler: replies with a single byte holding the result of
// isReadOnly().  A failure to send the reply is ignored.
void MasterDBRMNode::doGetReadOnly(messageqcpp::IOSocket* sock)
{
  ByteStream state;
  state << (uint8_t)isReadOnly();

  try
  {
    sock->write(state);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// RELOAD handler.
// This version relies on the caller to do a 'halt' and 'resume' around the
// 'reload' call, but it is synchronous: when it returns, the new worker
// connections have been established.
// Steps: re-read NumWorkers / the connect timeout, delete the current worker
// connections from the pool, reconnect, send an undo to all workers, leave
// read-only mode and acknowledge with ERR_OK.
// If the configuration is unavailable or invalid, ERR_FAILURE is sent to the
// client and the exception is rethrown to the caller.
void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
{
  ByteStream reply;
  config::Config* config = config::Config::makeConfig();

  log("Reloading", LOG_TYPE_INFO);

  std::string methodName("MasterDBRMNode::doReload()");
  size_t connectTimeoutSecs = 0;

  try
  {
    // The constructor and reload() both guard against a missing
    // configuration object; do the same here instead of dereferencing NULL.
    if (config == NULL)
      throw runtime_error("DBRM Controller: Configuration error. Reload aborted.");

    getNumWorkersAndTimeout(connectTimeoutSecs, methodName, config);
  }
  catch (std::exception&)
  {
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      sock->write(reply);
    }
    catch (exception&)
    {
    }

    throw;
  }

  for (int i = 0; i < (int)slaves.size(); i++)
  {
    MessageQueueClientPool::deleteInstance(slaves[i]);
    slaves[i] = nullptr;
  }

  slaves.clear();

  connectToWorkers(connectTimeoutSecs);

  // Point iSlave at the end so undo() addresses every worker.
  iSlave = slaves.end();
  undo();

  readOnly = false;

  reply << (uint8_t)ERR_OK;

  try
  {
    sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// VER_ID handler: replies with ERR_OK followed by the QueryContext returned
// by the session manager's verID().  `msg` is not read.  A failure to send
// the reply is ignored.
void MasterDBRMNode::doVerID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  QueryContext context;

  context = sm.verID();
#ifdef BRM_VERBOSE
  // The old trace streamed `ver`, which is not declared in this function;
  // keep the trace free of undeclared names so verbose builds compile.
  cerr << "doVerID returning a QueryContext" << endl;
#endif

  reply << (uint8_t)ERR_OK;
  reply << context;

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// GET_SYSTEM_CATALOG handler: serializes the system catalog into one reply.
// Layout: ERR_OK byte, table count (uint32), then per table: OID, schema,
// table name, column count, and per column: OID, name, dictionary OID
// (0 when the stored value is the int32 minimum), data type, width,
// position, default value, autoincrement flag, precision, scale, a byte
// that is 1 unless the column has a NOT NULL constraint, and the
// compression type.  `msg` is not read.  A failure to send the reply is
// ignored; exceptions from the catalog calls are not caught here.
void MasterDBRMNode::doGetSystemCatalog(ByteStream& msg, ThreadParams* p)
{
  typedef execplan::CalpontSystemCatalog Catalog;
  typedef std::vector<std::pair<Catalog::OID, Catalog::TableName> > TableList;

  ByteStream reply;
  reply << (uint8_t)ERR_OK;

  boost::shared_ptr<Catalog> systemCatalogPtr = Catalog::makeCalpontSystemCatalog();

  const TableList catalog_tables = systemCatalogPtr->getTables();
  reply << (uint32_t)catalog_tables.size();

  for (const auto& table : catalog_tables)
  {
    Catalog::TableInfo tb_info = systemCatalogPtr->tableInfo(table.second);

    reply << (uint32_t)table.first;
    reply << table.second.schema;
    reply << table.second.table;
    reply << (uint32_t)tb_info.numOfCols;

    Catalog::RIDList column_rid_list = systemCatalogPtr->columnRIDs(table.second, true);

    for (size_t col_num = 0; col_num < column_rid_list.size(); col_num++)
    {
      Catalog::TableColName tcn = systemCatalogPtr->colName(column_rid_list[col_num].objnum);
      Catalog::ColType ct = systemCatalogPtr->colType(column_rid_list[col_num].objnum);

      reply << (uint32_t)column_rid_list[col_num].objnum;
      reply << tcn.column;

      // the int32 minimum is reported as dictionary OID 0
      if (ct.ddn.dictOID == std::numeric_limits<int32_t>::min())
        reply << (uint32_t)0;
      else
        reply << (uint32_t)ct.ddn.dictOID;

      reply << (uint8_t)ct.colDataType;
      reply << (uint32_t)ct.colWidth;
      reply << (uint32_t)ct.colPosition;
      reply << ct.defaultValue;
      reply << (uint8_t)ct.autoincrement;
      reply << (uint32_t)ct.precision;
      reply << (uint32_t)ct.scale;

      // 1 unless the column carries a NOT NULL constraint
      if (ct.constraintType != Catalog::NOTNULL_CONSTRAINT)
        reply << (uint8_t)1;
      else
        reply << (uint8_t)0;

      reply << (uint8_t)ct.compressionType;
    }
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// SYSCAT_VER_ID handler: replies with ERR_OK followed by the QueryContext
// returned by the session manager's sysCatVerID().  `msg` is not read.
// A failure to send the reply is ignored.
void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  QueryContext context;

  context = sm.sysCatVerID();
#ifdef BRM_VERBOSE
  // The old trace streamed `ver`, which is not declared in this function;
  // keep the trace free of undeclared names so verbose builds compile.
  cerr << "doSysCatVerID returning a QueryContext" << endl;
#endif

  reply << (uint8_t)ERR_OK;
  reply << context;

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// NEW_CPIMPORT_JOB handler: registers a new cpimport job with the session
// manager (under cpimportMutex) and replies with ERR_OK plus the job id.
// Replies ERR_READONLY instead when the node is waiting for the running
// jobs to finish before going read-only, and ERR_FAILURE when the session
// manager throws.  Failures to send a reply are ignored.
void MasterDBRMNode::doNewCpimportJob(ThreadParams* p)
{
#ifdef BRM_VERBOSE
  std::cerr << "doNewCpimportJob" << std::endl;
#endif
  ByteStream reply;
  std::unique_lock lk(cpimportMutex);

  // waitToFinishJobs means the controller is waiting until all active
  // cpimport jobs are done.  We cannot block `msgProcessor` and must not
  // create a new job, so answer with the `readonly` code.
  if (waitToFinishJobs)
  {
    reply << (uint8_t)ERR_READONLY;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  uint32_t newJobId;

  try
  {
    newJobId = sm.newCpimportJob();
  }
  catch (exception&)
  {
    reply.reset();
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << (uint8_t)ERR_OK << (uint32_t)newJobId;

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// FINISH_CPIMPORT_JOB handler: reads the command byte and the job id from
// `msg`, marks the job finished in the session manager (under
// cpimportMutex) and replies with ERR_OK, or with ERR_FAILURE when the
// message cannot be decoded or the session manager throws.
// After a successful finish, if a read-only transition is waiting and no
// jobs remain, the waiting thread is notified.
void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p)
{
#ifdef BRM_VERBOSE
  std::cout << "doFinishCpimportJob" << std::endl;
#endif
  ByteStream reply;
  uint32_t cpimportJob;
  uint8_t cmd;
  std::unique_lock lk(cpimportMutex);

  try
  {
    // Decode inside the try block, like the other handlers do, so that a
    // failed extraction is answered with ERR_FAILURE instead of escaping.
    msg >> cmd;
    msg >> cpimportJob;
    sm.finishCpimortJob(cpimportJob);
  }
  catch (exception&)
  {
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << (uint8_t)ERR_OK;

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
    // best effort: the client may already be gone
  }

  // Wake the thread started by doStartReadOnly() once the last job is gone.
  if (waitToFinishJobs && sm.getCpimportJobsCount() == 0)
    cpimportJobsCond.notify_one();
}
// NEW_TXN_ID handler: decodes (cmd, sessionID, block, isDDL) from `msg`,
// asks the session manager for a new transaction id and replies with ERR_OK,
// the id and its valid flag.  Any std::exception while decoding or
// allocating results in an ERR_FAILURE reply.  Failures to send a reply are
// ignored.
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd, block, isDDL;
    uint32_t sessionID;

    msg >> cmd;
    msg >> sessionID;
    msg >> block;
    msg >> isDDL;

    TxnID txnid;
    txnid = sm.newTxnID(sessionID, (block != 0), (isDDL != 0));

    reply << (uint8_t)ERR_OK;
    reply << (uint32_t)txnid.id;
    reply << (uint8_t)txnid.valid;
#ifdef BRM_VERBOSE
    cerr << "newTxnID returning id=" << txnid.id << " valid=" << txnid.valid << endl;
#endif
  }
  catch (exception&)
  {
    // discard whatever was already serialized and report the failure
    reply.reset();
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // best effort: the client may already be gone
  }
}
// COMMITTED handler: decodes (cmd, txn id, valid flag) from `msg` and
// reports the transaction as committed to the session manager.  Replies
// with ERR_OK, or ERR_FAILURE when decoding or the session manager throws a
// std::exception.  A failure to send the reply is ignored.
void MasterDBRMNode::doCommitted(ByteStream& msg, ThreadParams* p)
{
  uint8_t status = (uint8_t)ERR_OK;

  try
  {
    uint8_t cmd, validFlag;
    uint32_t rawId;
    TxnID txnid;

    msg >> cmd;
    msg >> rawId;
    txnid.id = rawId;
    msg >> validFlag;
    txnid.valid = (validFlag != 0 ? true : false);
#ifdef BRM_VERBOSE
    cerr << "doCommitted" << endl;
#endif
    sm.committed(txnid);
  }
  catch (exception&)
  {
    status = (uint8_t)ERR_FAILURE;
  }

  ByteStream reply;
  reply << status;

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
    // best effort: the client may already be gone
  }
}
// ROLLED_BACK handler: decodes (cmd, txn id, valid flag) from `msg` and
// reports the transaction as rolled back to the session manager.  Replies
// with ERR_OK, or - after logging the cause - ERR_FAILURE when decoding or
// the session manager throws a std::exception.  A failure to send the reply
// is ignored.
void MasterDBRMNode::doRolledBack(ByteStream& msg, ThreadParams* p)
{
  uint8_t status = (uint8_t)ERR_OK;

  try
  {
    uint8_t cmd, validFlag;
    uint32_t rawId;
    TxnID txnid;

    msg >> cmd;
    msg >> rawId;
    msg >> validFlag;
    txnid.id = rawId;
    txnid.valid = (validFlag != 0 ? true : false);
#ifdef BRM_VERBOSE
    cerr << "doRolledBack" << endl;
#endif
    sm.rolledback(txnid);
  }
  catch (exception& ex)
  {
    ostringstream errStream;
    errStream << "doRolledBack() failed: " << ex.what();
    log(errStream.str());
    status = (uint8_t)ERR_FAILURE;
  }

  ByteStream reply;
  reply << status;

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
    // best effort: the client may already be gone
  }
}
// Handles a "get transaction id for session" request.
// Reply on success: ERR_OK, transaction id (4 bytes), valid flag (1 byte);
// on failure only ERR_FAILURE is sent.
void MasterDBRMNode::doGetTxnID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  TxnID txnid;
  bool lookedUp = true;

  try
  {
    uint8_t cmd;
    SessionManagerServer::SID sid;

    msg >> cmd >> sid;
    txnid = sm.getTxnID(sid);
#ifdef BRM_VERBOSE
    cerr << "doGetTxnID returning id=" << txnid.id << " valid=" << txnid.valid << endl;
#endif
  }
  catch (exception&)
  {
    lookedUp = false;
  }

  if (lookedUp)
  {
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(txnid.id)
          << static_cast<uint8_t>(txnid.valid);
  }
  else
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);
  }

  // Best-effort reply: a failed write is deliberately ignored.
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
// Handles a request for the session-id / transaction-id map.
// Reply on success: ERR_OK, the entry count (4 bytes), then for every entry
// its transaction id, valid flag and session id. msg is not read.
void MasterDBRMNode::doSIDTIDMap(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  int count;
  std::shared_ptr<SIDTIDEntry[]> entries;

  try
  {
    entries = sm.SIDTIDMap(count);
  }
  catch (exception&)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(count);
#ifdef BRM_VERBOSE
  cerr << "doSIDTIDMap returning " << count << " elements... " << endl;
#endif

  for (int idx = 0; idx < count; ++idx)
  {
    const auto& entry = entries[idx];
#ifdef BRM_VERBOSE
    cerr << " " << idx << ": txnid=" << entry.txnid.id << " valid=" << entry.txnid.valid
         << " sessionid=" << entry.sessionid << endl;
#endif
    reply << static_cast<uint32_t>(entry.txnid.id) << static_cast<uint8_t>(entry.txnid.valid)
          << static_cast<uint32_t>(entry.sessionid);
  }

  // Best-effort reply: a failed write is deliberately ignored.
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
// Handles a request for the uncommitted LBIDs of a transaction.
// The full LBID list is read from the VSS under a read lock, sorted, and then
// reduced to one LBID per extent-map range: the range start returned by
// ExtentMap::lookup(). Reply on success: ERR_OK followed by the serialized
// list; any lookup failure or exception yields ERR_FAILURE.
void MasterDBRMNode::doGetUncommittedLbids(ByteStream& msg, ThreadParams* p)
{
  typedef pair<int64_t, int64_t> range_t;

  ByteStream reply;
  vector<LBID_t> lbidList;
  VSS vss;
  ExtentMap em;
  bool vssLocked = false;
  ByteStream::byte cmd;
  ByteStream::quadbyte transID;

  // Replaces whatever is in reply with ERR_FAILURE and sends it, ignoring write errors.
  const auto sendFailure = [&reply, p]()
  {
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  };

  msg >> cmd >> transID;

  try
  {
    // Get the full list of uncommitted LBIDs related to this transaction.
    vss.lock(VSS::READ);
    vssLocked = true;
    vss.getUncommittedLBIDs(transID, lbidList);
    vss.release(VSS::READ);
    vssLocked = false;

    if (!lbidList.empty())
    {
      std::sort(lbidList.begin(), lbidList.end());

      // Walk the sorted LBIDs; a new range is looked up for the first LBID and
      // whenever an LBID lies past the end of the most recently found range.
      vector<range_t> ranges;
      range_t current;
      bool haveRange = false;

      for (vector<LBID_t>::const_iterator lbid = lbidList.begin(); lbid != lbidList.end(); ++lbid)
      {
        if (haveRange && !(*lbid > current.second))
          continue;

        if (em.lookup(*lbid, current.first, current.second) < 0)
        {
          sendFailure();
          return;
        }

        ranges.push_back(current);
        haveRange = true;
      }

      // Return only the first LBID of each range that was touched.
      lbidList.clear();

      for (vector<range_t>::const_iterator r = ranges.begin(); r != ranges.end(); ++r)
        lbidList.push_back(r->first);
    }

    reply << static_cast<uint8_t>(ERR_OK);
    serializeInlineVector(reply, lbidList);

    // Best-effort reply: a failed write is deliberately ignored.
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
  catch (exception&)
  {
    // Drop the VSS read lock if the exception hit while it was held.
    if (vssLocked)
      vss.release(VSS::READ);

    sendFailure();
  }
}
// Handles a request for a unique 32-bit value from the session manager.
// Reply on success: ERR_OK followed by the value; on failure ERR_FAILURE only.
// msg is not read.
void MasterDBRMNode::doGetUniqueUint32(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint32_t unique = sm.getUnique32();
    reply << static_cast<uint8_t>(ERR_OK) << unique;
#ifdef BRM_VERBOSE
    cerr << "getUnique32() returning " << unique << endl;
#endif
  }
  catch (exception&)
  {
    // Discard any partially built reply before reporting the failure.
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  // Best-effort delivery of the successful reply.
  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
// Handles a request for a unique 64-bit value from the session manager.
// Reply on success: ERR_OK followed by the value; on failure ERR_FAILURE only.
// msg is not read.
void MasterDBRMNode::doGetUniqueUint64(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint64_t unique = sm.getUnique64();
    reply << static_cast<uint8_t>(ERR_OK) << unique;
#ifdef BRM_VERBOSE
    cerr << "getUnique64() returning " << unique << endl;
#endif
  }
  catch (exception&)
  {
    // Discard any partially built reply before reporting the failure.
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  // Best-effort delivery of the successful reply.
  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
// Handles a "get system state" request.
// Reply on success: ERR_OK followed by the state word (4 bytes) obtained from
// the session manager; on failure ERR_FAILURE only. msg is not read.
void MasterDBRMNode::doGetSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint32_t state = 0;
    sm.getSystemState(state);

    ByteStream::byte status = ERR_OK;
    reply << status << static_cast<ByteStream::quadbyte>(state);
#ifdef BRM_VERBOSE
    cerr << "getSystemState() returning " << static_cast<int>(status) << endl;
#endif
  }
  catch (exception&)
  {
    // Discard any partially built reply before reporting the failure.
    reply.reset();

    ByteStream::byte status = ERR_FAILURE;
    reply << status;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  // Best-effort delivery of the successful reply.
  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
// Handles a "set system state" request: reads the state word from msg, passes
// it to SessionManagerServer::setSystemState() and replies with one status byte.
void MasterDBRMNode::doSetSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  ByteStream::byte status = ERR_OK;

  try
  {
    ByteStream::byte cmd;
    uint32_t state;

    msg >> cmd >> state;
    sm.setSystemState(state);
#ifdef BRM_VERBOSE
    cerr << "doSetSystemState setting " << hex << state << dec << endl;
#endif
  }
  catch (exception&)
  {
    status = ERR_FAILURE;
  }

  reply << status;

  // Best-effort reply: a failed write is deliberately ignored.
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
// Handles a "clear system state" request: reads the state word from msg, passes
// it to SessionManagerServer::clearSystemState() and replies with one status byte.
void MasterDBRMNode::doClearSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  ByteStream::byte status = ERR_OK;

  try
  {
    ByteStream::byte cmd;
    uint32_t state;

    msg >> cmd >> state;
    sm.clearSystemState(state);
  }
  catch (exception&)
  {
    status = ERR_FAILURE;
  }

  reply << status;

  // Best-effort reply: a failed write is deliberately ignored.
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
// Handles a "reset session manager" request: calls sm.reset() and replies with
// ERR_OK, or ERR_FAILURE if anything at all was thrown. msg is not read.
void MasterDBRMNode::doSessionManagerReset(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    sm.reset();
    reply << static_cast<uint8_t>(ERR_OK);
  }
  catch (...)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);
  }

  // Best-effort reply: a failed write is deliberately ignored.
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
// Handles an "allocate OIDs" request.
// Reads the requested count from msg and replies with ERR_OK plus the value
// returned by oids.allocOIDs() as 4 bytes. Any exception, including one from
// sending the reply, is logged and answered with ERR_FAILURE.
void MasterDBRMNode::doAllocOIDs(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    uint8_t cmd;
    uint32_t requested;
    msg >> cmd >> requested;

    const int allocated = oids.allocOIDs(static_cast<int>(requested));

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(allocated);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: OID allocation failure. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "return OIDs" request.
// Reads the start and end values from msg, passes them to oids.returnOIDs()
// and replies with ERR_OK. Any exception, including one from sending the
// reply, is logged and answered with ERR_FAILURE.
void MasterDBRMNode::doReturnOIDs(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    uint8_t cmd;
    uint32_t rawStart, rawEnd;
    msg >> cmd >> rawStart >> rawEnd;

    oids.returnOIDs(static_cast<int>(rawStart), static_cast<int>(rawEnd));

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Return OIDs failure. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles an "OID manager size" request.
// Replies with ERR_OK plus the value of oids.size() as 4 bytes. Any exception,
// including one from sending the reply, is logged and answered with
// ERR_FAILURE. msg is not read.
void MasterDBRMNode::doOidmSize(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    const int oidCount = oids.size();

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(oidCount);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Failure to get OID count. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles an "allocate version-buffer OID" request.
// Reads the dbroot from msg and replies with ERR_OK plus the value returned by
// oids.allocVBOID(). Any exception raised after the command byte has been
// read is logged and answered with ERR_FAILURE.
void MasterDBRMNode::doAllocVBOID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint8_t cmd;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    uint32_t dbroot;
    msg >> dbroot;

    uint32_t vbOID = oids.allocVBOID(dbroot);

    reply << static_cast<uint8_t>(ERR_OK) << vbOID;
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: VB OID allocation failure. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get dbroot of version-buffer OID" request.
// Reads the VB OID from msg and replies with ERR_OK plus the value returned by
// oids.getDBRootOfVBOID(). Any exception raised after the command byte has
// been read is logged and answered with ERR_FAILURE.
void MasterDBRMNode::doGetDBRootOfVBOID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint8_t cmd;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    uint32_t vbOID;
    msg >> vbOID;

    uint32_t dbroot = oids.getDBRootOfVBOID(vbOID);

    reply << static_cast<uint8_t>(ERR_OK) << dbroot;
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Get DBRoot of VB OID failure. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a request for the VB OID -> dbroot map.
// Replies with ERR_OK followed by the serialized vector returned by
// oids.getVBOIDToDBRootMap(). Any exception raised after the command byte has
// been read is logged and answered with ERR_FAILURE.
void MasterDBRMNode::doGetVBOIDToDBRootMap(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint8_t cmd;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    // oidsMutex is held until the try block ends, which includes the reply write.
    boost::mutex::scoped_lock lk(oidsMutex);

    const vector<uint16_t>& dbrootMap = oids.getVBOIDToDBRootMap();

    reply << static_cast<uint8_t>(ERR_OK);
    serializeInlineVector<uint16_t>(reply, dbrootMap);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Get VB OID DBRoot map failure. " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get table lock" request.
// Deserializes a TableLockInfo from msg and passes it to tableLockServer->lock().
// Reply on success: ERR_OK and the returned lock id; when that id is 0 the
// owner PID, name, session id and txn id held in tli are appended as well.
void MasterDBRMNode::doGetTableLock(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  TableLockInfo tli;
  ByteStream reply;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    msg >> tli;
    idbassert(msg.length() == 0);

    uint64_t lockID = tableLockServer->lock(&tli);

    reply << static_cast<uint8_t>(ERR_OK) << lockID;

    if (lockID == 0)
    {
      reply << tli.ownerPID << tli.ownerName << static_cast<uint32_t>(tli.ownerSessionID)
            << static_cast<uint32_t>(tli.ownerTxnID);
    }

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "release table lock" request.
// Reads the lock id from msg and replies with ERR_OK plus the boolean result
// of tableLockServer->unlock() as one byte.
void MasterDBRMNode::doReleaseTableLock(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    uint64_t lockID;
    msg >> lockID;
    idbassert(msg.length() == 0);

    const bool released = tableLockServer->unlock(lockID);

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(released);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "change table lock state" request.
// Reads the lock id and the new state from msg and replies with ERR_OK plus
// the boolean result of tableLockServer->changeState() as one byte.
void MasterDBRMNode::doChangeTableLockState(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;

  // The command byte is consumed outside the guarded section.
  msg >> cmd;

  try
  {
    uint64_t lockID;
    uint32_t rawState;
    msg >> lockID >> rawState;
    idbassert(msg.length() == 0);

    const LockState newState = static_cast<LockState>(rawState);
    const bool changed = tableLockServer->changeState(lockID, newState);

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(changed);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "change table lock owner" request.
//
// Message layout: cmd, lock id, new owner name, new owner PID, session id,
// txn id. The handler looks up the current lock, sends an OWNER_CHECK for the
// current owner's process name and PID to the worker nodes, and only calls
// tableLockServer->changeOwner() when no worker answers 1.
//
// Replies:
//   ERR_OK, 0      - the lock does not exist, or a worker answered 1
//   ERR_OK, result - result of tableLockServer->changeOwner()
//   ERR_FAILURE    - gathering the worker responses failed, or an exception
//                    was thrown
//
// Every ByteStream placed in `responses` is released on all exit paths,
// including the gather-error path and the exception path.
void MasterDBRMNode::doChangeTableLockOwner(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  vector<ByteStream*> responses;

  // Scope guard: deletes whatever gatherResponses() left in `responses`, no
  // matter how this function is left.
  struct ResponseReaper
  {
    explicit ResponseReaper(vector<ByteStream*>& r) : list(r)
    {
    }
    ~ResponseReaper()
    {
      for (ByteStream* response : list)
        delete response;

      list.clear();
    }
    vector<ByteStream*>& list;
  } reaper(responses);

  try
  {
    uint8_t cmd;
    uint64_t id;
    string name;
    uint32_t pid;
    uint32_t tmp32;

    msg >> cmd >> id >> name >> pid >> tmp32;
    const int32_t sessionID = static_cast<int32_t>(tmp32);
    msg >> tmp32;
    const int32_t txnID = static_cast<int32_t>(tmp32);
    idbassert(msg.length() == 0);

    // Look up the current owner of the lock.
    TableLockInfo tli;

    if (!tableLockServer->getLockInfo(id, &tli))
    {
      reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(false);
    }
    else
    {
      // Only the part of the owner name before the first space is sent.
      const string::size_type nameLen = tli.ownerName.find_first_of(" ");
      const string processName =
          (nameLen == string::npos) ? tli.ownerName : tli.ownerName.substr(0, nameLen);

      ByteStream workerNodeCmd;
      workerNodeCmd << static_cast<uint8_t>(OWNER_CHECK) << processName << tli.ownerPID;

      bool readErrFlag = false;
      int err;
      {
        boost::mutex::scoped_lock lk(slaveLock);
        distribute(&workerNodeCmd);
        err = gatherResponses(OWNER_CHECK, workerNodeCmd.length(), &responses, readErrFlag);
      }

      if ((err != ERR_OK) || readErrFlag)
      {
        reply << static_cast<uint8_t>(ERR_FAILURE);
      }
      else
      {
        // Parse the one-byte answer from every worker node.
        bool ownerExists = false;

        for (ByteStream* response : responses)
        {
          uint8_t answer;
          idbassert(response->length() == 1);
          *response >> answer;

          if (answer == 1)
            ownerExists = true;
        }

        if (ownerExists)
        {
          // A worker answered 1: refuse the change.
          reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(false);
        }
        else
        {
          const bool changed = tableLockServer->changeOwner(id, name, pid, sessionID, txnID);
          reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(changed);
        }
      }
    }

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    // Best-effort failure reply: a failed write is deliberately ignored.
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get all table locks" request.
// Replies with ERR_OK followed by the serialized vector returned by
// tableLockServer->getAllLocks(); any exception yields ERR_FAILURE.
void MasterDBRMNode::doGetAllTableLocks(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    msg >> cmd;
    idbassert(msg.length() == 0);

    vector<TableLockInfo> allLocks;
    allLocks = tableLockServer->getAllLocks();

    reply << static_cast<uint8_t>(ERR_OK);
    serializeVector<TableLockInfo>(reply, allLocks);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "release all table locks" request: calls
// tableLockServer->releaseAllLocks() and replies with ERR_OK, or ERR_FAILURE
// when an exception is thrown.
void MasterDBRMNode::doReleaseAllTableLocks(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    msg >> cmd;
    idbassert(msg.length() == 0);

    tableLockServer->releaseAllLocks();

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get table lock info" request.
// Reply on success: ERR_OK, a found flag (1 byte) and, when the lock was
// found, the serialized TableLockInfo; any exception yields ERR_FAILURE.
void MasterDBRMNode::doGetTableLockInfo(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  TableLockInfo tli;

  try
  {
    uint8_t cmd;
    uint64_t lockID;
    msg >> cmd >> lockID;
    idbassert(msg.length() == 0);

    const bool found = tableLockServer->getLockInfo(lockID, &tli);

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(found);

    if (found)
      reply << tli;

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles an "owner check" request for a table lock.
//
// Looks up the lock identified by the id in msg, sends an OWNER_CHECK for the
// owner's process name and PID to the worker nodes, and reports whether any
// worker answered 1.
//
// Replies:
//   ERR_OK, 0      - the lock does not exist
//   ERR_OK, exists - 1 if at least one worker answered 1, otherwise 0
//   ERR_FAILURE    - gathering the worker responses failed, or an exception
//                    was thrown
//
// Every ByteStream placed in `responses` is released on all exit paths,
// including the gather-error path and the exception path.
void MasterDBRMNode::doOwnerCheck(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  vector<ByteStream*> responses;

  // Scope guard: deletes whatever gatherResponses() left in `responses`, no
  // matter how this function is left.
  struct ResponseReaper
  {
    explicit ResponseReaper(vector<ByteStream*>& r) : list(r)
    {
    }
    ~ResponseReaper()
    {
      for (ByteStream* response : list)
        delete response;

      list.clear();
    }
    vector<ByteStream*>& list;
  } reaper(responses);

  try
  {
    uint8_t cmd;
    uint64_t id;

    msg >> cmd >> id;
    idbassert(msg.length() == 0);

    // Look up the current owner of the lock.
    TableLockInfo tli;

    if (!tableLockServer->getLockInfo(id, &tli))
    {
      reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(false);
    }
    else
    {
      // Only the part of the owner name before the first space is sent.
      const string::size_type nameLen = tli.ownerName.find_first_of(" ");
      const string processName =
          (nameLen == string::npos) ? tli.ownerName : tli.ownerName.substr(0, nameLen);

      ByteStream workerNodeCmd;
      workerNodeCmd << static_cast<uint8_t>(OWNER_CHECK) << processName << tli.ownerPID;

      bool readErrFlag = false;
      int err;
      {
        boost::mutex::scoped_lock lk(slaveLock);
        distribute(&workerNodeCmd);
        err = gatherResponses(OWNER_CHECK, workerNodeCmd.length(), &responses, readErrFlag);
      }

      if ((err != ERR_OK) || readErrFlag)
      {
        reply << static_cast<uint8_t>(ERR_FAILURE);
      }
      else
      {
        // Parse the one-byte answer from every worker node.
        bool ownerExists = false;

        for (ByteStream* response : responses)
        {
          uint8_t answer;
          idbassert(response->length() == 1);
          *response >> answer;

          if (answer == 1)
            ownerExists = true;
        }

        reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(ownerExists);
      }
    }

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    // Best-effort failure reply: a failed write is deliberately ignored.
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "start autoincrement sequence" request.
// Message layout: cmd, OID, first number, column width, column data type (1 byte).
// The values are passed to aiManager.startSequence(); the reply is ERR_OK, or
// ERR_FAILURE when an exception is thrown.
void MasterDBRMNode::doStartAISequence(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd, rawType;
    uint32_t oid, colWidth;
    uint64_t firstNum;

    msg >> cmd >> oid >> firstNum >> colWidth >> rawType;

    const execplan::CalpontSystemCatalog::ColDataType colDataType =
        static_cast<execplan::CalpontSystemCatalog::ColDataType>(rawType);
    idbassert(msg.length() == 0);

    aiManager.startSequence(oid, firstNum, colWidth, colDataType);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get autoincrement range" request.
// Reads the OID and the count from msg and calls aiManager.getAIRange().
// Reply on success: ERR_OK, the boolean result (1 byte) and the next value
// written by getAIRange(); any exception yields ERR_FAILURE.
void MasterDBRMNode::doGetAIRange(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    uint32_t oid, count;
    uint64_t nextVal;

    msg >> cmd >> oid >> count;
    idbassert(msg.length() == 0);

    const bool granted = aiManager.getAIRange(oid, count, &nextVal);

    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(granted) << nextVal;
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "reset autoincrement sequence" request: reads the OID and the new
// value from msg, calls aiManager.resetSequence() and replies with ERR_OK, or
// ERR_FAILURE when an exception is thrown.
void MasterDBRMNode::doResetAISequence(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    uint32_t oid;
    uint64_t newValue;

    msg >> cmd >> oid >> newValue;
    idbassert(msg.length() == 0);

    aiManager.resetSequence(oid, newValue);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "get autoincrement lock" request: reads the OID from msg, calls
// aiManager.getLock() and replies with ERR_OK, or ERR_FAILURE when an
// exception is thrown.
void MasterDBRMNode::doGetAILock(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    uint32_t oid;

    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.getLock(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "release autoincrement lock" request: reads the OID from msg,
// calls aiManager.releaseLock() and replies with ERR_OK, or ERR_FAILURE when
// an exception is thrown.
void MasterDBRMNode::doReleaseAILock(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    uint32_t oid;

    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.releaseLock(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Handles a "delete autoincrement sequence" request: reads the OID from msg,
// calls aiManager.deleteSequence() and replies with ERR_OK, or ERR_FAILURE
// when an exception is thrown.
void MasterDBRMNode::doDeleteAISequence(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    uint8_t cmd;
    uint32_t oid;

    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.deleteSequence(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
// Stores the MasterDBRMNode pointer in m; operator()() later calls
// msgProcessor() through it.
MasterDBRMNode::MsgProcessor::MsgProcessor(MasterDBRMNode* master) : m(master)
{
}
// Functor entry point: runs the master node's msgProcessor().
// m->runners is incremented on entry and decremented on exit. No exception
// leaves this function: SocketClosed is ignored (printed only in verbose
// builds), std::exception is logged, and anything else is logged as a warning
// and reported on cerr.
void MasterDBRMNode::MsgProcessor::operator()()
{
  ++m->runners;

  try
  {
    m->msgProcessor();
  }
  catch (SocketClosed& closed)
  {
    // Nothing to do for a closed socket outside of verbose builds.
#ifdef BRM_VERBOSE
    cerr << closed.what() << endl;
#endif
  }
  catch (exception& ex)
  {
    log(ex.what());
#ifdef BRM_VERBOSE
    cerr << ex.what() << endl;
#endif
  }
  catch (...)
  {
    log("caught something that's not an exception", logging::LOG_TYPE_WARNING);
    cerr << "DBRM Controller: caught something that's not an exception" << endl;
  }

  --m->runners;
}
// Empty destructor: the body performs no cleanup, so the MasterDBRMNode that
// m points to is left untouched.
MasterDBRMNode::MsgProcessor::~MsgProcessor()
{
}
} // namespace BRM