mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
2958 lines
61 KiB
C++
2958 lines
61 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
* Copyright (C) 2016 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*****************************************************************************
|
|
* $Id: masterdbrmnode.cpp 1899 2013-06-12 13:32:30Z dcathey $
|
|
*
|
|
****************************************************************************/
|
|
|
|
#include <unistd.h>
|
|
#include <sys/types.h>
|
|
#include <sstream>
|
|
#include <thread>
|
|
|
|
#include "sessionmanager.h"
|
|
#include "socketclosed.h"
|
|
#include "liboamcpp.h"
|
|
#include "stopwatch.h"
|
|
#include "masterdbrmnode.h"
|
|
#include "messagequeuepool.h"
|
|
// #define BRM_VERBOSE
|
|
|
|
// CHECK_ERROR1(x): shared error handling for the distribute/gather/compare
// sequence in msgProcessor().  It expands in place (call sites use no trailing
// semicolon) and relies on the caller's locals `responses`, `it` and `p` and on
// the labels `retrycmd` and `out`.
//   - While `halting` is set: frees every collected response, calls undo(),
//     releases slaveLock, sleeps 50 ms and jumps back to `retrycmd`.
//   - Otherwise, when x is non-zero: calls undo(), releases slaveLock, reports
//     x to the client through sendError() and jumps to `out`.
// Note that x is evaluated twice.  The empty trailing `else` keeps an `else`
// that follows a call site from binding to the macro's own `if`.
#define CHECK_ERROR1(x)               \
  if (halting)                        \
  {                                   \
    it = responses.begin();           \
    while (it != responses.end())     \
    {                                 \
      delete *it;                     \
      ++it;                           \
    }                                 \
    responses.clear();                \
    undo();                           \
    slaveLock.unlock();               \
    usleep(50000);                    \
    goto retrycmd;                    \
  }                                   \
  if (x)                              \
  {                                   \
    undo();                           \
    slaveLock.unlock();               \
    sendError(p->sock, x);            \
    goto out;                         \
  }                                   \
  else                                \
  {                                   \
  }
|
|
|
|
// THREAD_EXIT: tear-down sequence for a msgProcessor() thread.  Relies on the
// caller's locals `p`, `responses` and `it`.
//   1. Under `mutex`, removes this thread's socket from activeSessions (first
//      match only).
//   2. Closes and deletes the socket, then deletes the thread object and the
//      ThreadParams record itself.
//   3. Frees any response messages still held (the vector is not cleared).
#define THREAD_EXIT                                                        \
  {                                                                        \
    mutex.lock();                                                          \
    vector<IOSocket*>::iterator _sess = activeSessions.begin();            \
    while (_sess != activeSessions.end() && !(p->sock == *_sess))          \
      ++_sess;                                                             \
    if (_sess != activeSessions.end())                                     \
      activeSessions.erase(_sess);                                         \
    mutex.unlock();                                                        \
    p->sock->close();                                                      \
    delete p->sock;                                                        \
    delete p->t;                                                           \
    delete p;                                                              \
    it = responses.begin();                                                \
    while (it != responses.end())                                          \
    {                                                                      \
      delete *it;                                                          \
      ++it;                                                                \
    }                                                                      \
  }
