1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
2024-09-10 17:44:25 +03:00

4771 lines
101 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*****************************************************************************
* $Id: dbrm.cpp 1878 2013-05-02 15:17:12Z dcathey $
*
****************************************************************************/
#include <iostream>
#include <unordered_set>
#include <sys/types.h>
#include <vector>
#ifdef __linux__
#include <values.h>
#endif
#include <boost/thread.hpp>
//#define NDEBUG
#include <cassert>
#include "dataconvert.h"
#include "oamcache.h"
#include "rwlock.h"
#include "mastersegmenttable.h"
#include "extentmap.h"
#include "copylocks.h"
#include "vss.h"
#include "vbbm.h"
#include "socketclosed.h"
#include "configcpp.h"
#include "sessionmanagerserver.h"
#include "messagequeuepool.h"
#include "blocksize.h"
#define DBRM_DLLEXPORT
#include "dbrm.h"
#undef DBRM_DLLEXPORT
#ifdef BRM_DEBUG
#define CHECK_EMPTY(x) \
if (x.length() != 0) \
throw logic_error("DBRM: got a message of the wrong size");
#else
#define CHECK_EMPTY(x)
#endif
#define DO_ERR_NETWORK \
MessageQueueClientPool::releaseInstance(msgClient); \
msgClient = NULL; \
mutex.unlock(); \
return ERR_NETWORK;
using namespace std;
using namespace messageqcpp;
using namespace oam;
#ifdef BRM_INFO
#include "tracer.h"
#endif
namespace BRM
{
DBRM::DBRM(bool noBRMinit) : fDebug(false)
{
if (!noBRMinit)
{
mst.reset(new MasterSegmentTable());
em.reset(new ExtentMap());
vss.reset(new VSS());
vbbm.reset(new VBBM());
copylocks.reset(new CopyLocks());
em->setReadOnly();
vss->setReadOnly();
vbbm->setReadOnly();
}
msgClient = NULL;
masterName = "DBRM_Controller";
config = config::Config::makeConfig();
#ifdef BRM_INFO
fDebug = ("Y" == config->getConfig("DBRM", "Debug"));
#endif
}
DBRM::DBRM(const DBRM& brm)
{
throw logic_error("DBRM: Don't use the copy constructor.");
}
DBRM::~DBRM()
{
if (msgClient != NULL)
MessageQueueClientPool::releaseInstance(msgClient);
}
DBRM& DBRM::operator=(const DBRM& brm)
{
throw logic_error("DBRM: Don't use the = operator.");
}
int DBRM::saveState() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("saveState()");
#endif
string prefix = config->getConfig("SystemConfig", "DBRMRoot");
if (prefix.length() == 0)
{
cerr << "Error: Need a valid Calpont configuation file" << endl;
exit(1);
}
int rc = saveState(prefix);
return rc;
}
int DBRM::saveState(string filename) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("saveState(filename)");
TRACER_ADDSTRINPUT(filename);
TRACER_WRITE;
}
#endif
string emFilename = filename + "_em";
string vssFilename = filename + "_vss";
string vbbmFilename = filename + "_vbbm";
bool locked[3] = {false, false, false};
try
{
vbbm->lock(VBBM::READ);
locked[0] = true;
vss->lock(VSS::READ);
locked[1] = true;
copylocks->lock(CopyLocks::READ);
locked[2] = true;
saveExtentMap(emFilename);
vbbm->save(vbbmFilename);
vss->save(vssFilename);
copylocks->release(CopyLocks::READ);
locked[2] = false;
vss->release(VSS::READ);
locked[1] = false;
vbbm->release(VBBM::READ);
locked[0] = false;
}
catch (exception& e)
{
if (locked[2])
copylocks->release(CopyLocks::READ);
if (locked[1])
vss->release(VSS::READ);
if (locked[0])
vbbm->release(VBBM::READ);
return -1;
}
return 0;
}
int DBRM::saveExtentMap(const string& filename) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("saveExtentMap");
TRACER_ADDSTRINPUT(filename);
TRACER_WRITE;
}
#endif
try
{
em->save(filename);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
// @bug 1055+. New functions added for multiple files per OID enhancement.
int DBRM::lookupLocal(LBID_t lbid, VER_t verid, bool vbFlag, OID_t& oid, uint16_t& dbRoot,
uint32_t& partitionNum, uint16_t& segmentNum, uint32_t& fileBlockOffset) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("lookupLocal(lbid,ver,..)");
TRACER_ADDINPUT(lbid);
TRACER_ADDINPUT(verid);
TRACER_ADDBOOLINPUT(vbFlag);
TRACER_ADDOUTPUT(oid);
TRACER_ADDSHORTOUTPUT(dbRoot);
TRACER_ADDOUTPUT(partitionNum);
TRACER_ADDSHORTOUTPUT(segmentNum);
TRACER_ADDOUTPUT(fileBlockOffset);
TRACER_WRITE;
}
#endif
bool locked[2] = {false, false};
int ret;
bool tooOld = false;
try
{
if (!vbFlag)
return em->lookupLocal(lbid, (int&)oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
else
{
vbbm->lock(VBBM::READ);
locked[0] = true;
ret = vbbm->lookup(lbid, verid, oid, fileBlockOffset);
vbbm->release(VBBM::READ);
locked[0] = false;
if (ret < 0)
{
vss->lock(VSS::READ);
locked[1] = true;
tooOld = vss->isTooOld(lbid, verid);
vss->release(VSS::READ);
locked[1] = false;
if (tooOld)
return ERR_SNAPSHOT_TOO_OLD;
}
return ret;
}
}
catch (exception& e)
{
if (locked[1])
vss->release(VSS::READ);
if (locked[0])
vbbm->release(VBBM::READ);
cerr << e.what() << endl;
return -1;
}
}
int DBRM::lookupLocal(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, uint32_t fileBlockOffset,
LBID_t& lbid) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINPUT(fileBlockOffset);
TRACER_ADDOUTPUT(lbid);
TRACER_WRITE;
}
#endif
try
{
return em->lookupLocal(oid, partitionNum, segmentNum, fileBlockOffset, lbid);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
}
int DBRM::lookupLocal_DBroot(OID_t oid, uint32_t dbroot, uint32_t partitionNum, uint16_t segmentNum,
uint32_t fileBlockOffset, LBID_t& lbid) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("lookupLocal(oid,fbo,..)");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINPUT(fileBlockOffset);
TRACER_ADDOUTPUT(lbid);
TRACER_WRITE;
}
#endif
try
{
return em->lookupLocal_DBroot(oid, dbroot, partitionNum, segmentNum, fileBlockOffset, lbid);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
}
// @bug 1055-
//------------------------------------------------------------------------------
// Lookup/return starting LBID for the specified OID, partition, segment, and
// file block offset.
//------------------------------------------------------------------------------
int DBRM::lookupLocalStartLbid(OID_t oid, uint32_t partitionNum, uint16_t segmentNum,
uint32_t fileBlockOffset, LBID_t& lbid) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("lookupLocalStartLbid(oid,fbo,..)");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINPUT(fileBlockOffset);
TRACER_ADDOUTPUT(lbid);
TRACER_WRITE;
}
#endif
try
{
return em->lookupLocalStartLbid(oid, partitionNum, segmentNum, fileBlockOffset, lbid);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
}
int DBRM::lookup(OID_t oid, LBIDRange_v& lbidList) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("lookup(oid,range)");
TRACER_ADDINPUT(oid);
TRACER_WRITE;
}
#endif
try
{
em->lookup(oid, lbidList);
return 0;
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
}
// Casual Partitioning support
int DBRM::markExtentInvalid(const LBID_t lbid,
execplan::CalpontSystemCatalog::ColDataType colDataType) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("markExtentInvalid");
TRACER_ADDINPUT(lbid);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << MARKEXTENTINVALID << (uint64_t)lbid << (uint32_t)colDataType;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::markExtentsInvalid(const vector<LBID_t>& lbids,
const std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes)
DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("markExtentsInvalid");
#endif
ByteStream command, response;
uint8_t err;
uint32_t size = lbids.size(), i;
command << MARKMANYEXTENTSINVALID << size;
for (i = 0; i < size; i++)
{
command << (uint64_t)lbids[i];
command << (uint32_t)colDataTypes[i];
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
template <typename T>
int DBRM::getExtentMaxMin(const LBID_t lbid, T& max, T& min, int32_t& seqNum)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getExtentMaxMin");
TRACER_ADDINPUT(lbid);
TRACER_ADDOUTPUT(max);
TRACER_ADDOUTPUT(min);
TRACER_ADDOUTPUT(seqNum);
TRACER_WRITE;
}
#endif
try
{
int ret = em->getMaxMin(lbid, max, min, seqNum);
return ret;
}
catch (exception& e)
{
cerr << e.what() << endl;
return false;
}
}
int DBRM::getExtentCPMaxMin(const LBID_t lbid, CPMaxMin& cpMaxMin)
{
try
{
em->getCPMaxMin(lbid, cpMaxMin);
}
catch (...)
{
return ERR_FAILURE;
}
return ERR_OK;
}
int DBRM::setExtentMaxMin(const LBID_t lbid, const int64_t max, const int64_t min,
const int32_t seqNum) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("setExtentMaxMin");
TRACER_ADDINPUT(lbid);
TRACER_ADDINPUT(max);
TRACER_ADDINPUT(min);
TRACER_ADDINPUT(seqNum);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << SETEXTENTMAXMIN << (uint64_t)lbid << (uint64_t)max << (uint64_t)min << (uint64_t)seqNum;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
// @bug 1970 - Added function below to set multiple extents casual partition info in one call.
int DBRM::setExtentsMaxMin(const CPInfoList_t& cpInfos) DBRM_THROW
{
CPInfoList_t::const_iterator it;
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("setExtentsMaxMin");
for (it = cpInfos.begin(); it != cpInfos.end(); it++)
{
TRACER_ADDINPUT(it->firstLbid);
TRACER_ADDINPUT(it->max);
TRACER_ADDINPUT(it->min);
TRACER_ADDINPUT(it->seqNum);
TRACER_WRITE;
}
}
#endif
ByteStream command, response;
uint8_t err = 0;
if (cpInfos.size() == 0)
return err;
if (cpInfos.empty())
return ERR_OK;
command << SETMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
for (it = cpInfos.begin(); it != cpInfos.end(); it++)
{
if (it->isBinaryColumn)
{
command << (uint8_t)1 << (uint64_t)it->firstLbid << (uint128_t)it->bigMax << (uint128_t)it->bigMin
<< (uint32_t)it->seqNum;
}
else
{
command << (uint8_t)0 << (uint64_t)it->firstLbid << (uint64_t)it->max << (uint64_t)it->min
<< (uint32_t)it->seqNum;
}
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// @bug 2117 - Add function to merge Casual Partition info with current extent
// map information.
//------------------------------------------------------------------------------
int DBRM::mergeExtentsMaxMin(const CPInfoMergeList_t& cpInfos) DBRM_THROW
{
CPInfoMergeList_t::const_iterator it;
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("updateExtentsMaxMin");
for (it = cpInfos.begin(); it != cpInfos.end(); it++)
{
TRACER_ADDINPUT(it->startLbid);
TRACER_ADDINPUT(it->max);
TRACER_ADDINPUT(it->min);
TRACER_ADDINPUT(it->seqNum);
TRACER_ADDINPUT(it->type);
TRACER_ADDINPUT(it->newExtent);
TRACER_WRITE;
}
}
#endif
ByteStream command, response;
uint8_t err;
command << MERGEMANYEXTENTSMAXMIN << (uint32_t)cpInfos.size();
for (it = cpInfos.begin(); it != cpInfos.end(); it++)
{
command << (uint64_t)it->startLbid << (uint64_t)it->max << (uint64_t)it->min << (uint32_t)it->seqNum
<< (uint32_t)it->type << (uint32_t)it->newExtent;
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::vssLookup(LBID_t lbid, const QueryContext& verInfo, VER_t txnID, VER_t* outVer, bool* vbFlag,
bool vbOnly) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("vssLookup");
TRACER_ADDINPUT(lbid);
TRACER_ADDINPUT(verInfo);
TRACER_ADDINPUT(txnID);
TRACER_ADDBOOLINPUT(vbOnly);
TRACER_WRITE;
}
#endif
if (!vbOnly && vss->isEmpty())
{
*outVer = 0;
*vbFlag = false;
return -1;
}
bool locked = false;
try
{
int rc = 0;
vss->lock(VSS::READ);
locked = true;
rc = vss->lookup(lbid, verInfo, txnID, outVer, vbFlag, vbOnly);
vss->release(VSS::READ);
return rc;
}
catch (exception& e)
{
if (locked)
vss->release(VSS::READ);
cerr << e.what() << endl;
return -1;
}
}
int DBRM::bulkVSSLookup(const std::vector<LBID_t>& lbids, const QueryContext_vss& verInfo, VER_t txnID,
std::vector<VSSData>* out)
{
uint32_t i;
bool locked = false;
try
{
out->resize(lbids.size());
vss->lock(VSS::READ);
locked = true;
if (vss->isEmpty(false))
{
for (i = 0; i < lbids.size(); i++)
{
VSSData& vd = (*out)[i];
vd.verID = 0;
vd.vbFlag = false;
vd.returnCode = -1;
}
}
else
{
for (i = 0; i < lbids.size(); i++)
{
VSSData& vd = (*out)[i];
vd.returnCode = vss->lookup(lbids[i], verInfo, txnID, &vd.verID, &vd.vbFlag, false);
}
}
vss->release(VSS::READ);
return 0;
}
catch (exception& e)
{
cerr << e.what() << endl;
}
catch (...)
{
cerr << "bulkVSSLookup: caught an exception" << endl;
}
if (locked)
vss->release(VSS::READ);
out->clear();
return -1;
}
VER_t DBRM::getCurrentVersion(LBID_t lbid, bool* isLocked) const
{
bool locked = false;
VER_t ret = 0;
try
{
vss->lock(VSS::READ);
locked = true;
ret = vss->getCurrentVersion(lbid, isLocked);
vss->release(VSS::READ);
locked = false;
}
catch (exception& e)
{
cerr << e.what() << endl;
if (locked)
vss->release(VSS::READ);
throw;
}
return ret;
}
int DBRM::bulkGetCurrentVersion(const vector<LBID_t>& lbids, vector<VER_t>* versions,
vector<bool>* isLocked) const
{
bool locked = false;
versions->resize(lbids.size());
if (isLocked != NULL)
isLocked->resize(lbids.size());
try
{
vss->lock(VSS::READ);
locked = true;
if (isLocked != NULL)
{
bool tmp = false;
for (uint32_t i = 0; i < lbids.size(); i++)
{
(*versions)[i] = vss->getCurrentVersion(lbids[i], &tmp);
(*isLocked)[i] = tmp;
}
}
else
for (uint32_t i = 0; i < lbids.size(); i++)
(*versions)[i] = vss->getCurrentVersion(lbids[i], NULL);
vss->release(VSS::READ);
locked = false;
return 0;
}
catch (exception& e)
{
versions->clear();
cerr << e.what() << endl;
if (locked)
vss->release(VSS::READ);
return -1;
}
}
VER_t DBRM::getHighestVerInVB(LBID_t lbid, VER_t max) const
{
bool locked = false;
VER_t ret = -1;
try
{
vss->lock(VSS::READ);
locked = true;
ret = vss->getHighestVerInVB(lbid, max);
vss->release(VSS::READ);
locked = false;
}
catch (exception& e)
{
cerr << e.what() << endl;
if (locked)
vss->release(VSS::READ);
throw;
}
return ret;
}
bool DBRM::isVersioned(LBID_t lbid, VER_t ver) const
{
bool ret = false;
bool locked = false;
try
{
vss->lock(VSS::READ);
locked = true;
ret = vss->isVersioned(lbid, ver);
vss->release(VSS::READ);
locked = false;
}
catch (exception& e)
{
cerr << e.what() << endl;
if (locked)
vss->release(VSS::READ);
throw;
}
return ret;
}
int8_t DBRM::send_recv(const ByteStream& in, ByteStream& out) throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("send_recv");
#endif
bool firstAttempt = true;
bool secondAttempt = true;
mutex.lock();
reconnect:
if (msgClient == NULL)
try
{
msgClient = MessageQueueClientPool::getInstance(masterName);
}
catch (exception& e)
{
cerr << "class DBRM failed to create a MessageQueueClient: " << e.what() << endl;
msgClient = NULL;
mutex.unlock();
return ERR_NETWORK;
}
try
{
msgClient->write(in);
out = msgClient->read();
}
/* If we add a timeout to the read() call, uncomment this clause
catch (SocketClosed &e) {
cerr << "DBRM::send_recv: controller node closed the connection" << endl;
DO_ERR_NETWORK;
}
*/
catch (exception& e)
{
cerr << "DBRM::send_recv caught: " << e.what() << endl;
if (firstAttempt)
{
firstAttempt = false;
MessageQueueClientPool::releaseInstance(msgClient);
msgClient = NULL;
goto reconnect;
}
DO_ERR_NETWORK;
}
if (out.length() == 0)
{
cerr << "DBRM::send_recv: controller node closed the connection" << endl;
if (secondAttempt)
{
MessageQueueClientPool::releaseInstance(msgClient);
msgClient = NULL;
if (!firstAttempt)
{
secondAttempt = false;
sleep(3);
}
firstAttempt = false;
goto reconnect;
}
DO_ERR_NETWORK;
}
mutex.unlock();
return ERR_OK;
}
//------------------------------------------------------------------------------
// Send a request to create a "stripe" of column extents for the specified
// column OIDs and DBRoot.
//------------------------------------------------------------------------------
int DBRM::createStripeColumnExtents(const std::vector<CreateStripeColumnExtentsArgIn>& cols, uint16_t dbRoot,
uint32_t& partitionNum, uint16_t& segmentNum,
std::vector<CreateStripeColumnExtentsArgOut>& extents) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("createStripeColumnExtents");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint16_t tmp16;
uint32_t tmp32;
command << CREATE_STRIPE_COLUMN_EXTENTS;
serializeInlineVector(command, cols);
command << dbRoot << partitionNum;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
try
{
response >> err;
if (err != 0)
return (int)err;
response >> tmp32;
partitionNum = tmp32;
response >> tmp16;
segmentNum = tmp16;
deserializeInlineVector(response, extents);
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
CHECK_EMPTY(response);
return 0;
}
//------------------------------------------------------------------------------
// Send a request to create a column extent for the specified OID and DBRoot.
//------------------------------------------------------------------------------
int DBRM::createColumnExtent_DBroot(OID_t oid, uint32_t colWidth, uint16_t dbRoot, uint32_t& partitionNum,
uint16_t& segmentNum,
execplan::CalpontSystemCatalog::ColDataType colDataType, LBID_t& lbid,
int& allocdSize, uint32_t& startBlockOffset) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("createColumnExtent_DBroot");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(colWidth);
TRACER_ADDSHORTINPUT(dbRoot);
TRACER_ADDOUTPUT(partitionNum);
TRACER_ADDSHORTOUTPUT(segmentNum);
TRACER_ADDINT64OUTPUT(lbid);
TRACER_ADDOUTPUT(allocdSize);
TRACER_ADDOUTPUT(startBlockOffset);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t tmp8 = (uint8_t)colDataType;
uint16_t tmp16;
uint32_t tmp32;
uint64_t tmp64;
command << CREATE_COLUMN_EXTENT_DBROOT << (ByteStream::quadbyte)oid << colWidth << dbRoot << partitionNum
<< segmentNum << tmp8;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
try
{
response >> err;
if (err != 0)
return (int)err;
response >> tmp32;
partitionNum = tmp32;
response >> tmp16;
segmentNum = tmp16;
response >> tmp64;
lbid = (int64_t)tmp64;
response >> tmp32;
allocdSize = (int32_t)tmp32;
response >> tmp32;
startBlockOffset = (int32_t)tmp32;
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
CHECK_EMPTY(response);
return 0;
}
//------------------------------------------------------------------------------
// Send a request to create a column extent for the exact segment file
// specified by the requested OID, DBRoot, partition, and segment.
//------------------------------------------------------------------------------
int DBRM::createColumnExtentExactFile(OID_t oid, uint32_t colWidth, uint16_t dbRoot, uint32_t partitionNum,
uint16_t segmentNum,
execplan::CalpontSystemCatalog::ColDataType colDataType, LBID_t& lbid,
int& allocdSize, uint32_t& startBlockOffset) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("createColumnExtentExactFile");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(colWidth);
TRACER_ADDSHORTINPUT(dbRoot);
TRACER_ADDOUTPUT(partitionNum);
TRACER_ADDSHORTOUTPUT(segmentNum);
TRACER_ADDINT64OUTPUT(lbid);
TRACER_ADDOUTPUT(allocdSize);
TRACER_ADDOUTPUT(startBlockOffset);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
uint64_t tmp64;
tmp8 = (uint8_t)colDataType;
command << CREATE_COLUMN_EXTENT_EXACT_FILE << (ByteStream::quadbyte)oid << colWidth << dbRoot
<< partitionNum << segmentNum << tmp8;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
try
{
response >> err;
if (err != 0)
return (int)err;
response >> tmp32;
partitionNum = tmp32;
response >> tmp16;
segmentNum = tmp16;
response >> tmp64;
lbid = (int64_t)tmp64;
response >> tmp32;
allocdSize = (int32_t)tmp32;
response >> tmp32;
startBlockOffset = (int32_t)tmp32;
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
CHECK_EMPTY(response);
return 0;
}
//------------------------------------------------------------------------------
// Send a request to create a dictionary store extent.
//------------------------------------------------------------------------------
int DBRM::createDictStoreExtent(OID_t oid, uint16_t dbRoot, uint32_t partitionNum, uint16_t segmentNum,
LBID_t& lbid, int& allocdSize) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("createDictStoreExtent");
TRACER_ADDINPUT(oid);
TRACER_ADDSHORTINPUT(dbRoot);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINT64OUTPUT(lbid);
TRACER_ADDOUTPUT(allocdSize);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t tmp32;
uint64_t tmp64;
command << CREATE_DICT_STORE_EXTENT << (ByteStream::quadbyte)oid << dbRoot << partitionNum << segmentNum;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
try
{
response >> err;
if (err != 0)
return (int)err;
response >> tmp64;
lbid = (int64_t)tmp64;
response >> tmp32;
allocdSize = (int32_t)tmp32;
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
CHECK_EMPTY(response);
return 0;
}
//------------------------------------------------------------------------------
// Send a request to delete a set extents for the specified column OID and
// DBRoot, and to return the extents to the free list. HWMs for the last
// stripe of extents in the specified DBRoot are updated accordingly.
//------------------------------------------------------------------------------
int DBRM::rollbackColumnExtents_DBroot(OID_t oid, bool bDeleteAll, uint16_t dbRoot, uint32_t partitionNum,
uint16_t segmentNum, HWM_t hwm) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("rollbackColumnExtents");
TRACER_ADDINPUT(oid);
TRACER_ADDBOOLINPUT(bDeleteAll);
TRACER_ADDSHORTINPUT(dbRoot);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINPUT(hwm);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << ROLLBACK_COLUMN_EXTENTS_DBROOT << (ByteStream::quadbyte)oid << (uint8_t)bDeleteAll << dbRoot
<< partitionNum << segmentNum << hwm;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Send a request to delete a set of extents for the specified dictionary store
// OID and DBRoot, and to return the extents to the free list. HWMs for the
// last stripe of extents are updated accordingly.
//------------------------------------------------------------------------------
int DBRM::rollbackDictStoreExtents_DBroot(OID_t oid, uint16_t dbRoot, uint32_t partitionNum,
const vector<uint16_t>& segNums,
const vector<HWM_t>& hwms) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("rollbackDictStoreExtents");
TRACER_ADDINPUT(oid);
TRACER_ADDSHORTINPUT(dbRoot);
TRACER_ADDINPUT(partitionNum);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << ROLLBACK_DICT_STORE_EXTENTS_DBROOT << (ByteStream::quadbyte)oid << dbRoot << partitionNum;
serializeVector(command, segNums);
serializeVector(command, hwms);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::deleteEmptyColExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("deleteEmptyColExtents");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t size = extentsInfo.size();
command << DELETE_EMPTY_COL_EXTENTS;
command << size;
for (unsigned i = 0; i < extentsInfo.size(); i++)
{
command << (ByteStream::quadbyte)extentsInfo[i].oid;
command << extentsInfo[i].partitionNum;
command << extentsInfo[i].segmentNum;
command << extentsInfo[i].dbRoot;
command << extentsInfo[i].hwm;
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::deleteEmptyDictStoreExtents(const std::vector<ExtentInfo>& extentsInfo) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("deleteEmptyDictStoreExtents");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t size = extentsInfo.size();
command << DELETE_EMPTY_DICT_STORE_EXTENTS;
command << size;
for (unsigned i = 0; i < extentsInfo.size(); i++)
{
command << (ByteStream::quadbyte)extentsInfo[i].oid;
command << extentsInfo[i].partitionNum;
command << extentsInfo[i].segmentNum;
command << extentsInfo[i].dbRoot;
command << extentsInfo[i].hwm;
command << (uint8_t)extentsInfo[i].newFile;
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::deleteOID(OID_t oid) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("deleteOID");
TRACER_ADDINPUT(oid);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << DELETE_OID << (ByteStream::quadbyte)oid;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
try
{
deleteAISequence(oid);
}
catch (...)
{
} // an error here means a network problem, will be caught elsewhere
return err;
}
int DBRM::deleteOIDs(const std::vector<OID_t>& oids) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("deleteOIDs");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t size = oids.size();
command << DELETE_OIDS;
command << size;
for (unsigned i = 0; i < oids.size(); i++)
{
command << (ByteStream::quadbyte)oids[i];
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
try
{
for (uint32_t i = 0; i < oids.size(); i++)
deleteAISequence(oids[i]);
}
catch (...)
{
} // an error here means a network problem, will be caught elsewhere
return err;
}
//------------------------------------------------------------------------------
// Return the last local HWM for the specified OID and DBroot. The corresponding
// partition number, and segment number are returned as well. This function
// can be used by cpimport for example to find out where the current "end-of-
// data" is, so that cpimport will know where to begin adding new rows.
// If no available or outOfService extent is found, then bFound is returned
// as false.
//------------------------------------------------------------------------------
int DBRM::getLastHWM_DBroot(int oid, uint16_t dbRoot, uint32_t& partitionNum, uint16_t& segmentNum,
HWM_t& hwm, int& status, bool& bFound) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getLastHWM_DBroot");
TRACER_ADDINPUT(oid);
TRACER_ADDSHORTOUTPUT(dbRoot);
TRACER_ADDOUTPUT(partitionNum);
TRACER_ADDSHORTOUTPUT(segmentNum);
TRACER_ADDOUTPUT(hwm);
TRACER_ADDOUTPUT(status);
TRACER_WRITE;
}
#endif
try
{
hwm = em->getLastHWM_DBroot(oid, dbRoot, partitionNum, segmentNum, status, bFound);
}
catch (exception& e)
{
return ERR_FAILURE;
}
return ERR_OK;
}
//------------------------------------------------------------------------------
// Return the HWM for the specified OID, partition number, and segment number.
// This is used to get the HWM for a particular dictionary segment store file,
// or a specific column segment file.
//------------------------------------------------------------------------------
int DBRM::getLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, HWM_t& hwm, int& status) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getLocalHWM");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDOUTPUT(hwm);
TRACER_ADDOUTPUT(status);
TRACER_WRITE;
}
#endif
try
{
hwm = em->getLocalHWM(oid, partitionNum, segmentNum, status);
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
return ERR_OK;
}
//------------------------------------------------------------------------------
// Set the local HWM for the file referenced by the specified OID, partition
// number, and segment number.
//------------------------------------------------------------------------------
int DBRM::setLocalHWM(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, HWM_t hwm) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("setLocalHWM");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDINPUT(hwm);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << SET_LOCAL_HWM << (ByteStream::quadbyte)oid << partitionNum << segmentNum << hwm;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::bulkSetHWM(const vector<BulkSetHWMArg>& v, VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkSetHWM");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_SET_HWM;
serializeInlineVector(command, v);
command << (uint32_t)transID;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::bulkSetHWMAndCP(const vector<BulkSetHWMArg>& v, const vector<CPInfo>& setCPDataArgs,
const vector<CPInfoMerge>& mergeCPDataArgs, VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkSetHWMAndCP");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_SET_HWM_AND_CP;
serializeInlineVector(command, v);
serializeInlineVector(command, setCPDataArgs);
serializeInlineVector(command, mergeCPDataArgs);
command << (uint32_t)transID;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::bulkUpdateDBRoot(const vector<BulkUpdateDBRootArg>& args)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkUpdateDBRoot");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_UPDATE_DBROOT;
serializeInlineVector(command, args);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// For the specified OID and PM number, this function will return a vector
// of objects carrying HWM info (for the last segment file) and block count
// information about each DBRoot assigned to the specified PM.
//------------------------------------------------------------------------------
int DBRM::getDbRootHWMInfo(OID_t oid, uint16_t pmNumber, EmDbRootHWMInfo_v& emDBRootHwmInfos) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getDbRootHWMInfo");
TRACER_ADDINPUT(oid);
TRACER_ADDSHORTINPUT(pmNumber);
TRACER_WRITE;
}
#endif
try
{
em->getDbRootHWMInfo(oid, pmNumber, emDBRootHwmInfos);
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
return ERR_OK;
}
//------------------------------------------------------------------------------
// Return the status or state of the extents in the segment file specified
// by the arguments: oid, partitionNum, and segment Num.
//------------------------------------------------------------------------------
int DBRM::getExtentState(OID_t oid, uint32_t partitionNum, uint16_t segmentNum, bool& bFound,
int& status) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getExtentState");
TRACER_ADDINPUT(oid);
TRACER_ADDINPUT(partitionNum);
TRACER_ADDSHORTINPUT(segmentNum);
TRACER_ADDOUTPUT(status);
TRACER_WRITE;
}
#endif
try
{
em->getExtentState(oid, partitionNum, segmentNum, bFound, status);
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_FAILURE;
}
return ERR_OK;
}
// dmc-should eventually deprecate
int DBRM::getExtentSize() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("getExtentSize");
#endif
return em->getExtentSize();
}
unsigned DBRM::getExtentRows() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("getExtentRows");
#endif
return em->getExtentRows();
}
int DBRM::getExtents(int OID, std::vector<struct EMEntry>& entries, bool sorted, bool notFoundErr,
bool incOutOfService)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getExtents");
TRACER_ADDINPUT(OID);
TRACER_WRITE;
}
#endif
try
{
em->getExtents(OID, entries, sorted, notFoundErr, incOutOfService);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
int DBRM::getExtents_dbroot(int OID, std::vector<struct EMEntry>& entries, const uint16_t dbroot) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getExtents_dbroot");
TRACER_ADDINPUT(OID);
TRACER_WRITE;
}
#endif
try
{
em->getExtents_dbroot(OID, entries, dbroot);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
//------------------------------------------------------------------------------
// Return the number of extents for the specified OID and DBRoot.
// Any out-of-service extents can optionally be included or excluded.
//------------------------------------------------------------------------------
int DBRM::getExtentCount_dbroot(int OID, uint16_t dbroot, bool incOutOfService, uint64_t& numExtents) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getExtentCount_dbroot");
TRACER_ADDINPUT(OID);
TRACER_WRITE;
}
#endif
try
{
em->getExtentCount_dbroot(OID, dbroot, incOutOfService, numExtents);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
//------------------------------------------------------------------------------
// Gets the DBRoot for the specified system catalog OID.
// Function assumes the specified System Catalog OID is fully contained on
// a single DBRoot, as the function only searches for and returns the first
// DBRoot entry that is found in the extent map.
//------------------------------------------------------------------------------
int DBRM::getSysCatDBRoot(OID_t oid, uint16_t& dbRoot) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getSysCatDBRoot");
TRACER_ADDINPUT(oid);
TRACER_ADDSHORTOUTPUT(dbRoot);
TRACER_WRITE;
}
#endif
try
{
em->getSysCatDBRoot(oid, dbRoot);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return 0;
}
//------------------------------------------------------------------------------
// Delete all extents for the specified OID(s) and partition number.
//------------------------------------------------------------------------------
int DBRM::deletePartition(const std::vector<OID_t>& oids, const std::set<LogicalPartition>& partitionNums,
string& emsg) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITENOW("deletePartition");
std::ostringstream oss;
oss << "partitionNum: ";
std::set<LogicalPartition>::const_iterator partIt;
for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
oss << (*partIt) << " ";
oss << "; OIDS: ";
std::vector<OID_t>::const_iterator it;
for (it = oids.begin(); it != oids.end(); ++it)
{
oss << (*it) << ", ";
}
TRACER_WRITEDIRECT(oss.str());
}
#endif
ByteStream command, response;
uint8_t err;
command << DELETE_PARTITION;
serializeSet<LogicalPartition>(command, partitionNums);
uint32_t oidSize = oids.size();
command << oidSize;
for (unsigned i = 0; i < oidSize; i++)
command << (ByteStream::quadbyte)oids[i];
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
if (err != 0)
response >> emsg;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Mark all extents as out of service, for the specified OID(s) and partition
// number.
//------------------------------------------------------------------------------
int DBRM::markPartitionForDeletion(const std::vector<OID_t>& oids,
const std::set<LogicalPartition>& partitionNums, string& emsg) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITENOW("markPartitionForDeletion");
std::ostringstream oss;
oss << "partitionNum: ";
std::set<LogicalPartition>::const_iterator partIt;
for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
oss << (*partIt) << " ";
oss << "; OIDS: ";
std::vector<OID_t>::const_iterator it;
for (it = oids.begin(); it != oids.end(); ++it)
{
oss << (*it) << ", ";
}
TRACER_WRITEDIRECT(oss.str());
}
#endif
ByteStream command, response;
uint8_t err;
command << MARK_PARTITION_FOR_DELETION;
serializeSet<LogicalPartition>(command, partitionNums);
uint32_t oidSize = oids.size();
command << oidSize;
for (unsigned i = 0; i < oidSize; i++)
command << (ByteStream::quadbyte)oids[i];
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
if (err)
response >> emsg;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Mark all extents as out of service, for the specified OID(s)
//------------------------------------------------------------------------------
int DBRM::markAllPartitionForDeletion(const std::vector<OID_t>& oids) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITENOW("markAllPartitionForDeletion");
std::ostringstream oss;
oss << "OIDS: ";
std::vector<OID_t>::const_iterator it;
for (it = oids.begin(); it != oids.end(); ++it)
{
oss << (*it) << ", ";
}
TRACER_WRITEDIRECT(oss.str());
}
#endif
ByteStream command, response;
uint8_t err;
uint32_t size = oids.size();
command << MARK_ALL_PARTITION_FOR_DELETION << size;
for (unsigned i = 0; i < size; i++)
{
command << (ByteStream::quadbyte)oids[i];
}
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Restore all extents for the specified OID(s) and partition number.
//------------------------------------------------------------------------------
int DBRM::restorePartition(const std::vector<OID_t>& oids, const std::set<LogicalPartition>& partitionNums,
string& emsg) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITENOW("restorePartition");
std::ostringstream oss;
oss << "partitionNum: ";
std::set<LogicalPartition>::const_iterator partIt;
for (partIt = partitionNums.begin(); partIt != partitionNums.end(); ++partIt)
oss << (*partIt) << " ";
oss << "; OIDS: ";
std::vector<OID_t>::const_iterator it;
for (it = oids.begin(); it != oids.end(); ++it)
{
oss << (*it) << ", ";
}
TRACER_WRITEDIRECT(oss.str());
}
#endif
ByteStream command, response;
uint8_t err;
command << RESTORE_PARTITION;
serializeSet<LogicalPartition>(command, partitionNums);
uint32_t oidSize = oids.size();
command << oidSize;
for (unsigned i = 0; i < oidSize; i++)
command << (ByteStream::quadbyte)oids[i];
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
if (err)
response >> emsg;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Return all the out-of-service partitions for the specified OID.
//------------------------------------------------------------------------------
int DBRM::getOutOfServicePartitions(OID_t oid, std::set<LogicalPartition>& partitionNums) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getOutOfServicePartitions");
TRACER_ADDINPUT(oid);
TRACER_WRITE;
}
#endif
try
{
em->getOutOfServicePartitions(oid, partitionNums);
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
return ERR_OK;
}
//------------------------------------------------------------------------------
// Delete all extents for the specified DBRoot
//------------------------------------------------------------------------------
int DBRM::deleteDBRoot(uint16_t dbroot) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITENOW("deleteDBRoot");
std::ostringstream oss;
oss << "DBRoot: " << dbroot;
TRACER_WRITEDIRECT(oss.str());
}
#endif
ByteStream command, response;
uint8_t err;
command << DELETE_DBROOT;
uint32_t q = static_cast<uint32_t>(dbroot);
command << q;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
//------------------------------------------------------------------------------
// Does the specified DBRoot have any extents.
// Returns an error if extentmap shared memory is not loaded.
//------------------------------------------------------------------------------
int DBRM::isDBRootEmpty(uint16_t dbroot, bool& isEmpty, std::string& errMsg) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("isDBRootEmpty");
TRACER_ADDINPUT(dbroot);
TRACER_WRITE;
}
#endif
errMsg.clear();
try
{
isEmpty = em->isDBRootEmpty(dbroot);
}
catch (exception& e)
{
cerr << e.what() << endl;
errMsg = e.what();
return ERR_FAILURE;
}
return ERR_OK;
}
int DBRM::writeVBEntry(VER_t transID, LBID_t lbid, OID_t vbOID, uint32_t vbFBO) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("writeVBEntry");
TRACER_ADDINPUT(transID);
TRACER_ADDINPUT(lbid);
TRACER_ADDINPUT(vbOID);
TRACER_ADDINPUT(vbFBO);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << WRITE_VB_ENTRY << (uint32_t)transID << (uint64_t)lbid << (uint32_t)vbOID << vbFBO;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::bulkWriteVBEntry(VER_t transID, const std::vector<BRM::LBID_t>& lbids, OID_t vbOID,
const std::vector<uint32_t>& vbFBOs) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("bulkWriteVBEntry");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BULK_WRITE_VB_ENTRY << (uint32_t)transID;
serializeInlineVector(command, lbids);
command << (uint32_t)vbOID;
serializeInlineVector(command, vbFBOs);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
struct _entry
{
_entry(LBID_t l) : lbid(l){};
LBID_t lbid;
inline bool operator<(const _entry& e) const
{
return ((e.lbid >> 10) < (lbid >> 10));
}
};
int DBRM::getDBRootsForRollback(VER_t transID, vector<uint16_t>* dbroots) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getDBRootsForRollback");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
bool locked[2] = {false, false};
set<OID_t> vbOIDs;
set<OID_t>::iterator vbIt;
vector<LBID_t> lbidList;
uint32_t i, size;
uint32_t tmp32;
OID_t vbOID;
int err;
set<_entry> lbidPruner;
set<_entry>::iterator it;
try
{
vbbm->lock(VBBM::READ);
locked[0] = true;
vss->lock(VSS::READ);
locked[1] = true;
vss->getUncommittedLBIDs(transID, lbidList);
// prune the list; will leave at most 1 entry per 1024-lbid range
for (i = 0, size = lbidList.size(); i < size; i++)
lbidPruner.insert(_entry(lbidList[i]));
// get the VB oids
for (it = lbidPruner.begin(); it != lbidPruner.end(); ++it)
{
err = vbbm->lookup(it->lbid, transID, vbOID, tmp32);
if (err) // this error will be caught by DML; more appropriate to handle it there
continue;
vbOIDs.insert(vbOID);
}
// get the dbroots
for (vbIt = vbOIDs.begin(); vbIt != vbOIDs.end(); ++vbIt)
{
err = getDBRootOfVBOID(*vbIt);
if (err)
{
ostringstream os;
os << "DBRM::getDBRootOfVBOID() returned an error looking for vbOID " << *vbIt;
log(os.str());
return ERR_FAILURE;
}
dbroots->push_back((uint16_t)err);
}
vss->release(VSS::READ);
locked[1] = false;
vbbm->release(VBBM::READ);
locked[0] = false;
return ERR_OK;
}
catch (exception& e)
{
if (locked[0])
vbbm->release(VBBM::READ);
if (locked[1])
vss->release(VSS::READ);
return -1;
}
}
int DBRM::getUncommittedLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getUncommittedLBIDs");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
bool locked = false;
try
{
vss->lock(VSS::READ);
locked = true;
vss->getUncommittedLBIDs(transID, lbidList);
vss->release(VSS::READ);
locked = false;
return 0;
}
catch (exception& e)
{
if (locked)
vss->release(VSS::READ);
return -1;
}
}
// @bug 1509. New function that returns one LBID per extent touched as part of the transaction. Used to get
// a list of blocks to use for updating casual partitioning when the transaction is committed.
int DBRM::getUncommittedExtentLBIDs(VER_t transID, vector<LBID_t>& lbidList) throw()
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getUncommittedExtentLBIDs");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
bool locked = false;
vector<LBID_t>::iterator lbidIt;
typedef pair<int64_t, int64_t> range_t;
range_t range;
vector<range_t> ranges;
vector<range_t>::iterator rangeIt;
try
{
vss->lock(VSS::READ);
locked = true;
// Get a full list of uncommitted LBIDs related to this transactin.
vss->getUncommittedLBIDs(transID, lbidList);
vss->release(VSS::READ);
locked = false;
if (lbidList.size() > 0)
{
// Sort the vector.
std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
// Get the LBID range for the first block in the list.
lbidIt = lbidList.begin();
if (em->lookup(*lbidIt, range.first, range.second) < 0)
{
return -1;
}
ranges.push_back(range);
// Loop through the LBIDs and add the new ranges.
++lbidIt;
while (lbidIt != lbidList.end())
{
if (*lbidIt > range.second)
{
if (em->lookup(*lbidIt, range.first, range.second) < 0)
{
return -1;
}
ranges.push_back(range);
}
++lbidIt;
}
// Reset the lbidList and return only the first LBID in each extent that was changed
// in the transaction.
lbidList.clear();
for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++)
{
lbidList.push_back(rangeIt->first);
}
}
return 0;
}
catch (exception& e)
{
if (locked)
vss->release(VSS::READ);
return -1;
}
}
int DBRM::beginVBCopy(VER_t transID, uint16_t dbRoot, const LBIDRange_v& ranges,
VBRange_v& freeList) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("beginVBCopy");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << BEGIN_VB_COPY << (ByteStream::quadbyte)transID << dbRoot;
serializeVector<LBIDRange>(command, ranges);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
try
{
response >> err;
if (err != 0)
return err;
deserializeVector(response, freeList);
}
catch (exception& e)
{
cerr << e.what() << endl;
return ERR_NETWORK;
}
CHECK_EMPTY(response);
return 0;
}
int DBRM::endVBCopy(VER_t transID, const LBIDRange_v& ranges) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("endVBCopy");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << END_VB_COPY << (ByteStream::quadbyte)transID;
serializeVector(command, ranges);
err = send_recv(command, response);
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::vbCommit(VER_t transID) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("vbCommit");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << VB_COMMIT << (ByteStream::quadbyte)transID;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::vbRollback(VER_t transID, const LBIDRange_v& lbidList) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("vbRollback ");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << VB_ROLLBACK1 << (ByteStream::quadbyte)transID;
serializeVector(command, lbidList);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::vbRollback(VER_t transID, const vector<LBID_t>& lbidList) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("vbRollback");
TRACER_ADDINPUT(transID);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << VB_ROLLBACK2 << (ByteStream::quadbyte)transID;
serializeVector(command, lbidList);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::halt() DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("halt");
#endif
ByteStream command, response;
uint8_t err;
command << HALT;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::resume() DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("resume");
#endif
ByteStream command, response;
uint8_t err;
command << RESUME;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::forceReload() DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("forceReload");
#endif
ByteStream command, response;
uint8_t err;
command << RELOAD;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::setReadOnly(bool b) DBRM_THROW
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("setReadOnly");
TRACER_ADDBOOLINPUT(b);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << (b ? SETREADONLY : SETREADWRITE);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::startReadOnly() DBRM_THROW
{
ByteStream command, response;
uint8_t err;
command << START_READONLY;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::isReadWrite() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("isReadWrite");
#endif
ByteStream command, response;
uint8_t err;
command << GETREADONLY;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
// CHECK_EMPTY(response);
return (err == 0 ? ERR_OK : ERR_READONLY);
}
int DBRM::dmlLockLBIDRanges(const vector<LBIDRange>& ranges, int txnID)
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("clear");
#endif
ByteStream command, response;
uint8_t err;
command << LOCK_LBID_RANGES;
serializeVector<LBIDRange>(command, ranges);
command << (uint32_t)txnID;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::dmlReleaseLBIDRanges(const vector<LBIDRange>& ranges)
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("clear");
#endif
ByteStream command, response;
uint8_t err;
command << RELEASE_LBID_RANGES;
serializeVector<LBIDRange>(command, ranges);
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::clear() DBRM_THROW
{
ByteStream command, response;
uint8_t err;
command << BRM_CLEAR;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() != 1)
return ERR_NETWORK;
response >> err;
CHECK_EMPTY(response);
return err;
}
int DBRM::checkConsistency() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("checkConsistency");
#endif
bool locked[2] = {false, false};
try
{
em->checkConsistency();
}
catch (exception& e)
{
cerr << e.what() << endl;
return -1;
}
try
{
vbbm->lock(VBBM::READ);
locked[0] = true;
vss->lock(VSS::READ);
locked[1] = true;
vss->checkConsistency(*vbbm, *em);
vss->release(VSS::READ);
locked[1] = false;
vbbm->release(VBBM::READ);
locked[0] = false;
}
catch (exception& e)
{
cerr << e.what() << endl;
if (locked[1])
vss->release(VSS::READ);
if (locked[0])
vbbm->release(VBBM::READ);
return -1;
}
try
{
vbbm->lock(VBBM::READ);
vbbm->checkConsistency();
vbbm->release(VBBM::READ);
}
catch (exception& e)
{
cerr << e.what() << endl;
vbbm->release(VBBM::READ);
return -1;
}
return 0;
}
int DBRM::getCurrentTxnIDs(set<VER_t>& txnList) throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("getCurrentTxnIDs");
#endif
bool locked[2] = {false, false};
try
{
txnList.clear();
vss->lock(VSS::READ);
locked[0] = true;
copylocks->lock(CopyLocks::READ);
locked[1] = true;
copylocks->getCurrentTxnIDs(txnList);
vss->getCurrentTxnIDs(txnList);
copylocks->release(CopyLocks::READ);
locked[1] = false;
vss->release(VSS::READ);
locked[0] = false;
}
catch (exception& e)
{
if (locked[1])
copylocks->release(CopyLocks::READ);
if (locked[0])
vss->release(VSS::READ);
cerr << e.what() << endl;
return -1;
}
return 0;
}
const QueryContext DBRM::verID()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("verID");
#endif
ByteStream command, response;
uint8_t err;
QueryContext ret;
command << VER_ID;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: SessionManager::verID(): network error" << endl;
ret.currentScn = -1;
return ret;
}
try
{
response >> err;
response >> ret;
CHECK_EMPTY(response);
}
catch (exception& e)
{
cerr << "DBRM: SessionManager::verID(): bad response" << endl;
log("DBRM: SessionManager::verID(): bad response", logging::LOG_TYPE_WARNING);
ret.currentScn = -1;
}
return ret;
}
const QueryContext DBRM::sysCatVerID()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("sysCatVerID");
#endif
ByteStream command, response;
uint8_t err;
QueryContext ret;
command << SYSCAT_VER_ID;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: SessionManager::sysCatVerID(): network error" << endl;
ret.currentScn = -1;
return ret;
}
try
{
response >> err;
response >> ret;
CHECK_EMPTY(response);
}
catch (exception& e)
{
cerr << "DBRM: SessionManager::sysCatVerID(): bad response" << endl;
log("DBRM: SessionManager::sysCatVerID(): bad response", logging::LOG_TYPE_WARNING);
ret.currentScn = -1;
}
return ret;
}
uint8_t DBRM::newCpimportJob(uint32_t& jobId)
{
ByteStream command, response;
uint8_t err;
command << NEW_CPIMPORT_JOB;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: SessionManager::newCpimportJob(): network error");
return err;
}
if (response.length() != 5)
{
log("DBRM: SessionManager::newCpimportJob(): bad response");
return ERR_READONLY;
}
response >> err;
response >> jobId;
return ERR_OK;
}
void DBRM::finishCpimportJob(uint32_t jobId)
{
ByteStream command, response;
uint8_t err;
command << FINISH_CPIMPORT_JOB << (uint32_t)jobId;
err = send_recv(command, response);
if (err != ERR_OK)
log("DBRM: error: SessionManager::finishCpimportJob() failed");
else if (response.length() != 1)
log("DBRM: error: SessionManager::finishCpimportJob() failed (bad response)", logging::LOG_TYPE_ERROR);
response >> err;
if (err != ERR_OK)
log("DBRM: error: SessionManager::finishCpimportJob() failed (valid error code)",
logging::LOG_TYPE_ERROR);
}
int DBRM::forceClearCpimportJobs() DBRM_THROW
{
ByteStream command, response;
uint8_t err;
command << FORCE_CLEAR_CPIMPORT_JOBS;
err = send_recv(command, response);
if (err != ERR_OK)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs()) failed");
else if (response.length() != 1)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (bad response)",
logging::LOG_TYPE_ERROR);
response >> err;
if (err != ERR_OK)
log("DBRM: error: SessionManager::forceClearAllCpimportJobs() failed (valid error code)",
logging::LOG_TYPE_ERROR);
return err;
}
const TxnID DBRM::newTxnID(const SessionManagerServer::SID session, bool block, bool isDDL)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("newTxnID");
TRACER_ADDINPUT(session);
TRACER_ADDBOOLINPUT(block);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err, tmp;
uint32_t tmp32;
TxnID ret;
command << NEW_TXN_ID << session << (uint8_t)block << (uint8_t)isDDL;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: SessionManager::newTxnID(): network error");
ret.valid = false;
return ret;
}
if (response.length() != 6)
{
log("DBRM: SessionManager::newTxnID(): bad response");
ret.valid = false;
return ret;
}
response >> err;
response >> tmp32;
ret.id = tmp32;
response >> tmp;
ret.valid = (tmp == 0 ? false : true);
CHECK_EMPTY(response);
return ret;
}
void DBRM::committed(TxnID& txnid)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("committed");
TRACER_ADDINPUT(txnid);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << COMMITTED << (uint32_t)txnid.id << (uint8_t)txnid.valid;
err = send_recv(command, response);
txnid.valid = false;
if (err != ERR_OK)
log("DBRM: error: SessionManager::committed() failed");
else if (response.length() != 1)
log("DBRM: error: SessionManager::committed() failed (bad response)", logging::LOG_TYPE_ERROR);
response >> err;
if (err != ERR_OK)
log("DBRM: error: SessionManager::committed() failed (valid error code)", logging::LOG_TYPE_ERROR);
}
void DBRM::rolledback(TxnID& txnid)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("rolledback");
TRACER_ADDINPUT(txnid);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err, tmp;
command << ROLLED_BACK << (uint32_t)txnid.id << (uint8_t)txnid.valid;
err = send_recv(command, response);
txnid.valid = false;
if (err != ERR_OK)
log("DBRM: error: SessionManager::rolledback() failed (network)");
else if (response.length() != 1)
log("DBRM: error: SessionManager::rolledback() failed (bad response)", logging::LOG_TYPE_ERROR);
response >> tmp;
if (tmp != ERR_OK)
{
if (getSystemReady() != 0)
{
std::stringstream errorStream;
errorStream << "DBRM: error: SessionManager::rolledback() failed (error code " << tmp << ")";
log(errorStream.str(), logging::LOG_TYPE_ERROR);
}
}
}
int DBRM::getUnlockedLBIDs(BlockList_t* list) DBRM_THROW
{
bool locked = false;
list->clear();
try
{
vss->lock(VSS::READ);
locked = true;
vss->getUnlockedLBIDs(*list);
vss->release(VSS::READ);
locked = false;
return 0;
}
catch (exception& e)
{
if (locked)
vss->release(VSS::READ);
cerr << e.what() << endl;
return -1;
}
}
const TxnID DBRM::getTxnID(const SessionManagerServer::SID session)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getTxnID");
TRACER_ADDINPUT(session);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err, tmp8;
uint32_t tmp32;
TxnID ret;
command << GET_TXN_ID << (uint32_t)session;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: error: SessionManager::getTxnID() failed (network)", logging::LOG_TYPE_ERROR);
ret.valid = false;
return ret;
}
response >> err;
if (err != ERR_OK)
{
log("DBRM: error: SessionManager::getTxnID() failed (got an error)", logging::LOG_TYPE_ERROR);
ret.valid = false;
return ret;
}
response >> tmp32 >> tmp8;
ret.id = tmp32;
ret.valid = tmp8;
return ret;
}
std::shared_ptr<SIDTIDEntry[]> DBRM::SIDTIDMap(int& len)
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("SIDTIDMap");
TRACER_ADDOUTPUT(len);
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err, tmp8;
uint32_t tmp32;
int i;
std::shared_ptr<SIDTIDEntry[]> ret;
command << SID_TID_MAP;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: error: SessionManager::SIDTIDEntry() failed (network)");
return ret;
}
response >> err;
if (err != ERR_OK)
{
log("DBRM: error: SessionManager::SIDTIDEntry() failed (valid error code)", logging::LOG_TYPE_ERROR);
return ret;
}
response >> tmp32;
len = (int)tmp32;
ret.reset(new SIDTIDEntry[len]);
for (i = 0; i < len; i++)
{
response >> tmp32 >> tmp8;
ret[i].txnid.id = tmp32;
ret[i].txnid.valid = (tmp8 == 0 ? false : true);
response >> tmp32;
ret[i].sessionid = tmp32;
}
CHECK_EMPTY(response);
return ret;
}
uint32_t DBRM::getUnique32()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("getUnique32");
#endif
ByteStream command, response;
uint8_t err;
uint32_t ret;
command << GET_UNIQUE_UINT32;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: getUnique32() failed (network)\n";
log("DBRM: getUnique32() failed (network)", logging::LOG_TYPE_ERROR);
throw runtime_error("DBRM: getUnique32() failed check the controllernode");
return 0;
}
/* Some jobsteps don't need the connection after this so close it to free up
resources on the controller node */
/* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
remove the client. Plus, it may cause weird network issue when the socket is being
released and re-established very quickly*/
// pthread_mutex_lock(&mutex);
// delete msgClient;
// msgClient = NULL;
// pthread_mutex_unlock(&mutex);
response >> err;
if (err != ERR_OK)
{
cerr << "DBRM: getUnique32() failed (got an error)\n";
log("DBRM: getUnique32() failed (got an error)", logging::LOG_TYPE_ERROR);
throw runtime_error("DBRM: getUnique32() failed check the controllernode");
return 0;
}
response >> ret;
// cerr << "DBRM returning " << ret << endl;
return ret;
}
uint64_t DBRM::getUnique64()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("getUnique64");
#endif
ByteStream command, response;
uint8_t err;
uint64_t ret;
command << GET_UNIQUE_UINT64;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: getUnique64() failed (network)\n";
log("DBRM: getUnique64() failed (network)", logging::LOG_TYPE_ERROR);
throw runtime_error("DBRM: getUnique64() failed check the controllernode");
return 0;
}
/* Some jobsteps don't need the connection after this so close it to free up
resources on the controller node */
/* Comment the following 4 lines out. The DBRM instance is a singleton so no need to
remove the client. Plus, it may cause weird network issue when the socket is being
released and re-established very quickly*/
// pthread_mutex_lock(&mutex);
// delete msgClient;
// msgClient = NULL;
// pthread_mutex_unlock(&mutex);
response >> err;
if (err != ERR_OK)
{
cerr << "DBRM: getUnique64() failed (got an error)\n";
log("DBRM: getUnique64() failed (got an error)", logging::LOG_TYPE_ERROR);
throw runtime_error("DBRM: getUnique64() failed check the controllernode");
return 0;
}
response >> ret;
// cerr << "DBRM returning " << ret << endl;
return ret;
}
void DBRM::sessionmanager_reset()
{
ByteStream command, response;
command << SM_RESET;
send_recv(command, response);
}
bool DBRM::isEMEmpty() throw()
{
bool res = false;
try
{
res = em->empty();
}
catch (...)
{
res = false;
}
return res;
}
vector<InlineLBIDRange> DBRM::getEMFreeListEntries() throw()
{
vector<InlineLBIDRange> res;
try
{
res = em->getFreeListEntries();
}
catch (...)
{
res.clear();
}
return res;
}
int DBRM::takeSnapshot() throw()
{
return 0; // don't know why, but we're calling this all the time. Need to take most/all of those calls
// out, it's very wasteful.
ByteStream command, response;
uint8_t err;
command << TAKE_SNAPSHOT;
err = send_recv(command, response);
if (err != ERR_OK)
return err;
if (response.length() == 0)
return ERR_NETWORK;
return 0;
}
int DBRM::getSystemReady() throw()
{
uint32_t stateFlags;
if (getSystemState(stateFlags) < 0)
{
return -1;
}
return (stateFlags & SessionManagerServer::SS_READY);
}
int DBRM::getSystemQueryReady() throw()
{
uint32_t stateFlags;
if (getSystemState(stateFlags) < 0)
{
return -1;
}
return (stateFlags & SessionManagerServer::SS_QUERY_READY);
}
int DBRM::getSystemSuspended() throw()
{
uint32_t stateFlags;
if (getSystemState(stateFlags) < 0)
{
return -1;
}
return (stateFlags & SessionManagerServer::SS_SUSPENDED);
}
int DBRM::getSystemSuspendPending(bool& bRollback) throw()
{
uint32_t stateFlags;
if (getSystemState(stateFlags) < 0)
{
return -1;
}
bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
return (stateFlags & SessionManagerServer::SS_SUSPEND_PENDING);
}
int DBRM::getSystemShutdownPending(bool& bRollback, bool& bForce) throw()
{
uint32_t stateFlags;
if (getSystemState(stateFlags) < 0)
{
return -1;
}
bRollback = stateFlags & SessionManagerServer::SS_ROLLBACK;
bForce = stateFlags & SessionManagerServer::SS_FORCE;
return (stateFlags & SessionManagerServer::SS_SHUTDOWN_PENDING);
}
int DBRM::setSystemReady(bool bReady) throw()
{
if (bReady)
{
return setSystemState(SessionManagerServer::SS_READY);
}
else
{
return clearSystemState(SessionManagerServer::SS_READY);
}
}
int DBRM::setSystemQueryReady(bool bReady) throw()
{
if (bReady)
{
return setSystemState(SessionManagerServer::SS_QUERY_READY);
}
else
{
return clearSystemState(SessionManagerServer::SS_QUERY_READY);
}
}
int DBRM::setSystemSuspended(bool bSuspended) throw()
{
uint32_t stateFlags = 0;
if (bSuspended)
{
if (setSystemState(SessionManagerServer::SS_SUSPENDED) < 0)
{
return -1;
}
}
else
{
stateFlags = SessionManagerServer::SS_SUSPENDED;
}
// In either case, we need to clear the pending and rollback flags
stateFlags |= SessionManagerServer::SS_SUSPEND_PENDING;
stateFlags |= SessionManagerServer::SS_ROLLBACK;
return clearSystemState(stateFlags);
}
int DBRM::setSystemSuspendPending(bool bPending, bool bRollback) throw()
{
uint32_t stateFlags = SessionManagerServer::SS_SUSPEND_PENDING;
if (bPending)
{
if (bRollback)
{
stateFlags |= SessionManagerServer::SS_ROLLBACK;
}
return setSystemState(stateFlags);
}
else
{
stateFlags |= SessionManagerServer::SS_ROLLBACK;
return clearSystemState(stateFlags);
}
}
int DBRM::setSystemShutdownPending(bool bPending, bool bRollback, bool bForce) throw()
{
int rtn = 0;
uint32_t stateFlags = SessionManagerServer::SS_SHUTDOWN_PENDING;
if (bPending)
{
if (bForce)
{
stateFlags |= SessionManagerServer::SS_FORCE;
}
else if (bRollback)
{
stateFlags |= SessionManagerServer::SS_ROLLBACK;
}
rtn = setSystemState(stateFlags);
}
else
{
stateFlags |= SessionManagerServer::SS_ROLLBACK;
stateFlags |= SessionManagerServer::SS_FORCE;
rtn = clearSystemState(stateFlags); // Clears the flags that are turned on in stateFlags
}
return rtn;
}
/* Return the shm stateflags
*/
int DBRM::getSystemState(uint32_t& stateFlags) throw()
{
try
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("getSystemState");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << GET_SYSTEM_STATE;
err = send_recv(command, response);
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::getSystemState() failed (network)";
log(oss.str(), logging::LOG_TYPE_ERROR);
return -1;
}
response >> err;
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::getSystemState() failed (error " << err << ")";
log(oss.str(), logging::LOG_TYPE_ERROR);
return -1;
}
response >> stateFlags;
return 1;
}
catch (...)
{
}
return -1;
}
/* Set the shm stateflags that are set in the parameter
*/
int DBRM::setSystemState(uint32_t stateFlags) throw()
{
try
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("setSystemState");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << SET_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
err = send_recv(command, response);
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::setSystemState() failed (network)";
log(oss.str(), logging::LOG_TYPE_ERROR);
stateFlags = 0;
return -1;
}
response >> err;
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::setSystemState() failed (got an error)";
log(oss.str(), logging::LOG_TYPE_ERROR);
stateFlags = 0;
return -1;
}
return 1;
}
catch (...)
{
}
stateFlags = 0;
return -1;
}
/* Clear the shm stateflags that are set in the parameter
*/
int DBRM::clearSystemState(uint32_t stateFlags) throw()
{
try
{
#ifdef BRM_INFO
if (fDebug)
{
TRACER_WRITELATER("clearSystemState");
TRACER_WRITE;
}
#endif
ByteStream command, response;
uint8_t err;
command << CLEAR_SYSTEM_STATE << static_cast<ByteStream::quadbyte>(stateFlags);
err = send_recv(command, response);
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::clearSystemState() failed (network)";
log(oss.str(), logging::LOG_TYPE_ERROR);
return -1;
}
response >> err;
if (err != ERR_OK)
{
std::ostringstream oss;
oss << "DBRM: error: SessionManager::clearSystemState() failed (got an error)";
log(oss.str(), logging::LOG_TYPE_ERROR);
return -1;
}
return 1;
}
catch (...)
{
}
return -1;
}
/* Ping the controller node. Don't print anything.
*/
bool DBRM::isDBRMReady() throw()
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("isDBRMReady");
#endif
boost::mutex::scoped_lock scoped(mutex);
try
{
for (int attempt = 0; attempt < 2; ++attempt)
{
try
{
if (msgClient == NULL)
{
msgClient = MessageQueueClientPool::getInstance(masterName);
}
if (msgClient->connect())
{
return true;
}
}
catch (...)
{
}
MessageQueueClientPool::releaseInstance(msgClient);
msgClient = NULL;
sleep(1);
}
}
catch (...)
{
}
return false;
}
/* This waits for the lock up to 30 sec. After 30 sec, the assumption is something
* bad happened, and this will fix the lock state so that primproc can keep
* running. These prevent a non-critical problem anyway.
*/
void DBRM::lockLBIDRange(LBID_t start, uint32_t count)
{
bool locked = false, lockedRange = false;
LBIDRange range;
const uint32_t waitInterval = 50000; // in usec
const uint32_t maxRetries = 30000000 / waitInterval; // 30 secs
uint32_t retries = 0;
range.start = start;
range.size = count;
try
{
copylocks->lock(CopyLocks::WRITE);
locked = true;
while (copylocks->isLocked(range) && retries < maxRetries)
{
copylocks->release(CopyLocks::WRITE);
locked = false;
usleep(waitInterval);
retries++;
copylocks->lock(CopyLocks::WRITE);
locked = true;
}
if (retries >= maxRetries)
copylocks->forceRelease(range);
copylocks->lockRange(range, -1);
lockedRange = true;
copylocks->confirmChanges();
copylocks->release(CopyLocks::WRITE);
locked = false;
}
catch (...)
{
if (lockedRange)
copylocks->releaseRange(range);
if (locked)
{
copylocks->confirmChanges();
copylocks->release(CopyLocks::WRITE);
}
throw;
}
}
void DBRM::releaseLBIDRange(LBID_t start, uint32_t count)
{
bool locked = false;
LBIDRange range;
range.start = start;
range.size = count;
try
{
copylocks->lock(CopyLocks::WRITE);
locked = true;
copylocks->releaseRange(range);
copylocks->confirmChanges();
copylocks->release(CopyLocks::WRITE);
locked = false;
}
catch (...)
{
if (locked)
{
copylocks->confirmChanges();
copylocks->release(CopyLocks::WRITE);
}
throw;
}
}
/* OID Manager section */
int DBRM::allocOIDs(int num)
{
#ifdef BRM_INFO
if (fDebug)
TRACER_WRITENOW("allocOID");
#endif
ByteStream command, response;
uint8_t err;
uint32_t ret;
command << ALLOC_OIDS;
command << (uint32_t)num;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: OIDManager::allocOIDs(): network error" << endl;
log("DBRM: OIDManager::allocOIDs(): network error", logging::LOG_TYPE_CRITICAL);
return -1;
}
try
{
response >> err;
if (err != ERR_OK)
return -1;
response >> ret;
CHECK_EMPTY(response);
return (int)ret;
}
catch (...)
{
log("DBRM: OIDManager::allocOIDs(): bad response", logging::LOG_TYPE_CRITICAL);
return -1;
}
}
void DBRM::returnOIDs(int start, int end)
{
ByteStream command, response;
uint8_t err;
command << RETURN_OIDS;
command << (uint32_t)start;
command << (uint32_t)end;
err = send_recv(command, response);
if (err == ERR_NETWORK)
{
cerr << "DBRM: OIDManager::returnOIDs(): network error" << endl;
log("DBRM: OIDManager::returnOIDs(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::returnOIDs(): network error");
}
try
{
response >> err;
CHECK_EMPTY(response);
}
catch (...)
{
err = ERR_FAILURE;
}
if (err != ERR_OK)
{
log("DBRM: OIDManager::returnOIDs() failed", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::returnOIDs() failed");
}
}
int DBRM::oidm_size()
{
ByteStream command, response;
uint8_t err;
uint32_t ret;
command << OIDM_SIZE;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: OIDManager::size(): network error" << endl;
log("DBRM: OIDManager::size(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::size(): network error");
}
try
{
response >> err;
if (err == ERR_OK)
{
response >> ret;
CHECK_EMPTY(response);
return ret;
}
CHECK_EMPTY(response);
return -1;
}
catch (...)
{
log("DBRM: OIDManager::size(): bad response", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::size(): bad response");
}
}
int DBRM::allocVBOID(uint32_t dbroot)
{
ByteStream command, response;
uint8_t err;
uint32_t ret;
command << ALLOC_VBOID << (uint32_t)dbroot;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: OIDManager::allocVBOID(): network error" << endl;
log("DBRM: OIDManager::allocVBOID(): network error", logging::LOG_TYPE_CRITICAL);
return -1;
}
try
{
response >> err;
if (err == ERR_OK)
{
response >> ret;
CHECK_EMPTY(response);
return ret;
}
CHECK_EMPTY(response);
return -1;
}
catch (...)
{
log("DBRM: OIDManager::allocVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
return -1;
}
}
int DBRM::getDBRootOfVBOID(uint32_t vbOID)
{
ByteStream command, response;
uint8_t err;
uint32_t ret;
command << GETDBROOTOFVBOID << (uint32_t)vbOID;
err = send_recv(command, response);
if (err != ERR_OK)
{
cerr << "DBRM: OIDManager::getDBRootOfVBOID(): network error" << endl;
log("DBRM: OIDManager::getDBRootOfVBOID(): network error", logging::LOG_TYPE_CRITICAL);
return -1;
}
try
{
response >> err;
if (err == ERR_OK)
{
response >> ret;
CHECK_EMPTY(response);
return (int)ret;
}
CHECK_EMPTY(response);
return -1;
}
catch (...)
{
log("DBRM: OIDManager::getDBRootOfVBOID(): bad response", logging::LOG_TYPE_CRITICAL);
return -1;
}
}
vector<uint16_t> DBRM::getVBOIDToDBRootMap()
{
ByteStream command, response;
uint8_t err;
vector<uint16_t> ret;
command << GETVBOIDTODBROOTMAP;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: OIDManager::getVBOIDToDBRootMap(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): network error");
}
try
{
response >> err;
if (err != ERR_OK)
{
log("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): processing error");
}
deserializeInlineVector<uint16_t>(response, ret);
CHECK_EMPTY(response);
return ret;
}
catch (...)
{
log("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: OIDManager::getVBOIDToDBRootMap(): bad response");
}
}
uint64_t DBRM::getTableLock(const vector<uint32_t>& pmList, uint32_t tableOID, string* ownerName,
uint32_t* ownerPID, int32_t* ownerSessionID, int32_t* ownerTxnID, LockState state)
{
ByteStream command, response;
uint8_t err;
uint64_t ret;
TableLockInfo tli;
uint32_t tmp32;
vector<uint32_t> dbRootsList;
OamCache* oamcache = OamCache::makeOamCache();
OamCache::PMDbrootsMap_t pmDbroots = oamcache->getPMToDbrootsMap();
int moduleId = 0;
for (uint32_t i = 0; i < pmList.size(); i++)
{
moduleId = pmList[i];
vector<int> dbroots = (*pmDbroots)[moduleId];
for (uint32_t j = 0; j < dbroots.size(); j++)
dbRootsList.push_back((uint32_t)dbroots[j]);
}
tli.id = 0;
tli.ownerName = *ownerName;
tli.ownerPID = *ownerPID;
tli.ownerSessionID = *ownerSessionID;
tli.ownerTxnID = *ownerTxnID;
tli.dbrootList = dbRootsList;
tli.state = state;
tli.tableOID = tableOID;
tli.creationTime = time(NULL);
command << GET_TABLE_LOCK << tli;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: getTableLock(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getTableLock(): network error");
}
response >> err;
/* TODO: this means a save failure, need a specific exception type */
if (err != ERR_OK)
throw runtime_error("Table lock save file failure");
response >> ret;
if (ret == 0)
{
response >> *ownerPID;
response >> *ownerName;
response >> tmp32;
*ownerSessionID = tmp32;
response >> tmp32;
*ownerTxnID = tmp32;
}
idbassert(response.length() == 0);
return ret;
}
bool DBRM::releaseTableLock(uint64_t id)
{
ByteStream command, response;
uint8_t err;
command << RELEASE_TABLE_LOCK << id;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: releaseTableLock(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: releaseTableLock(): network error");
}
response >> err;
/* TODO: this means a save failure, need a specific exception type */
if (err != ERR_OK)
throw runtime_error("Table lock save file failure");
response >> err;
idbassert(response.length() == 0);
return (bool)err;
}
bool DBRM::changeState(uint64_t id, LockState state)
{
ByteStream command, response;
uint8_t err;
command << CHANGE_TABLE_LOCK_STATE << id << (uint32_t)state;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: changeState(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: changeState(): network error");
}
response >> err;
/* TODO: this means a save failure, need a specific exception type */
if (err != ERR_OK)
throw runtime_error("Table lock save file failure");
response >> err;
idbassert(response.length() == 0);
return (bool)err;
}
bool DBRM::changeOwner(uint64_t id, const string& ownerName, uint32_t ownerPID, int32_t ownerSessionID,
int32_t ownerTxnID)
{
ByteStream command, response;
uint8_t err;
command << CHANGE_TABLE_LOCK_OWNER << id << ownerName << ownerPID << (uint32_t)ownerSessionID
<< (uint32_t)ownerTxnID;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: changeOwner(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: changeOwner(): network error");
}
response >> err;
/* TODO: this means a save failure, need a specific exception type */
if (err != ERR_OK)
throw runtime_error("Table lock save file failure");
response >> err;
idbassert(response.length() == 0);
return (bool)err;
}
bool DBRM::checkOwner(uint64_t id)
{
ByteStream command, response;
uint8_t err;
command << OWNER_CHECK << id;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: ownerCheck(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: ownerCheck(): network error");
}
response >> err;
/* TODO: this means a save failure, need a specific exception type */
if (err != ERR_OK)
throw runtime_error("Table lock save file failure");
response >> err;
idbassert(response.length() == 0);
return (bool)err; // Return true means the owner is valid
}
vector<TableLockInfo> DBRM::getAllTableLocks()
{
ByteStream command, response;
uint8_t err;
vector<TableLockInfo> ret;
command << GET_ALL_TABLE_LOCKS;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: getAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAllTableLocks(): network error");
}
response >> err;
if (err != ERR_OK)
{
log("DBRM: getAllTableLocks(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAllTableLocks(): processing error");
}
deserializeVector<TableLockInfo>(response, ret);
idbassert(response.length() == 0);
return ret;
}
void DBRM::releaseAllTableLocks()
{
ByteStream command, response;
uint8_t err;
command << RELEASE_ALL_TABLE_LOCKS;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: releaseAllTableLocks(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: releaseAllTableLocks(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
throw runtime_error("DBRM: releaseAllTableLocks(): processing error");
}
bool DBRM::getTableLockInfo(uint64_t id, TableLockInfo* tli)
{
ByteStream command, response;
uint8_t err;
command << GET_TABLE_LOCK_INFO << id;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: getTableLockInfo(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getTableLockInfo(): network error");
}
response >> err;
if (err != ERR_OK)
throw runtime_error("DBRM: getTableLockInfo() processing error");
response >> err;
if (err)
response >> *tli;
return (bool)err;
}
void DBRM::startAISequence(uint32_t OID, uint64_t firstNum, uint32_t colWidth,
execplan::CalpontSystemCatalog::ColDataType colDataType)
{
ByteStream command, response;
uint8_t err;
uint8_t tmp8 = colDataType;
command << START_AI_SEQUENCE << OID << firstNum << colWidth << tmp8;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: startAISequence(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: startAISequence(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
{
log("DBRM: startAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: startAISequence(): processing error");
}
}
bool DBRM::getAIRange(uint32_t OID, uint32_t count, uint64_t* firstNum)
{
ByteStream command, response;
uint8_t err;
command << GET_AI_RANGE << OID << count;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: getAIRange(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAIRange(): network error");
}
response >> err;
if (err != ERR_OK)
{
log("DBRM: getAIRange(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAIRange(): processing error");
}
response >> err;
if (err == 0)
return false;
response >> *firstNum;
idbassert(response.length() == 0);
return true;
}
bool DBRM::getAIValue(uint32_t OID, uint64_t* value)
{
return getAIRange(OID, 0, value);
}
void DBRM::resetAISequence(uint32_t OID, uint64_t value)
{
ByteStream command, response;
uint8_t err;
command << RESET_AI_SEQUENCE << OID << value;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: resetAISequence(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: resetAISequence(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
{
log("DBRM: resetAISequence(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: resetAISequence(): processing error");
}
}
void DBRM::getAILock(uint32_t OID)
{
ByteStream command, response;
uint8_t err;
command << GET_AI_LOCK << OID;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: getAILock(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAILock(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
{
log("DBRM: getAILock(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: getAILock(): processing error");
}
}
void DBRM::releaseAILock(uint32_t OID)
{
ByteStream command, response;
uint8_t err;
command << RELEASE_AI_LOCK << OID;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM: releaseAILock(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: releaseAILock(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
{
log("DBRM: releaseAILock(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: releaseAILock(): processing error");
}
}
void DBRM::deleteAISequence(uint32_t OID)
{
ByteStream command, response;
uint8_t err;
command << DELETE_AI_SEQUENCE << OID;
err = send_recv(command, response);
if (err != ERR_OK)
{
log("DBRM:deleteAILock(): network error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: deleteAILock(): network error");
}
response >> err;
idbassert(response.length() == 0);
if (err != ERR_OK)
{
log("DBRM: deleteAILock(): processing error", logging::LOG_TYPE_CRITICAL);
throw runtime_error("DBRM: deleteAILock(): processing error");
}
}
void DBRM::addToLBIDList(uint32_t sessionID, vector<LBID_t>& lbidList)
{
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
std::unordered_map<
execplan::CalpontSystemCatalog::OID,
std::unordered_map<execplan::CalpontSystemCatalog::OID, std::vector<struct BRM::EMEntry>>>
extentMap;
int err = 0;
std::unordered_set<LBID_t> lbidSet;
std::unordered_set<execplan::CalpontSystemCatalog::OID> tableOidSet;
for (const auto i : lbidList)
{
lbidSet.insert(i);
execplan::CalpontSystemCatalog::OID oid;
uint16_t dbRoot, segmentNum;
uint32_t partitionNum, fbo;
err = lookupLocal(i, 0, false, oid, dbRoot, partitionNum, segmentNum, fbo);
if (err)
{
ostringstream os;
std::string errorMsg;
BRM::errString(err, errorMsg);
os << "lookupLocal from extent map error encountered while looking up lbid " << i
<< " and error code is " << err << " with message " << errorMsg;
throw runtime_error(os.str());
}
execplan::CalpontSystemCatalog::OID tableOid = systemCatalogPtr->isAUXColumnOID(oid);
if (tableOid >= 3000)
{
if (tableOidSet.find(tableOid) == tableOidSet.end())
{
tableOidSet.insert(tableOid);
execplan::CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid);
execplan::CalpontSystemCatalog::RIDList tableColRidList = systemCatalogPtr->columnRIDs(tableName);
for (unsigned j = 0; j < tableColRidList.size(); j++)
{
auto objNum = tableColRidList[j].objnum;
err = getExtents(objNum, extentMap[tableOid][objNum]);
if (err)
{
ostringstream os;
os << "BRM lookup error. Could not get extents for OID " << objNum;
throw runtime_error(os.str());
}
}
}
uint32_t extentNumAux = fbo / ((getExtentRows() * execplan::AUX_COL_WIDTH) / BLOCK_SIZE);
for (auto iter = extentMap[tableOid].begin(); iter != extentMap[tableOid].end(); iter++)
{
const std::vector<struct BRM::EMEntry>& extents = iter->second;
for (const struct BRM::EMEntry& extent : extents)
{
uint32_t extentNum = extent.blockOffset / (extent.range.size * 1024);
if (dbRoot == extent.dbRoot && partitionNum == extent.partitionNum &&
segmentNum == extent.segmentNum && extentNumAux == extentNum)
{
lbidSet.insert(extent.range.start);
break;
}
}
}
}
}
lbidList.clear();
for (auto iter = lbidSet.begin(); iter != lbidSet.end(); iter++)
{
lbidList.push_back(*iter);
}
// Sort the vector.
std::sort<vector<LBID_t>::iterator>(lbidList.begin(), lbidList.end());
}
void DBRM::invalidateUncommittedExtentLBIDs(execplan::CalpontSystemCatalog::SCN txnid, bool allExtents,
vector<LBID_t>* plbidList)
{
// Here we want to minimize the number of calls to dbrm
// Given that, and the fact that we need to know the column type
// in order to set the invalid min and max correctly in the extents,
// We do the following:
// 1) Maintain a vector of all extents we've looked at.
// 2) Get the list of uncommitted lbids for the transaction.
// 3) Look in that list to see if we've already looked at this extent.
// 4) If not,
// a) lookup the min and max lbid for the extent it belongs to
// b) lookup the column oid for that lbid
// c) add to the vector of extents
// 5) Create a list of CPInfo structures with the first lbid and col type of each extent
// 6) Lookup the column type for each retrieved oid.
// 7) mark each extent invalid, just like we would during update. This sets the proper
// min and max (and set the state to CP_UPDATING.
// 6) Call setExtentsMaxMin to set the state to CP_INVALID.
vector<LBID_t> localLBIDList;
boost::shared_ptr<execplan::CalpontSystemCatalog> csc;
CPInfoList_t cpInfos;
CPInfo aInfo;
int oid;
uint16_t dbRoot;
uint32_t partitionNum;
uint16_t segmentNum;
uint32_t fileBlockOffset;
// 2) Get the list of uncommitted lbids for the transaction, if we weren't given one.
if (plbidList == NULL)
{
getUncommittedExtentLBIDs(static_cast<VER_t>(txnid), localLBIDList);
try
{
addToLBIDList(0, localLBIDList);
}
catch (exception& e)
{
cerr << e.what() << endl;
return;
}
catch (...)
{
cerr << "invalidateUncommittedExtentLBIDs: caught an exception" << endl;
return;
}
plbidList = &localLBIDList;
}
if (plbidList->size() == 0)
{
return; // Nothing to do.
}
vector<LBID_t>::const_iterator iter = plbidList->begin();
vector<LBID_t>::const_iterator end = plbidList->end();
csc = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog();
for (; iter != end; ++iter)
{
LBID_t lbid = *iter;
aInfo.firstLbid = lbid;
// lookup the column oid for that lbid (all we care about is oid here)
if (em->lookupLocal(lbid, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset) == 0)
{
execplan::CalpontSystemCatalog::ColType colType = csc->colType(oid);
bool isBinaryColumn = colType.colWidth > 8;
aInfo.isBinaryColumn = isBinaryColumn;
if (!isBinaryColumn)
{
if (datatypes::isUnsigned(colType.colDataType))
{
aInfo.max = 0;
aInfo.min = numeric_limits<uint64_t>::max();
}
else
{
aInfo.max = numeric_limits<int64_t>::min();
aInfo.min = numeric_limits<int64_t>::max();
}
}
else
{
if (datatypes::isUnsigned(colType.colDataType))
{
aInfo.bigMax = 0;
aInfo.bigMin = -1;
}
else
{
utils::int128Min(aInfo.bigMax);
utils::int128Max(aInfo.bigMin);
}
}
}
else
{
// We have a problem, but we need to put something in. This should never happen.
aInfo.max = numeric_limits<int64_t>::min();
aInfo.min = numeric_limits<int64_t>::max();
// MCOL-641 is this correct?
aInfo.isBinaryColumn = false;
}
aInfo.seqNum = allExtents ? SEQNUM_MARK_INVALID_SET_RANGE : SEQNUM_MARK_UPDATING_INVALID_SET_RANGE;
cpInfos.push_back(aInfo);
}
// Call setExtentsMaxMin to invalidate and set the proper max/min in each extent
setExtentsMaxMin(cpInfos);
}
size_t DBRM::EMIndexShmemSize()
{
return em->EMIndexShmemSize();
}
size_t DBRM::EMIndexShmemFree()
{
return em->EMIndexShmemFree();
}
template int DBRM::getExtentMaxMin<int128_t>(const LBID_t lbid, int128_t& max, int128_t& min,
int32_t& seqNum);
template int DBRM::getExtentMaxMin<int64_t>(const LBID_t lbid, int64_t& max, int64_t& min, int32_t& seqNum);
} // namespace BRM