1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

962 lines
24 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: dmlpackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
*
*
***********************************************************************/
#include "dmlpackageprocessor.h"
#include <math.h>
using namespace std;
#include <boost/algorithm/string/case_conv.hpp>
using namespace boost::algorithm;
#include <boost/tokenizer.hpp>
#include <boost/shared_ptr.hpp>
#include "we_messages.h"
using namespace WriteEngine;
using namespace dmlpackage;
#include "calpontselectexecutionplan.h"
#include "simplecolumn.h"
#include "constantcolumn.h"
#include "simplefilter.h"
#include "constantfilter.h"
#include "columnresult.h"
using namespace execplan;
using namespace logging;
#include "configcpp.h"
using namespace config;
#include "joblistfactory.h"
#include "joblist.h"
#include "distributedenginecomm.h"
using namespace joblist;
#include "bytestream.h"
#include "messagequeue.h"
using namespace messageqcpp;
#include "tablelockdata.h"
#include "exceptclasses.h"
namespace
{
using namespace execplan;
// File-local, immutable Operator handles for the "=", "<>", "or" and "and" operators.
// Each one is constructed once during static initialisation of this translation unit.
// NOTE(review): none of these is referenced by the functions reviewed in this part of
// the file - confirm they are still used before keeping them.
const SOP opeq(new Operator("="));
const SOP opne(new Operator("<>"));
const SOP opor(new Operator("or"));
const SOP opand(new Operator("and"));
} // namespace
namespace dmlpackageprocessor
{
/**
 * Destroys the processor and frees the objects behind fWEClient and fExeMgr.
 */
DMLPackageProcessor::~DMLPackageProcessor()
{
  // Applying delete to a null pointer is a no-op, so no explicit null guards are required.
  delete fWEClient;
  delete fExeMgr;
}
//@bug 397
/**
 * Normalises a value string in place.
 *
 * 1. Leading and trailing blanks (' ') are removed. A string that is empty or
 *    consists only of blanks is left untouched.
 * 2. If the result starts with a single quote, its first and last characters are
 *    dropped; this is repeated once more so that a doubled quote ('' ... '') is
 *    peeled as well.
 *
 * The previous implementation located the end of the value with
 * find_last_of(" "), which cut the string at the last blank found anywhere
 * (e.g. "'hello world'" was reduced to "hell") and removed at most one trailing
 * blank. The trailing edge is now located with find_last_not_of.
 *
 * @param s  value to clean; modified in place.
 */
void DMLPackageProcessor::cleanString(string& s)
{
  // Strip off blanks at the beginning and at the end.
  const string::size_type first = s.find_first_not_of(' ');

  if (first != string::npos)
  {
    // A non-blank character exists, so find_last_not_of cannot return npos here.
    const string::size_type last = s.find_last_not_of(' ');
    s = s.substr(first, last - first + 1);
  }

  // Strip off ' or '' at the beginning and at the end (at most two layers).
  for (int layer = 0; layer < 2 && !s.empty() && s[0] == '\''; ++layer)
  {
    // Drop the first and the last character; a lone quote collapses to "".
    const string::size_type innerLen = (s.length() >= 2) ? s.length() - 2 : 0;
    s = s.substr(1, innerLen);
  }
}
#if 0
// NOTE: everything between this "#if 0" and the matching "#endif" is excluded from
// compilation; the function below is retained for reference only.
//
// tokenizeData: returns a boost::any holding a WriteEngine::Token for `data`.
//  - isNULL == true          : the any holds a default-constructed Token.
//  - data longer than colType.colWidth : result is set to INSERT_ERROR and the
//    returned any is left empty.
//  - otherwise               : fWriteEngine.tokenize() is called for the dictionary
//    OID in colType.ddn.dictOID; on failure result is set to TOKEN_ERROR, and the
//    token stored in dictTuple is returned in either case.
boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SCN txnID,
execplan::CalpontSystemCatalog::ColType colType,
const std::string& data, DMLResult& result, bool isNULL )
{
SUMMARY_INFO("DMLPackageProcessor::tokenizeData");
// NOTE(review): retval is assigned below but never read.
bool retval = true;
boost::any value;
if (isNULL)
{
WriteEngine::Token nullToken;
value = nullToken;
}
else
{
if ( data.length() > (unsigned int)colType.colWidth )
{
retval = false;
// build the logging message
logging::Message::Args args;
logging::Message message(6);
args.add("Insert value is too large for colum ");
message.format( args );
result.result = INSERT_ERROR;
result.message = message;
}
else
{
//Tokenize the data value
WriteEngine::DctnryStruct dictStruct;
dictStruct.dctnryOid = colType.ddn.dictOID;
//cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl;
WriteEngine::DctnryTuple dictTuple;
dictTuple.sigValue = data.c_str();
dictTuple.sigSize = data.length();
int error = NO_ERROR;
if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) )
{
retval = false;
//cout << "Error code from WE: " << error << endl;
// build the logging message
logging::Message::Args args;
logging::Message message(1);
args.add("Tokenization failed on: ");
args.add(data);
args.add("error number: ");
args.add( error );
message.format( args );
result.result = TOKEN_ERROR;
result.message = message;
}
// The token is copied out even when tokenize() reported an error above.
WriteEngine::Token aToken = dictTuple.token;
value = aToken;
}
}
return value;
}
#endif
void DMLPackageProcessor::getColumnsForTable(uint32_t sessionID, std::string schema, std::string table,
dmlpackage::ColumnList& colList)
{
CalpontSystemCatalog::TableName tableName;
tableName.schema = schema;
tableName.table = table;
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName, true);
CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
while (rid_iterator != ridList.end())
{
CalpontSystemCatalog::ROPair roPair = *rid_iterator;
DMLColumn* columnPtr = new DMLColumn();
CalpontSystemCatalog::TableColName tblColName = systemCatalogPtr->colName(roPair.objnum);
columnPtr->set_Name(tblColName.column);
colList.push_back(columnPtr);
++rid_iterator;
}
}
/**
 * Lower-cases a NUL-terminated C string in place.
 *
 * @param in  string to convert; may be null, in which case nothing is done.
 * @return    the same pointer that was passed in.
 */