|
|
|
|
using namespace std;
|
|
using namespace messageqcpp;
|
|
using namespace logging;
|
|
namespace BRM
|
|
{
|
|
/**
 * Constructs the controller node.
 *
 * Resets the state flags, creates the table-lock server, opens the message
 * queues (see initMsgQueues()), allocates the LBID resource graph and reads the
 * socket timeout from <SystemConfig><DBRMTimeOut>.
 *
 * @throws std::invalid_argument if no configuration object is available.
 *         Anything thrown by initMsgQueues() propagates as well.
 */
MasterDBRMNode::MasterDBRMNode()
{
  config::Config* const cfg = config::Config::makeConfig();

  if (cfg == nullptr)
    throw invalid_argument("MasterDBRMNode: Configuration error.");

  runners = 0;
  die = false;
  reloadCmd = false;
  readOnly = false;
  halting = false;

  tableLockServer.reset(new TableLockServer(&sm));
  initMsgQueues(cfg);
  rg = new LBIDResourceGraph();

  // @Bug 2325: the timeout comes from DBRMTimeOut.  When the setting is
  // missing or not positive this code falls back to 20 seconds.
  const std::string timeoutText = cfg->getConfig("SystemConfig", "DBRMTimeOut");
  const int secondsToWait = cfg->fromText(timeoutText);

  MSG_TIMEOUT.tv_nsec = 0;
  MSG_TIMEOUT.tv_sec = (secondsToWait > 0) ? secondsToWait : 20;
}
|
|
|
|
/**
 * Raises the `die` flag so the accept loop and worker threads stop, then
 * releases connections and owned objects through finalCleanup().
 */
MasterDBRMNode::~MasterDBRMNode()
{
  this->die = true;
  this->finalCleanup();
}
|
|
|
|
/**
 * Copying a controller node is not supported.
 *
 * @throws std::logic_error always.
 */
MasterDBRMNode::MasterDBRMNode(const MasterDBRMNode& /*m*/)
{
  throw logic_error("Don't use the MasterDBRMNode copy constructor");
}
|
|
|
|
/**
 * Assigning a controller node is not supported.
 *
 * @throws std::logic_error always (the function never returns normally).
 */
MasterDBRMNode& MasterDBRMNode::operator=(const MasterDBRMNode& /*m*/)
{
  throw logic_error("Don't use the MasterDBRMNode = operator");
}
|
|
|
|
/**
 * Opens the controller's listening queue and connects to the workers.
 *
 * Reads NumWorkers and the connection timeout from `config`, creates the
 * "DBRM_Controller" MessageQueueServer while holding serverLock, then calls
 * connectToWorkers().
 *
 * @param config configuration object to read from.
 * @throws whatever getNumWorkersAndTimeout() or the MessageQueueServer
 *         constructor throws; serverLock is released before the exception
 *         leaves this function.
 */
void MasterDBRMNode::initMsgQueues(config::Config* config)
{
  const std::string caller("MasterDBRMNode::initMsgQueues()");
  size_t connectTimeoutSecs = 0;
  getNumWorkersAndTimeout(connectTimeoutSecs, caller, config);

  {
    // The guard releases serverLock on normal exit and when the constructor throws.
    std::unique_lock serverGuard(serverLock);
    dbrmServer = new MessageQueueServer("DBRM_Controller", config);
  }

  connectToWorkers(connectTimeoutSecs);
}
|
|
|
|
void MasterDBRMNode::getNumWorkersAndTimeout(size_t& connectTimeoutSecs, const std::string& methodName,
|
|
config::Config* config)
|
|
{
|
|
string stmp;
|
|
int ltmp;
|
|
connectTimeoutSecs = 30; // default
|
|
|
|
stmp = config->getConfig("DBRM_Controller", "NumWorkers");
|
|
|
|
if (stmp.length() == 0)
|
|
throw runtime_error(methodName + ": config file error looking for <DBRM_Controller><NumWorkers>");
|
|
|
|
ltmp = static_cast<int>(config::Config::fromText(stmp));
|
|
|
|
if (ltmp < 1)
|
|
throw runtime_error(methodName + ": Bad NumWorkers value");
|
|
|
|
NumWorkers = ltmp;
|
|
stmp = config->getConfig("DBRM_Controller", "WorkerConnectionTimeout");
|
|
if (stmp.length() > 0)
|
|
{
|
|
ltmp = static_cast<int>(config::Config::fromText(stmp));
|
|
if (ltmp > 1)
|
|
connectTimeoutSecs = ltmp;
|
|
}
|
|
}
|
|
|
|
void MasterDBRMNode::connectToWorkers(const size_t connectTimeoutSecs)
|
|
{
|
|
size_t timeoutMsecs = connectTimeoutSecs * 1000000;
|
|
size_t timeSpent = 0;
|
|
int workersOnline = 0;
|
|
bool initialRun = true;
|
|
while (timeoutMsecs > timeSpent && workersOnline < NumWorkers)
|
|
{
|
|
char ctmp[50];
|
|
for (int i = 0; i < NumWorkers; i++)
|
|
{
|
|
snprintf(ctmp, sizeof(ctmp), "DBRM_Worker%d", i + 1);
|
|
std::string module(ctmp);
|
|
if (static_cast<int>(slaves.size()) < NumWorkers)
|
|
{
|
|
slaves.push_back(MessageQueueClientPool::getInstance(module));
|
|
}
|
|
|
|
if (!slaves[i]->isConnected())
|
|
{
|
|
if (!slaves[i]->connect())
|
|
{
|
|
// first iteration
|
|
if (initialRun)
|
|
log("DBRM Controller: Warning: could not connect to " + module, logging::LOG_TYPE_WARNING);
|
|
}
|
|
else
|
|
{
|
|
log("DBRM Controller: Connected to " + module, logging::LOG_TYPE_DEBUG);
|
|
workersOnline++;
|
|
}
|
|
}
|
|
}
|
|
if (initialRun)
|
|
initialRun = false;
|
|
if (workersOnline < NumWorkers)
|
|
{
|
|
usleep(connectTimeoutStep);
|
|
timeSpent += connectTimeoutStep;
|
|
}
|
|
}
|
|
}
|
|
|
|
void MasterDBRMNode::stop()
|
|
{
|
|
die = true;
|
|
}
|
|
|
|
void MasterDBRMNode::lock()
|
|
{
|
|
slaveLock.lock();
|
|
}
|
|
|
|
void MasterDBRMNode::unlock()
|
|
{
|
|
slaveLock.unlock();
|
|
}
|
|
|
|
void MasterDBRMNode::reload()
|
|
{
|
|
config::Config* config;
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: reloading the config file." << endl;
|
|
#endif
|
|
|
|
reloadCmd = false;
|
|
config = config::Config::makeConfig();
|
|
|
|
if (config == NULL)
|
|
throw runtime_error("DBRM Controller: Configuration error. Reload aborted.");
|
|
|
|
die = true;
|
|
finalCleanup();
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: reinitializing..." << endl;
|
|
#endif
|
|
|
|
try
|
|
{
|
|
initMsgQueues(config);
|
|
}
|
|
catch (exception&)
|
|
{
|
|
throw runtime_error("DBRM Controller: Configuration error. Reload aborted.");
|
|
// masterdbrm.run() will exit after this
|
|
}
|
|
|
|
die = false;
|
|
|
|
rg = new LBIDResourceGraph();
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: reload successful." << endl;
|
|
#endif
|
|
readOnly = false;
|
|
}
|
|
|
|
void MasterDBRMNode::setReadOnly(bool ro)
|
|
{
|
|
slaveLock.lock();
|
|
readOnly = ro;
|
|
slaveLock.unlock();
|
|
}
|
|
|
|
void MasterDBRMNode::run()
|
|
{
|
|
ByteStream msg;
|
|
IOSocket* s;
|
|
boost::thread* reader;
|
|
|
|
while (!die)
|
|
{
|
|
s = new IOSocket();
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller waiting..." << endl;
|
|
#endif
|
|
serverLock.lock();
|
|
|
|
if (dbrmServer != NULL)
|
|
try
|
|
{
|
|
*s = dbrmServer->accept(&MSG_TIMEOUT);
|
|
}
|
|
catch (runtime_error& e)
|
|
{
|
|
cerr << e.what() << " continuing...\n";
|
|
serverLock.unlock();
|
|
delete s;
|
|
continue;
|
|
}
|
|
|
|
serverLock.unlock();
|
|
|
|
if (reloadCmd)
|
|
{
|
|
if (s->isOpen())
|
|
s->close();
|
|
|
|
delete s;
|
|
reload();
|
|
continue;
|
|
}
|
|
|
|
if (die || !s->isOpen())
|
|
{
|
|
if (s->isOpen())
|
|
s->close();
|
|
|
|
delete s;
|
|
continue;
|
|
}
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: got a connection..." << endl;
|
|
#endif
|
|
mutex.lock();
|
|
activeSessions.push_back(s);
|
|
params = new ThreadParams();
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: starting another thread" << endl;
|
|
#endif
|
|
mutex2.lock();
|
|
|
|
try
|
|
{
|
|
reader = new boost::thread(MsgProcessor(this));
|
|
}
|
|
catch (boost::thread_resource_error&)
|
|
{
|
|
log("DBRM Controller: WARNING!! Got thread resource error! Increase system stack size or decrease "
|
|
"the # of active sessions.");
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: WARNING!! Got thread resource error! Increase system stack size or "
|
|
"decrease the # of active sessions.\n";
|
|
#endif
|
|
activeSessions.pop_back();
|
|
sendError(s, ERR_NETWORK);
|
|
sleep(1); // don't close right away to avoid broken pipe on the client
|
|
s->close();
|
|
delete s;
|
|
delete params;
|
|
mutex2.unlock();
|
|
mutex.unlock();
|
|
continue;
|
|
}
|
|
|
|
params->t = reader;
|
|
params->sock = s;
|
|
mutex2.unlock();
|
|
#ifdef __FreeBSD__
|
|
mutex.unlock();
|
|
#endif
|
|
}
|
|
|
|
serverLock.lock();
|
|
delete dbrmServer;
|
|
dbrmServer = NULL;
|
|
serverLock.unlock();
|
|
}
|
|
|
|
void MasterDBRMNode::msgProcessor()
|
|
{
|
|
struct ThreadParams* p;
|
|
ByteStream msg;
|
|
vector<ByteStream*> responses;
|
|
vector<ByteStream*>::iterator it;
|
|
int err;
|
|
uint8_t cmd;
|
|
StopWatch timer;
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: msgProcessor()" << endl;
|
|
#endif
|
|
|
|
mutex2.lock();
|
|
p = params;
|
|
mutex2.unlock();
|
|
#ifndef __FreeBSD__
|
|
mutex.unlock();
|
|
#endif
|
|
|
|
while (!die)
|
|
{
|
|
try
|
|
{
|
|
msg = p->sock->read(&MSG_TIMEOUT);
|
|
}
|
|
catch (...)
|
|
{
|
|
THREAD_EXIT
|
|
throw;
|
|
}
|
|
|
|
if (die) // || msg.length() == 0)
|
|
break;
|
|
|
|
else if (msg.length() == 0)
|
|
continue;
|
|
|
|
/* Check for an command for the master */
|
|
msg.peek(cmd);
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: recv'd message " << (int)cmd << " length " << msg.length() << endl;
|
|
#endif
|
|
|
|
switch (cmd)
|
|
{
|
|
case HALT: doHalt(p->sock); continue;
|
|
|
|
case RESUME: doResume(p->sock); continue;
|
|
|
|
case RELOAD:
|
|
try
|
|
{
|
|
doReload(p->sock);
|
|
}
|
|
catch (exception& e)
|
|
{
|
|
cerr << e.what() << endl;
|
|
}
|
|
|
|
continue;
|
|
|
|
case SETREADONLY: doSetReadOnly(p->sock, true); continue;
|
|
|
|
case SETREADWRITE: doSetReadOnly(p->sock, false); continue;
|
|
|
|
case GETREADONLY: doGetReadOnly(p->sock); continue;
|
|
|
|
case GET_SYSTEM_CATALOG: doGetSystemCatalog(msg, p); continue;
|
|
}
|
|
|
|
/* Process SessionManager calls */
|
|
switch (cmd)
|
|
{
|
|
case VER_ID: doVerID(msg, p); continue;
|
|
|
|
case SYSCAT_VER_ID: doSysCatVerID(msg, p); continue;
|
|
|
|
case NEW_TXN_ID: doNewTxnID(msg, p); continue;
|
|
|
|
case COMMITTED: doCommitted(msg, p); continue;
|
|
|
|
case ROLLED_BACK: doRolledBack(msg, p); continue;
|
|
|
|
case GET_TXN_ID: doGetTxnID(msg, p); continue;
|
|
|
|
case SID_TID_MAP: doSIDTIDMap(msg, p); continue;
|
|
|
|
case GET_UNIQUE_UINT32: doGetUniqueUint32(msg, p); continue;
|
|
|
|
case GET_UNIQUE_UINT64: doGetUniqueUint64(msg, p); continue;
|
|
|
|
case GET_SYSTEM_STATE: doGetSystemState(msg, p); continue;
|
|
|
|
case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue;
|
|
|
|
case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue;
|
|
|
|
case SM_RESET: doSessionManagerReset(msg, p); continue;
|
|
|
|
case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue;
|
|
}
|
|
|
|
switch (cmd)
|
|
{
|
|
case NEW_CPIMPORT_JOB: doNewCpimportJob(p); continue;
|
|
|
|
case FINISH_CPIMPORT_JOB: doFinishCpimportJob(msg, p); continue;
|
|
|
|
case START_READONLY: doStartReadOnly(p->sock); continue;
|
|
|
|
case FORCE_CLEAR_CPIMPORT_JOBS: doForceClearAllCpimportJobs(p->sock); continue;
|
|
}
|
|
|
|
/* Process TableLock calls */
|
|
switch (cmd)
|
|
{
|
|
case GET_TABLE_LOCK: doGetTableLock(msg, p); continue;
|
|
|
|
case RELEASE_TABLE_LOCK: doReleaseTableLock(msg, p); continue;
|
|
|
|
case CHANGE_TABLE_LOCK_STATE: doChangeTableLockState(msg, p); continue;
|
|
|
|
case CHANGE_TABLE_LOCK_OWNER: doChangeTableLockOwner(msg, p); continue;
|
|
|
|
case GET_ALL_TABLE_LOCKS: doGetAllTableLocks(msg, p); continue;
|
|
|
|
case RELEASE_ALL_TABLE_LOCKS: doReleaseAllTableLocks(msg, p); continue;
|
|
|
|
case GET_TABLE_LOCK_INFO: doGetTableLockInfo(msg, p); continue;
|
|
|
|
case OWNER_CHECK: doOwnerCheck(msg, p); continue;
|
|
}
|
|
|
|
/* Process OIDManager calls */
|
|
switch (cmd)
|
|
{
|
|
case ALLOC_OIDS: doAllocOIDs(msg, p); continue;
|
|
|
|
case RETURN_OIDS: doReturnOIDs(msg, p); continue;
|
|
|
|
case OIDM_SIZE: doOidmSize(msg, p); continue;
|
|
|
|
case ALLOC_VBOID: doAllocVBOID(msg, p); continue;
|
|
|
|
case GETDBROOTOFVBOID: doGetDBRootOfVBOID(msg, p); continue;
|
|
|
|
case GETVBOIDTODBROOTMAP: doGetVBOIDToDBRootMap(msg, p); continue;
|
|
}
|
|
|
|
/* Process Autoincrement calls */
|
|
switch (cmd)
|
|
{
|
|
case START_AI_SEQUENCE: doStartAISequence(msg, p); continue;
|
|
|
|
case GET_AI_RANGE: doGetAIRange(msg, p); continue;
|
|
|
|
case RESET_AI_SEQUENCE: doResetAISequence(msg, p); continue;
|
|
|
|
case GET_AI_LOCK: doGetAILock(msg, p); continue;
|
|
|
|
case RELEASE_AI_LOCK: doReleaseAILock(msg, p); continue;
|
|
|
|
case DELETE_AI_SEQUENCE: doDeleteAISequence(msg, p); continue;
|
|
}
|
|
|
|
retrycmd:
|
|
uint32_t haltloops = 0;
|
|
|
|
while (halting && ++haltloops < static_cast<uint32_t>(FIVE_MIN_TIMEOUT.tv_sec))
|
|
sleep(1);
|
|
|
|
slaveLock.lock();
|
|
|
|
if (haltloops == FIVE_MIN_TIMEOUT.tv_sec)
|
|
{
|
|
ostringstream os;
|
|
os << "A node is unresponsive for cmd = " << (uint32_t)cmd << ", no reconfigure in at least "
|
|
<< FIVE_MIN_TIMEOUT.tv_sec << " seconds. Setting read-only mode.";
|
|
log(os.str());
|
|
readOnly = true;
|
|
halting = false;
|
|
}
|
|
|
|
if (readOnly)
|
|
{
|
|
slaveLock.unlock();
|
|
sendError(p->sock, ERR_READONLY);
|
|
goto out;
|
|
}
|
|
|
|
/* TODO: Separate these out-of-band items into separate functions */
|
|
|
|
/* Need to get the dbroot, convert to vbOID */
|
|
if (cmd == BEGIN_VB_COPY)
|
|
{
|
|
try
|
|
{
|
|
boost::mutex::scoped_lock lk(oidsMutex);
|
|
uint8_t* buf = msg.buf();
|
|
|
|
// dbroot is currently after the cmd and transid
|
|
uint16_t* dbRoot = (uint16_t*)&buf[1 + 4];
|
|
|
|
// If that dbroot has no vboid, create one
|
|
int16_t err;
|
|
err = oids.getVBOIDOfDBRoot(*dbRoot);
|
|
|
|
// cout << "dbRoot " << *dbRoot << " -> vbOID " << err << endl;
|
|
if (err < 0)
|
|
{
|
|
err = oids.allocVBOID(*dbRoot);
|
|
// cout << " - allocated oid " << err << endl;
|
|
}
|
|
|
|
*dbRoot = err;
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
ostringstream os;
|
|
os << "DBRM Controller: Begin VBCopy failure. " << ex.what();
|
|
log(os.str());
|
|
sendError(p->sock, -1);
|
|
goto out;
|
|
}
|
|
}
|
|
|
|
/* Check for deadlock on beginVBCopy */
|
|
/* if (cmd == BEGIN_VB_COPY) {
|
|
ByteStream params(msg);
|
|
VER_t txn;
|
|
uint8_t tmp8;
|
|
uint32_t tmp32;
|
|
LBIDRange_v ranges;
|
|
LBIDRange_v::iterator it;
|
|
LBID_t end;
|
|
|
|
params >> tmp8; //throw away the cmd
|
|
params >> tmp32;
|
|
txn = tmp32;
|
|
deserializeVector(params, ranges);
|
|
for (it = ranges.begin(); it != ranges.end(); it++) {
|
|
end = (*it).start + (*it).size - 1;
|
|
err = rg->reserveRange((*it).start, end, txn, slaveLock);
|
|
if (err != ERR_OK) {
|
|
pthread_mutex_unlock(&slaveLock);
|
|
sendError(p->sock, err);
|
|
goto out;
|
|
}
|
|
}
|
|
}
|
|
*/
|
|
/* Release all blocks of lbids on vbRollback or vbCommit */
|
|
if (cmd == VB_ROLLBACK1 || cmd == VB_ROLLBACK2 || cmd == VB_COMMIT)
|
|
{
|
|
ByteStream params(msg);
|
|
VER_t txn;
|
|
uint8_t tmp8;
|
|
uint32_t tmp32;
|
|
|
|
params >> tmp8;
|
|
params >> tmp32;
|
|
txn = tmp32;
|
|
rg->releaseResources(txn);
|
|
}
|
|
|
|
/* XXXPAT: If beginVBCopy, vbRollback, or vbCommit fail for some reason,
|
|
the resource graph will be out of sync with the copylocks and/or vss.
|
|
There are 2 reasons they might fail:
|
|
1) logical inconsistency between the resource graph and the
|
|
copylocks & vss
|
|
2) "out of band" failures, like a network problem
|
|
*/
|
|
|
|
/* Need to atomically do the safety check and the clear. */
|
|
if (cmd == BRM_CLEAR)
|
|
{
|
|
uint32_t txnCount = sm.getTxnCount();
|
|
|
|
// do nothing if there's an active transaction
|
|
if (txnCount != 0)
|
|
{
|
|
ByteStream* reply = new ByteStream();
|
|
*reply << (uint8_t)ERR_FAILURE;
|
|
responses.push_back(reply);
|
|
goto no_confirm;
|
|
}
|
|
}
|
|
|
|
for (int retry = 0;; retry++)
|
|
{
|
|
try
|
|
{
|
|
distribute(&msg);
|
|
}
|
|
catch (...)
|
|
{
|
|
if (!halting)
|
|
{
|
|
undo();
|
|
readOnly = true;
|
|
slaveLock.unlock();
|
|
ostringstream ostr;
|
|
ostr << "DBRM Controller: Caught network error. "
|
|
"Sending command "
|
|
<< (uint32_t)cmd << ", length " << msg.length() << ". Setting read-only mode.";
|
|
log(ostr.str());
|
|
sendError(p->sock, ERR_NETWORK);
|
|
goto out;
|
|
}
|
|
}
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: distributed msg" << endl;
|
|
#endif
|
|
|
|
bool readErrFlag; // ignore this flag in this case
|
|
err = gatherResponses(cmd, msg.length(), &responses, readErrFlag);
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: got responses" << endl;
|
|
#endif
|
|
|
|
CHECK_ERROR1(err)
|
|
err = compareResponses(cmd, msg.length(), responses);
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: compared responses" << endl;
|
|
#endif
|
|
|
|
if (err != ERR_SLAVE_INCONSISTENCY)
|
|
{
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
if (retry > 1)
|
|
{
|
|
readOnly = true;
|
|
ostringstream ostr;
|
|
ostr << "DBRM Controller: image inconsistency detected multi times. Setting read-only mode.";
|
|
log(ostr.str());
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef BRM_DEBUG
|
|
|
|
if ((cmd == BEGIN_VB_COPY || cmd == VB_ROLLBACK1 || cmd == VB_ROLLBACK2 || cmd == VB_COMMIT) && err == -1)
|
|
cerr << "DBRM Controller: inconsistency detected between the resource graph and the VSS or CopyLocks "
|
|
"logic."
|
|
<< endl;
|
|
|
|
#endif
|
|
|
|
// these command will have error message carried in the response
|
|
if (!responses.empty() &&
|
|
(cmd == DELETE_PARTITION || cmd == MARK_PARTITION_FOR_DELETION || cmd == RESTORE_PARTITION) && err)
|
|
{
|
|
if (err != ERR_PARTITION_DISABLED && err != ERR_PARTITION_ENABLED &&
|
|
err != ERR_INVALID_OP_LAST_PARTITION && err != ERR_NOT_EXIST_PARTITION &&
|
|
err != ERR_NO_PARTITION_PERFORMED)
|
|
undo();
|
|
|
|
// goto no_confirm;
|
|
}
|
|
else
|
|
{
|
|
CHECK_ERROR1(err)
|
|
}
|
|
|
|
// these cmds don't need the 2-phase commit
|
|
if (cmd == FLUSH_INODE_CACHES || cmd == BRM_CLEAR || cmd == TAKE_SNAPSHOT)
|
|
goto no_confirm;
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: sending confirmation" << endl;
|
|
#endif
|
|
|
|
try
|
|
{
|
|
confirm();
|
|
}
|
|
catch (...)
|
|
{
|
|
if (!halting)
|
|
{
|
|
ostringstream ostr;
|
|
ostr << "DBRM Controller: Caught network error. "
|
|
"Confirming command "
|
|
<< (uint32_t)cmd << ", length " << msg.length() << ". Setting read-only mode.";
|
|
log(ostr.str());
|
|
readOnly = true;
|
|
}
|
|
}
|
|
|
|
no_confirm:
|
|
slaveLock.unlock();
|
|
|
|
try
|
|
{
|
|
p->sock->write(*(responses.front()));
|
|
}
|
|
catch (...)
|
|
{
|
|
p->sock->close();
|
|
log("DBRM Controller: Warning: could not send the reply to a command", logging::LOG_TYPE_WARNING);
|
|
}
|
|
|
|
out:
|
|
|
|
for (it = responses.begin(); it != responses.end(); it++)
|
|
delete *it;
|
|
|
|
responses.clear();
|
|
}
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: closing connection" << endl;
|
|
#endif
|
|
THREAD_EXIT
|
|
return;
|
|
}
|
|
|
|
/**
 * Sends `msg` to every worker in order, stopping early once `halting` is set.
 *
 * The member iterator iSlave tracks the current worker; undo() uses its final
 * position to decide which workers to contact.
 *
 * @param msg message to write to each worker connection.
 * @throws rethrows the std::exception from a failed write (after logging it)
 *         unless the node is halting, in which case the error is ignored.
 */
void MasterDBRMNode::distribute(ByteStream* msg)
{
  uint32_t workerNum = 1;  // 1-based, for log messages only
  iSlave = slaves.begin();

  while (iSlave != slaves.end() && !halting)
  {
    try
    {
      (*iSlave)->write(*msg);
    }
    catch (exception&)
    {
      if (!halting)
      {
        ostringstream os;
        os << "DBRM Controller: network error distributing command to worker " << workerNum << '\n';
        log(os.str());
        throw;  // iSlave is left pointing at the worker that failed
      }
    }

    ++iSlave;
    ++workerNum;
  }
}
|
|
|
|
// readErrFlag is a separate return flag used by doChangeTableLockOwner()
|
|
// (or any subsequent function) which calls gatherResponses() outside the
|
|
// scope of msgProcessor() which instead uses the halting flag for error
|
|
// handling.
|
|
int MasterDBRMNode::gatherResponses(uint8_t cmd, uint32_t cmdMsgLength, vector<ByteStream*>* responses,
|
|
bool& readErrFlag) throw()
|
|
{
|
|
int i;
|
|
ByteStream* tmp = 0;
|
|
readErrFlag = false;
|
|
|
|
// Bug 2258 gather all responses
|
|
int error = 0;
|
|
|
|
for (i = 0, iSlave = slaves.begin(); iSlave != slaves.end() && !halting; iSlave++, i++)
|
|
{
|
|
tmp = new ByteStream();
|
|
|
|
try
|
|
{
|
|
// can't just block for 5 mins
|
|
timespec newtimeout = {10, 0};
|
|
uint32_t ntRetries = FIVE_MIN_TIMEOUT.tv_sec / newtimeout.tv_sec;
|
|
uint32_t retries = 0;
|
|
|
|
while (++retries < ntRetries && tmp->length() == 0 && !halting)
|
|
*tmp = (*iSlave)->read(&newtimeout);
|
|
|
|
//*tmp = (*iSlave)->read();
|
|
}
|
|
catch (...)
|
|
{
|
|
/* 2/21/12 - instead of setting readonly here, to support a peaceful failover
|
|
we will wait for a configuration change to come, then report the error
|
|
after a long timeout.
|
|
*/
|
|
|
|
ostringstream os;
|
|
os << "DBRM Controller: Network error reading from node " << i + 1 << ". Reading response to command "
|
|
<< (uint32_t)cmd << ", length " << cmdMsgLength << ". Will see if retry is possible.";
|
|
log(os.str());
|
|
|
|
halting = true;
|
|
readErrFlag = true;
|
|
delete tmp;
|
|
return ERR_OK;
|
|
|
|
/*
|
|
ostringstream os;
|
|
if (!halting) {
|
|
SEND_ALARM
|
|
readOnly = true;
|
|
os << "DBRM Controller: Network error reading from node " << i + 1 <<
|
|
". Reading response to command " << (uint32_t)cmd <<
|
|
", length " << cmdMsgLength << ". Setting read-only mode.";
|
|
log(os.str());
|
|
error++;
|
|
}
|
|
*/
|
|
}
|
|
|
|
if (tmp->length() == 0 && !halting)
|
|
{
|
|
/* See the comment above */
|
|
ostringstream os;
|
|
os << "DBRM Controller: Network error reading from node " << i + 1 << ". Reading response to command "
|
|
<< (uint32_t)cmd << ", length " << cmdMsgLength
|
|
<< ". 0 length response, possible time-out"
|
|
". Will see if retry is possible.";
|
|
log(os.str());
|
|
halting = true;
|
|
readErrFlag = true;
|
|
delete tmp;
|
|
return ERR_OK;
|
|
|
|
/*
|
|
ostringstream os;
|
|
|
|
SEND_ALARM;
|
|
|
|
readOnly = true;
|
|
os << "DBRM Controller: Network error reading from node " << i + 1<<
|
|
". Reading response to command " << (uint32_t)cmd <<
|
|
", length " << cmdMsgLength <<
|
|
". 0 length response, possible time-out"
|
|
". Setting read-only mode.";
|
|
log(os.str());
|
|
error++;
|
|
*/
|
|
}
|
|
|
|
if (error == 0)
|
|
responses->push_back(tmp);
|
|
else
|
|
delete tmp;
|
|
}
|
|
|
|
if (error > 0)
|
|
return ERR_NETWORK;
|
|
else
|
|
return ERR_OK;
|
|
}
|
|
|
|
int MasterDBRMNode::compareResponses(uint8_t cmd, uint32_t cmdMsgLength,
|
|
const vector<ByteStream*>& responses) const
|
|
{
|
|
vector<ByteStream*>::const_iterator it, it2;
|
|
uint8_t errCode;
|
|
ByteStream* first;
|
|
int i;
|
|
|
|
first = responses.front();
|
|
|
|
try
|
|
{
|
|
first->peek(errCode);
|
|
}
|
|
catch (exception&)
|
|
{
|
|
readOnly = true;
|
|
ostringstream os;
|
|
os << "DBRM Controller: Network error on node 1. "
|
|
"Verifying response to command "
|
|
<< (uint32_t)cmd << ", length " << cmdMsgLength << ". Reverting to read-only mode.";
|
|
log(os.str());
|
|
return ERR_NETWORK;
|
|
}
|
|
|
|
/*if (errCode != ERR_OK) {
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: first response has error code " << errCode << endl;
|
|
#endif
|
|
return errCode;
|
|
}*/
|
|
|
|
for (it = responses.begin(), it2 = it + 1, i = 2; it2 != responses.end(); it++, it2++, i++)
|
|
if (**it != **it2 && !halting)
|
|
{
|
|
ostringstream ostr;
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: error: response from node " << i << " is different" << endl;
|
|
#endif
|
|
// readOnly = true;
|
|
ostr << "DBRM Controller: image inconsistency detected at node " << i
|
|
<< ". Verifying response to command " << (uint32_t)cmd << ", length " << cmdMsgLength;
|
|
log(ostr.str());
|
|
return ERR_SLAVE_INCONSISTENCY;
|
|
}
|
|
|
|
// return ERR_OK;
|
|
return errCode;
|
|
}
|
|
|
|
void MasterDBRMNode::undo() throw()
|
|
{
|
|
vector<MessageQueueClient*>::iterator it, lastSlave;
|
|
ByteStream undoMsg;
|
|
int i;
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: sending undo()" << endl;
|
|
#endif
|
|
|
|
undoMsg << (uint8_t)BRM_UNDO;
|
|
|
|
if (iSlave != slaves.end())
|
|
lastSlave = iSlave + 1; //@Bug 2258
|
|
else
|
|
lastSlave = iSlave;
|
|
|
|
for (it = slaves.begin(), i = 1; it != lastSlave; it++, i++)
|
|
{
|
|
try
|
|
{
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "DBRM Controller: sending undo() to worker number " << i << endl;
|
|
#endif
|
|
(*it)->write(undoMsg);
|
|
}
|
|
catch (...)
|
|
{
|
|
ostringstream ostr;
|
|
|
|
ostr << "DBRM Controller: undo(): warning, could not contact worker number " << i << endl;
|
|
log(ostr.str());
|
|
}
|
|
}
|
|
}
|
|
|
|
void MasterDBRMNode::confirm()
|
|
{
|
|
ByteStream confirmMsg;
|
|
|
|
confirmMsg << CONFIRM;
|
|
distribute(&confirmMsg);
|
|
}
|
|
|
|
void MasterDBRMNode::finalCleanup()
|
|
{
|
|
vector<MessageQueueClient*>::iterator sIt;
|
|
int retry = 0;
|
|
|
|
cerr << "DBRM Controller: Waiting for threads to finish..." << endl;
|
|
|
|
delete rg; // unblocks any waiting transactions
|
|
rg = NULL;
|
|
|
|
// XXXPAT: assumption here: join_all() blocks until all threads are joined
|
|
// which implies the case where all threads are removed from the group.
|
|
// We're relying on that second condition for synchronization here.
|
|
// Problem: it looks as if join_all holds a mutex which prevents other calls
|
|
// on dbrmSessions, so threads can't be removed from the group.
|
|
// dbrmSessions.join_all();
|
|
// blah: busy wait for now, max 15 seconds, then assume everything's dead.
|
|
// @bug 1381: change retry from 15 to 5 more than the socket read() timeout.
|
|
while (runners > 0 && retry++ < (MSG_TIMEOUT.tv_sec + 5))
|
|
sleep(1);
|
|
|
|
#ifdef BRM_VERBOSE
|
|
cerr << "Closing connections" << endl;
|
|
#endif
|
|
|
|
for (sIt = slaves.begin(); sIt != slaves.end(); sIt++)
|
|
{
|
|
MessageQueueClientPool::releaseInstance(*sIt);
|
|
*sIt = NULL;
|
|
}
|
|
|
|
slaves.clear();
|
|
|
|
/* Close any connections that lost their thread somehow. This should "never" happen.
|
|
*/
|
|
|
|
#if 0 // if we see instances of blockage here flip this switch
|
|
int tmp;
|
|
tmp = pthread_mutex_trylock(&mutex);
|
|
|
|
if (tmp != 0) // try one more time then steal the lock TODO: why is this necessary?
|
|
{
|
|
sleep(1);
|
|
tmp = pthread_mutex_trylock(&mutex);
|
|
cerr << "DBRM Controller: warning! Trying to recover mutex!\n";
|
|
|
|
if (tmp != 0)
|
|
{
|
|
pthread_mutex_unlock(&mutex);
|
|
sleep(5); // let the other threads finish
|
|
tmp = pthread_mutex_trylock(&mutex);
|
|
|
|
if (tmp != 0)
|
|
{
|
|
cerr << "DBRM Controller: error! Bad mutex state, going down!\n";
|
|
exit(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
#else
|
|
mutex.lock();
|
|
#endif
|
|
|
|
#ifdef BRM_VERBOSE
|
|
|
|
if (activeSessions.size() > 0)
|
|
cerr << "There are still live sessions\n";
|
|
|
|
#endif
|
|
|
|
for (uint32_t i = 0; i < activeSessions.size(); ++i)
|
|
{
|
|
activeSessions[i]->close();
|
|
delete activeSessions[i];
|
|
}
|
|
|
|
activeSessions.clear();
|
|
mutex.unlock();
|
|
|
|
serverLock.lock();
|
|
delete dbrmServer;
|
|
dbrmServer = NULL;
|
|
serverLock.unlock();
|
|
}
|
|
|
|
/**
 * Sends a one-byte status code to a client.
 *
 * @param caller socket of the client to notify.
 * @param err    code to send.
 *
 * A std::exception from the write is logged as a warning and not propagated.
 */
void MasterDBRMNode::sendError(IOSocket* caller, uint8_t err) const throw()
{
#ifdef BRM_VERBOSE
  cerr << "DBRM Controller: returning " << (int)err << " to the caller" << endl;
#endif

  ByteStream payload;
  payload << err;

  try
  {
    caller->write(payload);
  }
  catch (exception&)
  {
    log("DBRM Controller: failed to send return code", logging::LOG_TYPE_WARNING);
#ifdef BRM_VERBOSE
    cerr << "DBRM Controller: failed to send return code" << endl;
#endif
  }
}
|
|
|
|
/**
 * Handles the HALT command: raises `halting`, then takes slaveLock (through
 * lock()) and keeps it held on return; doResume() releases it.  Replies ERR_OK.
 *
 * @param sock client socket to answer on; a failed write is ignored.
 */
void MasterDBRMNode::doHalt(messageqcpp::IOSocket* sock)
{
  log("Halting...", LOG_TYPE_INFO);
  halting = true;
  lock();  // waits until the forwarded command in progress, if any, lets go
  log("Halted", LOG_TYPE_INFO);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
/**
 * Handles the RESUME command: releases slaveLock (through unlock()), clears
 * `halting` and replies ERR_OK.
 *
 * @param sock client socket to answer on; a failed write is ignored.
 */
void MasterDBRMNode::doResume(messageqcpp::IOSocket* sock)
{
  unlock();
  halting = false;
  log("Resuming", LOG_TYPE_INFO);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
/**
 * Handles FORCE_CLEAR_CPIMPORT_JOBS: clears every registered cpimport job in
 * the session manager while holding cpimportMutex, then replies ERR_OK.
 *
 * @param sock client socket to answer on; a failed write is ignored.
 *
 * The debug trace was guarded by the misspelled macro BMR_VERBOSE and could
 * never be enabled; it now uses BRM_VERBOSE like the rest of the file.
 */
void MasterDBRMNode::doForceClearAllCpimportJobs(messageqcpp::IOSocket* sock)
{
#ifdef BRM_VERBOSE
  std::cout << "doForceClearCpimprtJobs" << std::endl;
#endif
  ByteStream reply;
  std::unique_lock lk(cpimportMutex);
  sm.clearAllCpimportJobs();

  reply << (uint8_t)ERR_OK;
  try
  {
    sock->write(reply);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
/**
 * Handles START_READONLY: switches the node to read-only mode once no
 * cpimport jobs are registered, and replies ERR_OK immediately.
 *
 * The switch runs on a detached thread because msgProcessor() must not
 * block.  That thread holds cpimportMutex, sets waitToFinishJobs while it
 * waits on cpimportJobsCond for the job count to reach zero, and then calls
 * setReadOnly(true).
 *
 * NOTE(review): the detached thread captures `this`; it presumably relies on
 * the node outliving the wait -- confirm.
 *
 * @param sock client socket to answer on; a failed write is ignored.
 */
void MasterDBRMNode::doStartReadOnly(messageqcpp::IOSocket* sock)
{
#ifdef BRM_VERBOSE
  std::cout << "doStartReadonly" << std::endl;
#endif
  ByteStream reply;

  // Spawn a new thread and detach it, we cannot block `msgProcessor`.
  std::thread(
      [this]()
      {
        std::unique_lock lk(cpimportMutex);
        const bool jobsPending = sm.getCpimportJobsCount() != 0;

        if (jobsPending)
        {
          waitToFinishJobs = true;
          // Wait until all cpimport jobs are done.
          cpimportJobsCond.wait(lk, [this]() { return sm.getCpimportJobsCount() == 0; });
        }

        setReadOnly(true);

        if (jobsPending)
          waitToFinishJobs = false;
      })
      .detach();

  reply << (uint8_t)ERR_OK;
  try
  {
    sock->write(reply);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
/**
 * Handles SETREADONLY / SETREADWRITE: applies the requested mode through
 * setReadOnly() and replies ERR_OK.
 *
 * @param sock client socket to answer on; a failed write is ignored.
 * @param b    true for read-only, false for read-write.
 */
void MasterDBRMNode::doSetReadOnly(messageqcpp::IOSocket* sock, bool b)
{
  setReadOnly(b);

  ByteStream ack;
  ack << (uint8_t)ERR_OK;

  try
  {
    sock->write(ack);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
/**
 * Handles GETREADONLY: replies with a single byte holding the result of
 * isReadOnly().
 *
 * @param sock client socket to answer on; a failed write is ignored.
 */
void MasterDBRMNode::doGetReadOnly(messageqcpp::IOSocket* sock)
{
  ByteStream answer;
  answer << (uint8_t)isReadOnly();

  try
  {
    sock->write(answer);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
/**
 * Handles the RELOAD command synchronously.
 *
 * This version relies on the caller to do a 'halt' and 'resume' around the
 * 'reload' call.  When it returns, the old worker connections have been
 * deleted and new ones established from the current configuration, a
 * BRM_UNDO has been sent to every worker, and read-only mode is cleared.
 *
 * @param sock client socket; receives ERR_OK on success and ERR_FAILURE when
 *             the configuration cannot be used.  Failed writes are ignored.
 * @throws std::runtime_error if no configuration object is available (this
 *         check is new and matches the constructor and reload()); rethrows
 *         whatever getNumWorkersAndTimeout() throws.
 */
void MasterDBRMNode::doReload(messageqcpp::IOSocket* sock)
{
  ByteStream reply;

  // Sends `reply` to the client, ignoring a failed write (best effort).
  auto sendReply = [&]()
  {
    try
    {
      sock->write(reply);
    }
    catch (exception&)
    {
    }
  };

  config::Config* config = config::Config::makeConfig();

  log("Reloading", LOG_TYPE_INFO);

  if (config == nullptr)
  {
    reply << (uint8_t)ERR_FAILURE;
    sendReply();
    throw runtime_error("DBRM Controller: Configuration error.  Reload aborted.");
  }

  std::string methodName("MasterDBRMNode::doReload()");
  size_t connectTimeoutSecs = 0;
  try
  {
    getNumWorkersAndTimeout(connectTimeoutSecs, methodName, config);
  }
  catch (std::exception&)
  {
    reply << (uint8_t)ERR_FAILURE;
    sendReply();
    throw;
  }

  // Drop the existing worker connections before reconnecting.
  for (MessageQueueClient*& worker : slaves)
  {
    MessageQueueClientPool::deleteInstance(worker);
    worker = nullptr;
  }

  slaves.clear();

  connectToWorkers(connectTimeoutSecs);

  // With iSlave at the end, undo() contacts every worker.
  iSlave = slaves.end();
  undo();

  readOnly = false;

  reply << (uint8_t)ERR_OK;
  sendReply();
}
|
|
|
|
/**
 * Handles VER_ID: replies with ERR_OK followed by the query context returned
 * by the session manager's verID().
 *
 * @param msg request message (not read here).
 * @param p   thread parameters; the reply goes to p->sock.  A failed write is
 *            ignored.
 *
 * The BRM_VERBOSE trace used to print an undeclared name (`ver`), which broke
 * the build whenever tracing was enabled.
 * NOTE(review): it now prints context.currentScn -- this assumes QueryContext
 * exposes a streamable `currentScn` member; confirm against its declaration.
 */
void MasterDBRMNode::doVerID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  QueryContext context;

  context = sm.verID();
#ifdef BRM_VERBOSE
  cerr << "doVerID returning " << context.currentScn << endl;
#endif

  reply << (uint8_t)ERR_OK;
  reply << context;

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
    // Best effort: the client may already be gone.
  }
}
|
|
|
|
// Serializes a description of every table known to the system catalog and
// sends it to the caller. `msg` is not read.
//
// Reply layout: ERR_OK, table count, then per table: OID, schema, table name,
// numOfCols, followed by one record per entry of columnRIDs() holding
// column OID, column name, dictionary OID (0 when unset), data type, width,
// position, default value, autoincrement flag, precision, scale, a flag that
// is 1 unless the column has a NOT NULL constraint, and the compression type.
//
// Exceptions from the catalog calls are not caught here; only a
// std::exception from the final socket write is ignored.
void MasterDBRMNode::doGetSystemCatalog(ByteStream& msg, ThreadParams* p)
{
  using Catalog = execplan::CalpontSystemCatalog;
  using TableEntry = std::pair<Catalog::OID, Catalog::TableName>;

  ByteStream reply;

  reply << static_cast<uint8_t>(ERR_OK);

  boost::shared_ptr<Catalog> catalog = Catalog::makeCalpontSystemCatalog();
  const std::vector<TableEntry> tables = catalog->getTables();

  reply << static_cast<uint32_t>(tables.size());

  for (const TableEntry& entry : tables)
  {
    Catalog::TableInfo info = catalog->tableInfo(entry.second);
    reply << static_cast<uint32_t>(entry.first) << entry.second.schema << entry.second.table
          << static_cast<uint32_t>(info.numOfCols);

    Catalog::RIDList columns = catalog->columnRIDs(entry.second, true);

    for (const auto& column : columns)
    {
      Catalog::TableColName colName = catalog->colName(column.objnum);
      Catalog::ColType colType = catalog->colType(column.objnum);

      reply << static_cast<uint32_t>(column.objnum) << colName.column;

      // The minimum int32 value marks "no dictionary"; report it as OID 0.
      const bool hasDictionary = (colType.ddn.dictOID != std::numeric_limits<int32_t>::min());
      reply << static_cast<uint32_t>(hasDictionary ? colType.ddn.dictOID : 0);

      reply << static_cast<uint8_t>(colType.colDataType) << static_cast<uint32_t>(colType.colWidth)
            << static_cast<uint32_t>(colType.colPosition) << colType.defaultValue
            << static_cast<uint8_t>(colType.autoincrement) << static_cast<uint32_t>(colType.precision)
            << static_cast<uint32_t>(colType.scale);

      const bool notNull = (colType.constraintType == Catalog::NOTNULL_CONSTRAINT);
      reply << static_cast<uint8_t>(notNull ? 0 : 1);

      reply << static_cast<uint8_t>(colType.compressionType);
    }
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Replies with ERR_OK followed by the QueryContext returned by
// sm.sysCatVerID(). `msg` is not read. A std::exception thrown by the socket
// write is ignored.
void MasterDBRMNode::doSysCatVerID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  QueryContext context = sm.sysCatVerID();
#ifdef BRM_VERBOSE
  // NOTE(review): no local named `ver` is declared in this function; confirm
  // this line still builds when BRM_VERBOSE is defined.
  cerr << "doSysCatVerID returning " << ver << endl;
#endif

  reply << static_cast<uint8_t>(ERR_OK) << context;

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Registers a new cpimport job with the session manager and replies with
// ERR_OK followed by the 32-bit job id.
//
// Replies ERR_READONLY (and creates nothing) while waitToFinishJobs is set,
// and ERR_FAILURE if sm.newCpimportJob() throws a std::exception.
// cpimportMutex is held for the whole handler, socket writes included.
void MasterDBRMNode::doNewCpimportJob(ThreadParams* p)
{
#ifdef BRM_VERBOSE
  std::cerr << "doNewCpimportJob" << std::endl;
#endif
  ByteStream reply;
  uint32_t newJobId;

  std::unique_lock lk(cpimportMutex);

  // waitToFinishJobs means the controller node is waiting until all active
  // cpimport jobs are done. We cannot block `msgProcessor` and cannot create a
  // new job, so exit with the `readonly` code.
  if (waitToFinishJobs)
  {
    reply << static_cast<uint8_t>(ERR_READONLY);
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
    return;
  }

  try
  {
    newJobId = sm.newCpimportJob();
  }
  catch (exception&)
  {
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(newJobId);
  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Marks the cpimport job named in `msg` (command byte, then 32-bit job id) as
// finished and replies ERR_OK, or ERR_FAILURE if the session manager throws a
// std::exception.
//
// After a successful reply, wakes one waiter on cpimportJobsCond when
// waitToFinishJobs is set and no cpimport jobs remain. cpimportMutex is held
// for the whole handler.
void MasterDBRMNode::doFinishCpimportJob(ByteStream& msg, ThreadParams* p)
{
#ifdef BRM_VERBOSE
  std::cout << "doFinishCpimportJob" << std::endl;
#endif
  ByteStream reply;
  uint32_t finishedJobId;
  uint8_t cmd;
  std::unique_lock lk(cpimportMutex);

  // Note: these reads are outside the try block, so a short message
  // propagates its exception to the caller.
  msg >> cmd >> finishedJobId;

  try
  {
    sm.finishCpimortJob(finishedJobId);
  }
  catch (exception&)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);
    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK);
  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }

  const bool lastJobDone = waitToFinishJobs && sm.getCpimportJobsCount() == 0;

  if (lastJobDone)
    cpimportJobsCond.notify_one();
}
|
|
|
|
// Allocates a transaction id for a session.
//
// Request: command byte, 32-bit session id, `block` byte, `isDDL` byte.
// Reply:   ERR_OK, 32-bit txn id, `valid` byte; or a lone ERR_FAILURE when
//          parsing or sm.newTxnID() throws a std::exception.
void MasterDBRMNode::doNewTxnID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  TxnID newTxn;
  uint32_t sessionID;
  uint8_t block, cmd, isDDL;

  try
  {
    msg >> cmd >> sessionID >> block >> isDDL;
    newTxn = sm.newTxnID(sessionID, (block != 0), (isDDL != 0));
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(newTxn.id)
          << static_cast<uint8_t>(newTxn.valid);
#ifdef BRM_VERBOSE
    cerr << "newTxnID returning id=" << newTxn.id << " valid=" << newTxn.valid << endl;
#endif
  }
  catch (exception&)
  {
    // Discard any partially built success reply.
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Tells the session manager that a transaction has committed.
//
// Request: command byte, 32-bit txn id, `valid` byte.
// Reply:   ERR_OK, or ERR_FAILURE when parsing or sm.committed() throws a
//          std::exception.
void MasterDBRMNode::doCommitted(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  TxnID txn;
  uint8_t cmd, validByte;
  uint32_t idWord;

  try
  {
    msg >> cmd >> idWord >> validByte;
    txn.id = idWord;
    txn.valid = (validByte != 0);
#ifdef BRM_VERBOSE
    cerr << "doCommitted" << endl;
#endif
    sm.committed(txn);
  }
  catch (exception&)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK);

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Tells the session manager that a transaction has been rolled back.
//
// Request: command byte, 32-bit txn id, `valid` byte.
// Reply:   ERR_OK, or ERR_FAILURE when parsing or sm.rolledback() throws a
//          std::exception (the failure text is also logged).
void MasterDBRMNode::doRolledBack(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  TxnID txn;
  uint8_t cmd, validByte;
  uint32_t idWord;

  try
  {
    msg >> cmd >> idWord >> validByte;

    txn.id = idWord;
    txn.valid = (validByte != 0);
#ifdef BRM_VERBOSE
    cerr << "doRolledBack" << endl;
#endif
    sm.rolledback(txn);
  }
  catch (exception& ex)
  {
    ostringstream what;
    what << "doRolledBack() failed: " << ex.what();
    log(what.str());

    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK);

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Looks up the transaction currently associated with a session.
//
// Request: command byte, session id.
// Reply:   ERR_OK, 32-bit txn id, `valid` byte; or a lone ERR_FAILURE when
//          parsing or sm.getTxnID() throws a std::exception.
void MasterDBRMNode::doGetTxnID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  SessionManagerServer::SID session;
  TxnID current;
  uint8_t cmd;

  try
  {
    msg >> cmd >> session;

    current = sm.getTxnID(session);
#ifdef BRM_VERBOSE
    cerr << "doGetTxnID returning id=" << current.id << " valid=" << current.valid << endl;
#endif
  }
  catch (exception&)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(current.id)
        << static_cast<uint8_t>(current.valid);

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Sends the session-id to transaction-id map held by the session manager.
// `msg` is not read.
//
// Reply: ERR_OK, 32-bit entry count, then per entry: 32-bit txn id, `valid`
//        byte, 32-bit session id; or a lone ERR_FAILURE when sm.SIDTIDMap()
//        throws a std::exception.
void MasterDBRMNode::doSIDTIDMap(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  int count;
  std::shared_ptr<SIDTIDEntry[]> entries;

  try
  {
    entries = sm.SIDTIDMap(count);
  }
  catch (exception&)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(count);

#ifdef BRM_VERBOSE
  cerr << "doSIDTIDMap returning " << count << " elements... " << endl;
#endif

  for (int idx = 0; idx < count; ++idx)
  {
    const SIDTIDEntry& entry = entries[idx];
#ifdef BRM_VERBOSE
    cerr << "   " << idx << ": txnid=" << entry.txnid.id << " valid=" << entry.txnid.valid
         << " sessionid=" << entry.sessionid << endl;
#endif
    reply << static_cast<uint32_t>(entry.txnid.id) << static_cast<uint8_t>(entry.txnid.valid)
          << static_cast<uint32_t>(entry.sessionid);
  }

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Reports, for one transaction, the first LBID of every extent that contains
// at least one of its uncommitted LBIDs.
//
// Request: command byte, 32-bit transaction id (read outside the try block).
// Reply:   ERR_OK followed by the serialized LBID list; or a lone ERR_FAILURE
//          when an extent lookup fails or a std::exception is thrown.
void MasterDBRMNode::doGetUncommittedLbids(ByteStream& msg, ThreadParams* p)
{
  typedef pair<int64_t, int64_t> range_t;

  ByteStream reply;
  vector<LBID_t> lbidList;
  VSS vss;
  ExtentMap em;
  bool vssLocked = false;
  ByteStream::byte cmd;
  ByteStream::quadbyte transID;

  // Shared failure path: wipe whatever was built and send ERR_FAILURE,
  // ignoring any error from the write itself.
  auto sendFailure = [&reply, p]()
  {
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  };

  msg >> cmd >> transID;

  try
  {
    vss.lock(VSS::READ);
    vssLocked = true;

    // Get a full list of uncommitted LBIDs related to this transaction.
    vss.getUncommittedLBIDs(transID, lbidList);

    vss.release(VSS::READ);
    vssLocked = false;

    if (!lbidList.empty())
    {
      std::sort(lbidList.begin(), lbidList.end());

      // Walk the sorted LBIDs; each time one falls past the end of the extent
      // found last, look up its extent and remember that extent's first LBID.
      vector<LBID_t> extentStarts;
      range_t extent;
      bool haveExtent = false;

      for (const LBID_t lbid : lbidList)
      {
        if (haveExtent && !(lbid > extent.second))
          continue;

        if (em.lookup(lbid, extent.first, extent.second) < 0)
        {
          sendFailure();
          return;
        }

        extentStarts.push_back(extent.first);
        haveExtent = true;
      }

      // Return only the first LBID in each extent that was changed in the
      // transaction.
      lbidList.swap(extentStarts);
    }

    reply << static_cast<uint8_t>(ERR_OK);
    serializeInlineVector(reply, lbidList);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }
  catch (exception&)
  {
    if (vssLocked)
      vss.release(VSS::READ);

    sendFailure();
    return;
  }
}
|
|
|
|
// Replies with ERR_OK followed by the 32-bit value from sm.getUnique32(), or
// a lone ERR_FAILURE if that throws a std::exception. `msg` is not read.
void MasterDBRMNode::doGetUniqueUint32(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    const uint32_t unique = sm.getUnique32();
    reply << static_cast<uint8_t>(ERR_OK) << unique;
#ifdef BRM_VERBOSE
    cerr << "getUnique32() returning " << unique << endl;
#endif
  }
  catch (exception&)
  {
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Replies with ERR_OK followed by the 64-bit value from sm.getUnique64(), or
// a lone ERR_FAILURE if that throws a std::exception. `msg` is not read.
void MasterDBRMNode::doGetUniqueUint64(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    const uint64_t unique = sm.getUnique64();
    reply << static_cast<uint8_t>(ERR_OK) << unique;
#ifdef BRM_VERBOSE
    cerr << "getUnique64() returning " << unique << endl;
#endif
  }
  catch (exception&)
  {
    reply.reset();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Replies with ERR_OK followed by the 32-bit system state obtained from
// sm.getSystemState(), or a lone ERR_FAILURE if that throws a std::exception.
// `msg` is not read.
void MasterDBRMNode::doGetSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint32_t state = 0;
  ByteStream::byte status = ERR_FAILURE;

  try
  {
    sm.getSystemState(state);
    status = ERR_OK;
    reply << status << static_cast<ByteStream::quadbyte>(state);
#ifdef BRM_VERBOSE
    cerr << "getSystemState() returning " << static_cast<int>(status) << endl;
#endif
  }
  catch (exception&)
  {
    reply.reset();
    status = ERR_FAILURE;
    reply << status;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  try
  {
    p->sock->write(reply);
  }
  catch (exception&)
  {
  }
}
|
|
|
|
// Passes the 32-bit value from the request to sm.setSystemState().
//
// Request: command byte, 32-bit state value.
// Reply:   ERR_OK, or ERR_FAILURE when parsing or the session manager throws
//          a std::exception.
void MasterDBRMNode::doSetSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  ByteStream::byte cmd;
  ByteStream::byte status = ERR_FAILURE;
  uint32_t state;

  try
  {
    msg >> cmd >> state;

    sm.setSystemState(state);
#ifdef BRM_VERBOSE
    cerr << "doSetSystemState setting " << hex << state << dec << endl;
#endif
  }
  catch (exception&)
  {
    status = ERR_FAILURE;
    reply << status;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  status = ERR_OK;
  reply << status;

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Passes the 32-bit value from the request to sm.clearSystemState().
//
// Request: command byte, 32-bit state value.
// Reply:   ERR_OK, or ERR_FAILURE when parsing or the session manager throws
//          a std::exception.
void MasterDBRMNode::doClearSystemState(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  ByteStream::byte cmd;
  ByteStream::byte status = ERR_FAILURE;
  uint32_t state;

  try
  {
    msg >> cmd >> state;

    sm.clearSystemState(state);
  }
  catch (exception&)
  {
    status = ERR_FAILURE;
    reply << status;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }

    return;
  }

  status = ERR_OK;
  reply << status;

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
  }
}
|
|
|
|
// Calls sm.reset() and replies ERR_OK, or ERR_FAILURE if anything at all is
// thrown. `msg` is not read. Errors from the socket write are ignored.
void MasterDBRMNode::doSessionManagerReset(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    sm.reset();
    reply << static_cast<uint8_t>(ERR_OK);
  }
  catch (...)
  {
    reply << static_cast<uint8_t>(ERR_FAILURE);
  }

  try
  {
    p->sock->write(reply);
  }
  catch (...)
  {
    // Best effort only.
  }
}
|
|
|
|
// Allocates a run of OIDs.
//
// Request: command byte, 32-bit count.
// Reply:   ERR_OK followed by the 32-bit result of oids.allocOIDs(); or
//          ERR_FAILURE when a std::exception is thrown (also logged).
// oidsMutex is held while parsing, allocating, and writing the success reply.
void MasterDBRMNode::doAllocOIDs(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint32_t requested;
  uint8_t cmd;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    msg >> cmd >> requested;
    const int firstOid = oids.allocOIDs(static_cast<int>(requested));
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(firstOid);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: OID allocation failure.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Returns a range of OIDs to the OID manager.
//
// Request: command byte, 32-bit start, 32-bit end.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown (also
//          logged).
// oidsMutex is held while parsing, returning, and writing the success reply.
void MasterDBRMNode::doReturnOIDs(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint8_t cmd;
  uint32_t startWord, endWord;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    msg >> cmd >> startWord >> endWord;
    oids.returnOIDs(static_cast<int>(startWord), static_cast<int>(endWord));
    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Return OIDs failure.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Replies with ERR_OK followed by the 32-bit result of oids.size(); or
// ERR_FAILURE when a std::exception is thrown (also logged). `msg` is not
// read. oidsMutex is held while querying and writing the success reply.
void MasterDBRMNode::doOidmSize(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    const int oidCount = oids.size();
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint32_t>(oidCount);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Failure to get OID count.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Allocates a version-buffer OID for a DBRoot.
//
// Request: command byte (read outside the try block), 32-bit dbroot.
// Reply:   ERR_OK followed by the 32-bit result of oids.allocVBOID(); or
//          ERR_FAILURE when a std::exception is thrown (also logged).
// oidsMutex is held while parsing, allocating, and writing the success reply.
void MasterDBRMNode::doAllocVBOID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint32_t dbroot;
  uint8_t cmd;

  msg >> cmd;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    msg >> dbroot;
    const uint32_t vbOid = oids.allocVBOID(dbroot);
    reply << static_cast<uint8_t>(ERR_OK) << vbOid;
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: VB OID allocation failure.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Looks up which DBRoot a version-buffer OID belongs to.
//
// Request: command byte (read outside the try block), 32-bit VB OID.
// Reply:   ERR_OK followed by the 32-bit result of oids.getDBRootOfVBOID();
//          or ERR_FAILURE when a std::exception is thrown (also logged).
// oidsMutex is held while parsing, querying, and writing the success reply.
void MasterDBRMNode::doGetDBRootOfVBOID(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint32_t vbOID;
  uint8_t cmd;

  msg >> cmd;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    msg >> vbOID;
    const uint32_t dbroot = oids.getDBRootOfVBOID(vbOID);
    reply << static_cast<uint8_t>(ERR_OK) << dbroot;
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Get DBRoot of VB OID failure.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Sends the VB-OID to DBRoot map held by the OID manager.
//
// Request: command byte (read outside the try block).
// Reply:   ERR_OK followed by the serialized vector<uint16_t>; or ERR_FAILURE
//          when a std::exception is thrown (also logged).
// The map is obtained by reference, so oidsMutex stays held until it has been
// serialized and the success reply written.
void MasterDBRMNode::doGetVBOIDToDBRootMap(ByteStream& msg, ThreadParams* p)
{
  ByteStream reply;
  uint8_t cmd;

  msg >> cmd;

  try
  {
    boost::mutex::scoped_lock lk(oidsMutex);

    const vector<uint16_t>& vbOidToDbroot = oids.getVBOIDToDBRootMap();
    reply << static_cast<uint8_t>(ERR_OK);
    serializeInlineVector<uint16_t>(reply, vbOidToDbroot);
    p->sock->write(reply);
  }
  catch (exception& ex)
  {
    ostringstream os;
    os << "DBRM Controller: Get VB OID DBRoot map failure.  " << ex.what();
    log(os.str());

    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Requests a table lock from the table-lock server.
//
// Request: command byte (read outside the try block), TableLockInfo.
// Reply:   ERR_OK, 64-bit lock id; when the id is 0 the owner PID, owner
//          name, session id and txn id from `tli` follow. A lone ERR_FAILURE
//          is sent when a std::exception is thrown.
void MasterDBRMNode::doGetTableLock(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  TableLockInfo tli;
  ByteStream reply;

  msg >> cmd;

  try
  {
    msg >> tli;
    idbassert(msg.length() == 0);

    const uint64_t lockId = tableLockServer->lock(&tli);
    reply << static_cast<uint8_t>(ERR_OK) << lockId;

    if (lockId == 0)
    {
      reply << tli.ownerPID << tli.ownerName << static_cast<uint32_t>(tli.ownerSessionID)
            << static_cast<uint32_t>(tli.ownerTxnID);
    }

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Releases a table lock by id.
//
// Request: command byte (read outside the try block), 64-bit lock id.
// Reply:   ERR_OK followed by the boolean result of unlock() as one byte; or
//          a lone ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doReleaseTableLock(ByteStream& msg, ThreadParams* p)
{
  uint64_t lockId;
  uint8_t cmd;
  ByteStream reply;

  msg >> cmd;

  try
  {
    msg >> lockId;
    idbassert(msg.length() == 0);

    const bool released = tableLockServer->unlock(lockId);
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(released);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Changes the state of an existing table lock.
//
// Request: command byte (read outside the try block), 64-bit lock id, 32-bit
//          LockState value.
// Reply:   ERR_OK followed by the boolean result of changeState() as one
//          byte; or a lone ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doChangeTableLockState(ByteStream& msg, ThreadParams* p)
{
  uint64_t lockId;
  uint32_t stateWord;
  uint8_t cmd;
  ByteStream reply;

  msg >> cmd;

  try
  {
    msg >> lockId >> stateWord;
    idbassert(msg.length() == 0);

    const LockState newState = static_cast<LockState>(stateWord);
    const bool changed = tableLockServer->changeState(lockId, newState);
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(changed);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Transfers ownership of a table lock, but only if no worker node reports
// that the current owner process is still alive.
//
// Request: command byte, 64-bit lock id, new owner name, new owner PID,
//          32-bit session id, 32-bit txn id.
// Reply:   ERR_OK + 0 when the lock does not exist or the current owner still
//          exists; ERR_OK + result of changeOwner() otherwise; a lone
//          ERR_FAILURE when the worker round-trip fails or a std::exception
//          is thrown.
//
// The worker responses are released through a scope guard so that a throw
// while parsing them (from the idbassert check or from the extraction) no
// longer leaks the remaining ByteStream objects.
void MasterDBRMNode::doChangeTableLockOwner(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  uint64_t id;
  string name;
  uint32_t pid;
  ByteStream reply;
  ByteStream workerNodeCmd;
  uint32_t tmp32;
  int32_t sessionID;
  int32_t txnID;
  bool ret;
  vector<ByteStream*> responses;
  int err;

  // current owner vars
  TableLockInfo tli;
  string processName;
  string::size_type namelen;
  bool exists;

  try
  {
    msg >> cmd >> id >> name >> pid >> tmp32;
    sessionID = tmp32;
    msg >> tmp32;
    txnID = tmp32;
    idbassert(msg.length() == 0);

    /* get the current owner info
     * send cmd to look for the owner
     * if there's an existing owner, reject the request
     */

    ret = tableLockServer->getLockInfo(id, &tli);

    if (!ret)
    {
      reply << (uint8_t)ERR_OK << (uint8_t)ret;
      goto write;
    }

    // Only the part of the owner name before the first space is sent to the
    // workers as the process name.
    namelen = tli.ownerName.find_first_of(" ");

    if (namelen == string::npos)
      processName = tli.ownerName;
    else
      processName = tli.ownerName.substr(0, namelen);

    workerNodeCmd << (uint8_t)OWNER_CHECK << processName << tli.ownerPID;
    // Deliberately left without an initializer: `goto write` above jumps over
    // this declaration, which is only legal for an uninitialized scalar.
    bool readErrFlag;
    {
      boost::mutex::scoped_lock lk(slaveLock);
      distribute(&workerNodeCmd);
      err = gatherResponses(OWNER_CHECK, workerNodeCmd.length(), &responses, readErrFlag);
    }

    if ((err != ERR_OK) || (readErrFlag))
    {
      // NOTE(review): anything gatherResponses() left in `responses` is not
      // freed on this path (unchanged from the original) - confirm who owns
      // those objects after a failed gather.
      reply << (uint8_t)ERR_FAILURE;
      goto write;
    }

    exists = false;

    {
      // Frees every gathered response when this block is left, whether
      // normally or by exception. Kept in its own block so the `goto write`
      // statements above do not jump past its initialization.
      struct ResponseReaper
      {
        explicit ResponseReaper(vector<ByteStream*>& v) : owned(v)
        {
        }
        ~ResponseReaper()
        {
          for (ByteStream* r : owned)
            delete r;

          owned.clear();
        }
        vector<ByteStream*>& owned;
      };
      ResponseReaper reaper(responses);

      for (ByteStream* response : responses)
      {
        /* Parse msg from worker node */
        uint8_t ownerAlive;
        idbassert(response->length() == 1);
        *response >> ownerAlive;

        if (ownerAlive == 1)
          exists = true;
      }
    }

    if (exists)
    {
      reply << (uint8_t)ERR_OK << (uint8_t) false;
      goto write;
    }

    ret = tableLockServer->changeOwner(id, name, pid, sessionID, txnID);
    reply << (uint8_t)ERR_OK << (uint8_t)ret;

  write:
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Sends every table lock known to the table-lock server.
//
// Request: command byte only.
// Reply:   ERR_OK followed by the serialized vector<TableLockInfo>; or a lone
//          ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doGetAllTableLocks(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  vector<TableLockInfo> allLocks;
  ByteStream reply;

  try
  {
    msg >> cmd;
    idbassert(msg.length() == 0);

    allLocks = tableLockServer->getAllLocks();
    reply << static_cast<uint8_t>(ERR_OK);
    serializeVector<TableLockInfo>(reply, allLocks);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Asks the table-lock server to release all locks.
//
// Request: command byte only.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doReleaseAllTableLocks(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;

  try
  {
    msg >> cmd;
    idbassert(msg.length() == 0);

    tableLockServer->releaseAllLocks();

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Fetches the description of one table lock.
//
// Request: command byte, 64-bit lock id.
// Reply:   ERR_OK, a found/not-found byte, and the TableLockInfo when found;
//          or a lone ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doGetTableLockInfo(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint64_t lockId;
  TableLockInfo tli;

  try
  {
    msg >> cmd >> lockId;
    idbassert(msg.length() == 0);

    const bool found = tableLockServer->getLockInfo(lockId, &tli);
    reply << static_cast<uint8_t>(ERR_OK) << static_cast<uint8_t>(found);

    if (found)
      reply << tli;

    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Reports whether the process that owns a table lock still exists on any
// worker node.
//
// Request: command byte, 64-bit lock id.
// Reply:   ERR_OK + 0 when the lock does not exist; ERR_OK + exists flag
//          after polling the workers; a lone ERR_FAILURE when the worker
//          round-trip fails or a std::exception is thrown.
//
// The worker responses are released through a scope guard so that a throw
// while parsing them (from the idbassert check or from the extraction) no
// longer leaks the remaining ByteStream objects.
void MasterDBRMNode::doOwnerCheck(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  uint64_t id;
  ByteStream reply;
  ByteStream workerNodeCmd;
  bool ret;
  vector<ByteStream*> responses;
  int err;

  // current owner vars
  TableLockInfo tli;
  string processName;
  string::size_type namelen;
  bool exists;

  try
  {
    msg >> cmd >> id;
    idbassert(msg.length() == 0);

    /* get the current owner info
     * send cmd to look for the owner
     */

    ret = tableLockServer->getLockInfo(id, &tli);

    if (!ret)
    {
      reply << (uint8_t)ERR_OK << (uint8_t)ret;
      goto write;
    }

    // Only the part of the owner name before the first space is sent to the
    // workers as the process name.
    namelen = tli.ownerName.find_first_of(" ");

    if (namelen == string::npos)
      processName = tli.ownerName;
    else
      processName = tli.ownerName.substr(0, namelen);

    workerNodeCmd << (uint8_t)OWNER_CHECK << processName << tli.ownerPID;
    // Deliberately left without an initializer: `goto write` above jumps over
    // this declaration, which is only legal for an uninitialized scalar.
    bool readErrFlag;
    {
      boost::mutex::scoped_lock lk(slaveLock);
      distribute(&workerNodeCmd);
      err = gatherResponses(OWNER_CHECK, workerNodeCmd.length(), &responses, readErrFlag);
    }

    if ((err != ERR_OK) || (readErrFlag))
    {
      // NOTE(review): anything gatherResponses() left in `responses` is not
      // freed on this path (unchanged from the original) - confirm who owns
      // those objects after a failed gather.
      reply << (uint8_t)ERR_FAILURE;
      goto write;
    }

    exists = false;

    {
      // Frees every gathered response when this block is left, whether
      // normally or by exception. Kept in its own block so the `goto write`
      // statements above do not jump past its initialization.
      struct ResponseReaper
      {
        explicit ResponseReaper(vector<ByteStream*>& v) : owned(v)
        {
        }
        ~ResponseReaper()
        {
          for (ByteStream* r : owned)
            delete r;

          owned.clear();
        }
        vector<ByteStream*>& owned;
      };
      ResponseReaper reaper(responses);

      for (ByteStream* response : responses)
      {
        /* Parse msg from worker node */
        uint8_t ownerAlive;
        idbassert(response->length() == 1);
        *response >> ownerAlive;

        if (ownerAlive == 1)
          exists = true;
      }
    }

    reply << (uint8_t)ERR_OK << (uint8_t)exists;

  write:
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Starts an autoincrement sequence for a column.
//
// Request: command byte, 32-bit OID, 64-bit first value, 32-bit column width,
//          column data type as one byte.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doStartAISequence(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint8_t typeByte;
  uint32_t oid, colWidth;
  uint64_t firstNum;

  try
  {
    msg >> cmd >> oid >> firstNum >> colWidth >> typeByte;
    const execplan::CalpontSystemCatalog::ColDataType colDataType =
        static_cast<execplan::CalpontSystemCatalog::ColDataType>(typeByte);
    idbassert(msg.length() == 0);

    aiManager.startSequence(oid, firstNum, colWidth, colDataType);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Reserves a range of autoincrement values for a column.
//
// Request: command byte, 32-bit OID, 32-bit count.
// Reply:   ERR_OK, the boolean result of getAIRange() as one byte, and the
//          64-bit next value; or a lone ERR_FAILURE when a std::exception is
//          thrown.
//
// `nextVal` is serialized unconditionally, so it is zero-initialized here:
// the reply then never carries an indeterminate value should getAIRange()
// return without writing through the pointer.
void MasterDBRMNode::doGetAIRange(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint32_t oid, count;
  uint64_t nextVal = 0;
  bool ret;

  try
  {
    msg >> cmd >> oid >> count;
    idbassert(msg.length() == 0);
    ret = aiManager.getAIRange(oid, count, &nextVal);
    reply << (uint8_t)ERR_OK << (uint8_t)ret << nextVal;
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << (uint8_t)ERR_FAILURE;

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Resets the autoincrement sequence of a column to a given value.
//
// Request: command byte, 32-bit OID, 64-bit value.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doResetAISequence(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint32_t oid;
  uint64_t restartAt;

  try
  {
    msg >> cmd >> oid >> restartAt;
    idbassert(msg.length() == 0);

    aiManager.resetSequence(oid, restartAt);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Calls aiManager.getLock() for the OID named in the request.
//
// Request: command byte, 32-bit OID.
// Reply:   ERR_OK once getLock() has returned; or ERR_FAILURE when a
//          std::exception is thrown.
void MasterDBRMNode::doGetAILock(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint32_t oid;

  try
  {
    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.getLock(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Calls aiManager.releaseLock() for the OID named in the request.
//
// Request: command byte, 32-bit OID.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doReleaseAILock(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint32_t oid;

  try
  {
    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.releaseLock(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Calls aiManager.deleteSequence() for the OID named in the request.
//
// Request: command byte, 32-bit OID.
// Reply:   ERR_OK; or ERR_FAILURE when a std::exception is thrown.
void MasterDBRMNode::doDeleteAISequence(ByteStream& msg, ThreadParams* p)
{
  uint8_t cmd;
  ByteStream reply;
  uint32_t oid;

  try
  {
    msg >> cmd >> oid;
    idbassert(msg.length() == 0);

    aiManager.deleteSequence(oid);

    reply << static_cast<uint8_t>(ERR_OK);
    p->sock->write(reply);
  }
  catch (exception&)
  {
    reply.restart();
    reply << static_cast<uint8_t>(ERR_FAILURE);

    try
    {
      p->sock->write(reply);
    }
    catch (...)
    {
    }
  }
}
|
|
|
|
// Stores the owning MasterDBRMNode; operator() later calls into it.
MasterDBRMNode::MsgProcessor::MsgProcessor(MasterDBRMNode* master)
 : m(master)
{
  // Nothing else to set up.
}
|
|
|
|
void MasterDBRMNode::MsgProcessor::operator()()
|
|
{
|
|
m->runners++;
|
|
|
|
try
|
|
{
|
|
m->msgProcessor();
|
|
}
|
|
|
|
#ifdef BRM_VERBOSE
|
|
catch (SocketClosed& e)
|
|
{
|
|
cerr << e.what() << endl;
|
|
#else
|
|
catch (SocketClosed&)
|
|
{
|
|
#endif
|
|
}
|
|
catch (exception& e)
|
|
{
|
|
log(e.what());
|
|
#ifdef BRM_VERBOSE
|
|
cerr << e.what() << endl;
|
|
#endif
|
|
}
|
|
catch (...)
|
|
{
|
|
log("caught something that's not an exception", logging::LOG_TYPE_WARNING);
|
|
cerr << "DBRM Controller: caught something that's not an exception" << endl;
|
|
}
|
|
|
|
m->runners--;
|
|
}
|
|
|
|
// The functor only holds the non-owning `m` pointer set by the constructor,
// so destruction has nothing to release.
MasterDBRMNode::MsgProcessor::~MsgProcessor() = default;
|
|
|
|
} // namespace BRM
|