char* DMLPackageProcessor::strlower(char* in)
{
  for (char* p = in; p && *p; ++p)
  {
    // tolower() requires a value representable as unsigned char (or EOF); passing a
    // negative plain char is undefined behaviour, hence the cast.
    *p = static_cast<char>(tolower(static_cast<unsigned char>(*p)));
  }

  return in;
}
/**
 * Splits a table-wide row id into partition / segment / dbroot coordinates and
 * rewrites rid as the row id relative to the segment file.
 *
 * A partition holds filesPerColumnPartition * extentsPerSegmentFile * extentRows
 * rows. Inside a partition, consecutive extents are assigned to segment files
 * round-robin (extent index modulo filesPerColumnPartition).
 *
 * The partition size is now computed once in 64-bit arithmetic. Previously it
 * was multiplied as 32-bit unsigned in some expressions and as uint64_t in
 * others, so large configurations could wrap in the 32-bit expressions.
 *
 * @param rid                      in: table-wide row id; out: row id within the segment file.
 * @param dbRoot                   out: ((startDBRoot - 1 + segment) % dbrootCnt) + 1.
 * @param partition                out: partition number containing the row.
 * @param segment                  out: segment file number within the partition.
 * @param filesPerColumnPartition  segment files per partition (must be non-zero).
 * @param extentsPerSegmentFile    extents per segment file (must be non-zero).
 * @param extentRows               rows per extent (must be non-zero).
 * @param startDBRoot              dbroot of segment 0.
 * @param dbrootCnt                number of dbroots (must be non-zero).
 * @param startPartitionNum        not used by this function.
 */
void DMLPackageProcessor::convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition,
                                             unsigned& segment, unsigned filesPerColumnPartition,
                                             unsigned extentsPerSegmentFile, unsigned extentRows,
                                             unsigned startDBRoot, unsigned dbrootCnt,
                                             const unsigned startPartitionNum)
{
  const uint64_t rowsPerPartition =
      static_cast<uint64_t>(filesPerColumnPartition) * extentsPerSegmentFile * extentRows;

  partition = static_cast<unsigned>(rid / rowsPerPartition);

  // Row id relative to the start of its partition.
  const uint64_t relRidInPartition = rid - static_cast<uint64_t>(partition) * rowsPerPartition;
  idbassert(relRidInPartition <= rowsPerPartition);

  // Index of the extent within the partition, counted across all segment files.
  const uint64_t numExtentsInThisPart = relRidInPartition / extentRows;

  segment = static_cast<unsigned>(numExtentsInThisPart % filesPerColumnPartition);
  dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;

  // Calculate the relative rid for this segment file: number of complete extents that
  // precede the row in its own segment file, plus the offset inside the current extent.
  const uint64_t numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
  const uint64_t relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
  rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
}
/**
 * Translates an error code into a user-facing message.
 *
 * Codes of 1000 and above are looked up through IDBErrorInfo. Lower codes go
 * through ErrorCodes::errorString(); the first 150 characters of that text are
 * dropped and the remainder is appended to "Statement failed.".
 *
 * NOTE(review): substr(150) throws std::out_of_range if errorString() ever
 * returns fewer than 150 characters - confirm the preamble length is stable.
 *
 * @param ec  error code to translate.
 * @return    message text for the code.
 */
string DMLPackageProcessor::projectTableErrCodeToMsg(uint32_t ec)
{
  // IDB error
  if (ec >= 1000)
    return IDBErrorInfo::instance()->errorMsg(ec);

  // pre IDB error code
  ErrorCodes legacyCodes;
  string text("Statement failed.");
  // substr removes ErrorCodes::fPreamble
  const string withoutPreamble = legacyCodes.errorString(ec).substr(150);
  text += withoutPreamble;
  return text;
}
/**
 * Checks whether a varbinary literal consists solely of hexadecimal digits.
 *
 * Note the inverted contract, which existing callers rely on: the function
 * returns true when the value is INVALID.
 *
 * @param inStr  candidate value; not modified.
 * @return       true if at least one character is not a hex digit, false if
 *               every character is a hex digit (including the empty string).
 */
bool DMLPackageProcessor::validateVarbinaryVal(std::string& inStr)
{
  for (std::string::size_type i = 0; i < inStr.length(); ++i)
  {
    // isxdigit() requires a value representable as unsigned char (or EOF); passing a
    // negative plain char is undefined behaviour, hence the cast.
    if (!isxdigit(static_cast<unsigned char>(inStr[i])))
      return true;
  }

  return false;
}
/**
 * Commits the transaction by calling fDbrm->vbCommit() with the transaction id.
 *
 * @param uniqueId  not used by this function.
 * @param txnID     transaction to commit; only txnID.id is read.
 * @return          the value returned by vbCommit().
 */
int DMLPackageProcessor::commitTransaction(uint64_t uniqueId, BRM::TxnID txnID)
{
  return fDbrm->vbCommit(txnID.id);
}
// Tries to rollback transaction, if network error tries one more time
// MCOL-5263.
int32_t DMLPackageProcessor::tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
string& errorMsg)
{
auto weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
if (weRc)
{
weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg);
if (weRc == 0)
{
// Setup connection in WE with PS.
joblist::ResourceManager* rm = joblist::ResourceManager::instance(true);
joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm);
weRc = fEc->Setup();
}
}
return weRc;
}
/**
 * Runs processPackageInternal() and retries it while the result is
 * PP_LOST_CONNECTION, up to five additional times.
 *
 * Before each retry the DistributedEngineComm connection is set up again; if
 * Setup() returns non-zero the retries stop and the last result is returned.
 *
 * @param cpackage  DML package to process.
 * @return          result of the last processPackageInternal() call.
 */
DMLPackageProcessor::DMLResult DMLPackageProcessor::processPackage(dmlpackage::CalpontDMLPackage& cpackage)
{
  const uint32_t maxRetries = 5;
  auto result = processPackageInternal(cpackage);

  for (uint32_t attempt = 0; (result.result == PP_LOST_CONNECTION) && (attempt < maxRetries); ++attempt)
  {
    std::cerr << "DMLPackageProcessor: NETWORK ERROR; attempt # " << attempt << std::endl;

    // Try to setup connection and process package again.
    joblist::ResourceManager* resourceMgr = joblist::ResourceManager::instance(true);
    joblist::DistributedEngineComm* engineComm = joblist::DistributedEngineComm::instance(resourceMgr);

    if (engineComm->Setup())
      break;

    result = processPackageInternal(cpackage);
  }

  return result;
}
/**
 * Tells whether an exception's message contains PPLostConnectionErrorCode.
 *
 * @param ex  exception to inspect.
 * @return    true if ex.what() contains PPLostConnectionErrorCode.
 */
bool DMLPackageProcessor::checkPPLostConnection(std::exception& ex)
{
  const std::string description(ex.what());
  const std::string::size_type hit = description.find(PPLostConnectionErrorCode);
  return hit != std::string::npos;
}
int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg)
{
std::vector<BRM::LBID_t> lbidList;
std::vector<BRM::LBIDRange> lbidRangeList;
BRM::LBIDRange range;
int rc = 0;
// Check BRM status before processing.
rc = fDbrm->isReadWrite();
if (rc != 0)
{
std::string brmMsg;
errorMsg = "Can't read DBRM isReadWrite [ ";
BRM::errString(rc, brmMsg);
errorMsg += brmMsg;
errorMsg += "]";
return rc;
}
ByteStream bytestream;
fWEClient->addQueue(uniqueId);
// cout << "adding to queue with id " << uniqueId << endl;
bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BLOCKS;
bytestream << uniqueId;
bytestream << sessionID;
bytestream << (uint32_t)txnID.id;
uint32_t msgRecived = 0;
try
{
fWEClient->write_to_all(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
ByteStream::byte tmp8;
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
errorMsg = "Network error reading WEClient";
fWEClient->removeQueue(uniqueId);
// cout << "erroring out remove queue id " << uniqueId << endl;
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
char szrc[20];
*bsIn >> errorMsg;
errorMsg += " (WriteEngine returns error ";
sprintf(szrc, "%d", rc);
errorMsg += szrc;
errorMsg += ")";
fWEClient->removeQueue(uniqueId);
cout << "erroring out remove queue id " << uniqueId << endl;
break;
}
else
msgRecived++;
}
}
}
catch (std::exception& e)
{
rc = NETWORK_ERROR;
errorMsg = "Network error occurred when rolling back blocks";
errorMsg += e.what();
fWEClient->removeQueue(uniqueId);
cout << "erroring out remove queue id " << uniqueId << endl;
// delete fWEClient;
return rc;
}
catch (...)
{
rc = NETWORK_ERROR;
errorMsg = "Unknown exception caught while rolling back transaction.";
fWEClient->removeQueue(uniqueId);
cout << "erroring out remove queue id " << uniqueId << endl;
// delete fWEClient;
return rc;
}
if (rc != 0)
{
// delete fWEClient;
return rc;
}
fWEClient->removeQueue(uniqueId);
// delete fWEClient;
// cout << "success. remove queue id " << uniqueId << endl;
rc = fDbrm->getUncommittedLBIDs(txnID.id, lbidList);
if (rc != 0)
{
std::string brmMsg;
errorMsg = "DBRM getUncommittedLBIDs [ ";
BRM::errString(rc, brmMsg);
errorMsg += brmMsg;
errorMsg += "]";
return rc;
}
for (size_t i = 0; i < lbidList.size(); i++)
{
range.start = lbidList[i];
range.size = 1;
lbidRangeList.push_back(range);
}
rc = fDbrm->vbRollback(txnID.id, lbidRangeList);
if (rc != 0)
{
std::string brmMsg;
errorMsg = "DBRM vbRollback [ ";
BRM::errString(rc, brmMsg);
errorMsg += brmMsg;
errorMsg += "]";
return rc;
}
return rc;
}
int DMLPackageProcessor::commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID,
const uint32_t tableOid, std::string& errorMsg)
{
// collect hwm info from all pms and set them here. remove table metadata if all successful
ByteStream bytestream;
fWEClient->addQueue(uniqueId);
bytestream << (ByteStream::byte)WE_SVR_COMMIT_BATCH_AUTO_ON;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
bytestream << tableOid;
bytestream << fSessionID;
uint32_t msgRecived = 0;
fWEClient->write_to_all(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
int rc = 0;
ByteStream::byte tmp8;
typedef std::vector<BRM::BulkSetHWMArg> BulkSetHWMArgs;
std::vector<BulkSetHWMArgs> hwmArgsAllPms;
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
fWEClient->removeQueue(uniqueId);
break;
}
else
{
// get hwm info
*bsIn >> errorMsg;
BulkSetHWMArgs setHWMArgs;
// cout << "received from WES bytestream length = " << bsIn->length() << endl;
deserializeInlineVector(*(bsIn.get()), setHWMArgs);
// cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
hwmArgsAllPms.push_back(setHWMArgs);
msgRecived++;
}
}
}
if (rc != 0)
return rc;
// set hwm
std::vector<BRM::BulkSetHWMArg> allHwm;
BulkSetHWMArgs::const_iterator itor;
// cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
for (unsigned i = 0; i < fWEClient->getPmCount(); i++)
{
itor = hwmArgsAllPms[i].begin();
while (itor != hwmArgsAllPms[i].end())
{
allHwm.push_back(*itor);
// cout << "received hwm info: " << itor->oid << ":" << itor->hwm << endl;
itor++;
}
}
// set CP data before hwm.
// cout << "setting hwm allHwm size " << allHwm.size() << endl;
BRM::CPInfoList_t cpInfos;
std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
rc = fDbrm->bulkSetHWMAndCP(allHwm, cpInfos, mergeCPDataArgs, txnID.id);
fDbrm->takeSnapshot();
// Set tablelock to rollforward remove meta files
if (rc != 0)
return rc;
bool stateChanged = true;
TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
uint64_t tablelockId = tablelockData->getTablelockId(tableOid);
try
{
stateChanged = fDbrm->changeState(tablelockId, BRM::CLEANUP);
}
catch (std::exception&)
{
errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
stateChanged = false;
}
if (!stateChanged)
return rc;
bytestream.restart();
//@Bug 4517 Remove meta data failure doesn't stop tablelock releasing.
bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
bytestream << uniqueId;
bytestream << tableOid;
msgRecived = 0;
fWEClient->write_to_all(bytestream);
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
msgRecived++;
}
}
return rc;
}
int DMLPackageProcessor::rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID,
uint32_t sessionID, const uint32_t tableOid,
std::string& errorMsg)
{
// Bulkrollback, rollback blocks, vbrollback, change state, remove meta file
// cout << "In rollBackBatchAutoOnTransaction" << endl;
std::vector<BRM::TableLockInfo> tableLocks;
tableLocks = fDbrm->getAllTableLocks();
// cout << " Got all tablelocks" << endl;
unsigned idx = 0;
string ownerName("DMLProc batchinsert");
uint64_t tableLockId = 0;
int rc = 0;
for (; idx < tableLocks.size(); idx++)
{
if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
{
tableLockId = tableLocks[idx].id;
break;
}
}
if ((tableLockId == 0) || (tableOid == 0))
{
// table is not locked by DMLProc. Could happen if we failed to get lock
// while inserting. Not an error during rollback, but we don't
// want to do anything.
return rc;
}
// cout << "sending to WES" << endl;
ByteStream bytestream;
fWEClient->addQueue(uniqueId);
// cout << "adding queue id " << uniqueId << endl;
bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BATCH_AUTO_ON;
bytestream << uniqueId;
bytestream << sessionID;
bytestream << tableLockId;
bytestream << tableOid;
uint32_t msgRecived = 0;
fWEClient->write_to_all(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
ByteStream::byte tmp8;
// cout << "waiting for reply from WES" << endl;
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
fWEClient->removeQueue(uniqueId);
// cout << "erroring out remove queue id " << uniqueId << endl;
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
fWEClient->removeQueue(uniqueId);
// cout << "erroring out remove queue id " << uniqueId << endl;
break;
}
else
msgRecived++;
}
}
if (rc == 0) // change table lock state
{
bool stateChanged = true;
// cout << "changing tablelock state" << endl;
try
{
stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
}
catch (std::exception&)
{
errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
stateChanged = false;
}
if (!stateChanged)
{
rc = 1;
}
}
if (rc != 0)
return rc;
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
bytestream << uniqueId;
bytestream << tableOid;
msgRecived = 0;
fWEClient->write_to_all(bytestream);
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
fWEClient->removeQueue(uniqueId);
// cout << "erroring out remove queue id " << uniqueId << endl;
break;
}
else
{
*bsIn >> tmp8;
msgRecived++;
}
}
fWEClient->removeQueue(uniqueId);
return rc;
}
int DMLPackageProcessor::commitBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID,
const uint32_t tableOid, std::string& errorMsg)
{
std::vector<BRM::TableLockInfo> tableLocks;
tableLocks = fDbrm->getAllTableLocks();
// cout << " Got all tablelocks" << endl;
unsigned idx = 0;
string ownerName("DMLProc batchinsert");
uint64_t tableLockId = 0;
int rc = 0;
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
ByteStream::byte tmp8;
for (; idx < tableLocks.size(); idx++)
{
if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
{
tableLockId = tableLocks[idx].id;
break;
}
}
if ((tableLockId == 0) || (tableOid == 0))
{
// table is not locked by DMLProc. Could happen if we failed to get lock
// while inserting. Not an error during rollback, but we don't
// want to do anything.
return rc;
}
bool stateChanged = true;
// cout << "changing tablelock state" << endl;
try
{
stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
}
catch (std::exception&)
{
errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
stateChanged = false;
}
if (!stateChanged)
{
rc = 1;
}
if (rc != 0)
return rc;
ByteStream bytestream;
fWEClient->addQueue(uniqueId);
bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
bytestream << uniqueId;
bytestream << tableOid;
uint32_t msgRecived = 0;
fWEClient->write_to_all(bytestream);
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
msgRecived++;
}
}
fWEClient->removeQueue(uniqueId);
return rc;
}
int DMLPackageProcessor::rollBackBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID,
uint32_t sessionID, const uint32_t tableOid,
std::string& errorMsg)
{
ByteStream bytestream;
fWEClient->addQueue(uniqueId);
bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BATCH_AUTO_OFF;
bytestream << uniqueId;
bytestream << sessionID;
bytestream << (uint32_t)txnID.id;
bytestream << tableOid;
uint32_t msgRecived = 0;
fWEClient->write_to_all(bytestream);
boost::shared_ptr<messageqcpp::ByteStream> bsIn;
bsIn.reset(new ByteStream());
int rc = 0;
ByteStream::byte tmp8;
while (1)
{
if (msgRecived == fWEClient->getPmCount())
break;
fWEClient->read(uniqueId, bsIn);
if (bsIn->length() == 0) // read error
{
rc = NETWORK_ERROR;
fWEClient->removeQueue(uniqueId);
break;
}
else
{
*bsIn >> tmp8;
rc = tmp8;
if (rc != 0)
{
*bsIn >> errorMsg;
fWEClient->removeQueue(uniqueId);
break;
}
else
msgRecived++;
}
}
return rc;
}
/**
 * Sends WE_SVR_FLUSH_FILES through fWEClient->write_to_all() and waits for one
 * reply per fWEClient->getPmCount().
 *
 * Each reply starts with a status byte; a non-zero status is followed by an
 * error string, which is read but not returned to the caller. This function
 * neither adds nor removes the reply queue for uniqueId.
 *
 * An exception raised while reading replies used to be swallowed silently, so
 * the function could return 0 without having heard from every PM. It now
 * returns NETWORK_ERROR in that case, unless a reply already supplied a
 * non-zero status.
 *
 * @param rcIn        status of the preceding operation, forwarded in the request.
 * @param columnOids  not used by this function.
 * @param uniqueId    key of the reply queue to read from.
 * @param txnID       transaction; only txnID.id is read.
 * @param tableOid    OID of the table whose files are flushed.
 * @return            0 on success, NETWORK_ERROR on a read failure or exception,
 *                    otherwise the non-zero status of the failing reply.
 */
int DMLPackageProcessor::flushDataFiles(int rcIn, std::map<FID, FID>& columnOids, uint64_t uniqueId,
                                        BRM::TxnID txnID, uint32_t tableOid)
{
  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_SVR_FLUSH_FILES;
  bytestream << uniqueId;
  bytestream << (uint32_t)rcIn;
  bytestream << (uint32_t)txnID.id;
  bytestream << tableOid;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;
  std::string errorMsg;

  try
  {
    while (msgRecived != fWEClient->getPmCount())
    {
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        break;
      }

      *bsIn >> tmp8;
      rc = tmp8;

      if (rc != 0)
      {
        // The message is consumed from the stream; only the code is reported.
        *bsIn >> errorMsg;
        break;
      }

      msgRecived++;
    }
  }
  catch (const std::exception&)
  {
    // Not every PM confirmed the flush. Keep a status already reported by a reply,
    // otherwise flag the failure instead of returning success.
    if (rc == 0)
      rc = NETWORK_ERROR;
  }

  return rc;
}
/**
 * Sends WE_END_TRANSACTION through fWEClient->write_to_all() and waits for one
 * reply per fWEClient->getPmCount().
 *
 * Each reply starts with a status byte; a non-zero status is followed by an
 * error string, which is read but not returned to the caller. This function
 * neither adds nor removes the reply queue for uniqueId.
 *
 * An exception raised while reading replies used to be swallowed silently, so
 * the function could return 0 without having heard from every PM. It now
 * returns NETWORK_ERROR in that case, unless a reply already supplied a
 * non-zero status.
 *
 * @param uniqueId  key of the reply queue to read from.
 * @param txnID     transaction being ended; only txnID.id is read.
 * @param success   outcome flag forwarded in the request as a single byte.
 * @return          0 on success, NETWORK_ERROR on a read failure or exception,
 *                  otherwise the non-zero status of the failing reply.
 */
int DMLPackageProcessor::endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success)
{
  ByteStream bytestream;
  bytestream << (ByteStream::byte)WE_END_TRANSACTION;
  bytestream << uniqueId;
  bytestream << (uint32_t)txnID.id;
  bytestream << (ByteStream::byte)success;
  uint32_t msgRecived = 0;
  fWEClient->write_to_all(bytestream);
  boost::shared_ptr<messageqcpp::ByteStream> bsIn(new ByteStream());
  int rc = 0;
  ByteStream::byte tmp8;
  std::string errorMsg;

  try
  {
    while (msgRecived != fWEClient->getPmCount())
    {
      fWEClient->read(uniqueId, bsIn);

      if (bsIn->length() == 0)  // read error
      {
        rc = NETWORK_ERROR;
        break;
      }

      *bsIn >> tmp8;
      rc = tmp8;

      if (rc != 0)
      {
        // The message is consumed from the stream; only the code is reported.
        *bsIn >> errorMsg;
        break;
      }

      msgRecived++;
    }
  }
  catch (const std::exception&)
  {
    // Not every PM acknowledged. Keep a status already reported by a reply,
    // otherwise flag the failure instead of returning success.
    if (rc == 0)
      rc = NETWORK_ERROR;
  }

  return rc;
}
} // namespace dmlpackageprocessor