1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/dbcon/dmlpackageproc/updatepackageprocessor.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
2025-07-21 14:32:39 +03:00

1032 lines
29 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: updatepackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
#include <iostream>
#include <fstream>
#include <ctype.h>
#include <string>
// #define NDEBUG
#include <cassert>
#include <map>
#include <boost/scoped_ptr.hpp>
using namespace std;
#include "updatepackageprocessor.h"
#include "writeengine.h"
#include "joblistfactory.h"
#include "messagelog.h"
#include "simplecolumn.h"
#include "sqllogger.h"
#include "stopwatch.h"
#include "dbrm.h"
#include "idberrorinfo.h"
#include "errorids.h"
#include "rowgroup.h"
#include "bytestream.h"
#include "calpontselectexecutionplan.h"
#include "autoincrementdata.h"
#include "columnresult.h"
#include "we_messages.h"
#include "tablelockdata.h"
#include "oamcache.h"
using namespace WriteEngine;
using namespace dmlpackage;
using namespace execplan;
using namespace logging;
using namespace dataconvert;
using namespace joblist;
using namespace rowgroup;
using namespace messageqcpp;
using namespace BRM;
using namespace oam;
// #define PROFILE 1
namespace dmlpackageprocessor
{
// StopWatch timer;
/**
 * Sleep for @p milliseconds (expected to be non-negative), continuing with the
 * remaining time whenever nanosleep() is interrupted by a signal.
 *
 * The request is rebuilt on every call. nanosleep() writes the remaining time
 * only when it is interrupted, so reusing that output as the next request
 * would make every later sleep shorter than intended.
 */
static void sleepMillisecondsResumingOnSignal(int milliseconds)
{
  struct timespec request;
  request.tv_sec = milliseconds / 1000;
  request.tv_nsec = (milliseconds % 1000) * 1000000L;
  struct timespec remaining;
  while (nanosleep(&request, &remaining) < 0)
  {
    request = remaining;  // interrupted: sleep only for what is left
  }
}

/**
 * Execute the UPDATE statement carried by @p cpackage.
 *
 * Steps:
 *  - Obtain a unique id for the WriteEngine client queue.
 *  - Reuse the session's cached table lock, or request a LOADING table lock
 *    from DBRM, retrying every 100 ms for Config::getWaitPeriod() * 10 attempts.
 *  - Start the autoincrement sequence if the table has an autoincrement column.
 *  - Forward the rows to WriteEngine via fixUpRows().
 *  - Flush the data files and call endTransaction(); it is passed `true` only
 *    when the statement succeeded (or only produced a range warning), the
 *    flush succeeded and no rollback is pending.
 *
 * A package without rows returns NO_ERROR right away without flushing or
 * ending the transaction.
 *
 * @param cpackage the UPDATE DML package (table, rows, SQL text, execution plan)
 * @return DMLResult with result NO_ERROR, IDBRANGE_WARNING, UPDATE_ERROR,
 *         JOB_CANCELED or PP_LOST_CONNECTION; rowCount is 0 on error.
 */
DMLPackageProcessor::DMLResult UpdatePackageProcessor::processPackageInternal(
    dmlpackage::CalpontDMLPackage& cpackage)
{
  SUMMARY_INFO("UpdatePackageProcessor::processPackage");
  DMLResult result;
  result.result = NO_ERROR;
  result.rowCount = 0;
  BRM::TxnID txnid;
  // set-up the transaction
  txnid.id = cpackage.get_TxnID();
  txnid.valid = true;
  fSessionID = cpackage.get_SessionID();
  VERBOSE_INFO("Processing Update DML Package...");
  TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
  uint64_t uniqueId = 0;

  // Bug 5070. Added exception handling
  try
  {
    uniqueId = fDbrm->getUnique64();
  }
  catch (std::exception& ex)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add(ex.what());
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }
  catch (...)
  {
    logging::Message::Args args;
    logging::Message message(9);
    args.add("Unknown error occurred while getting unique number.");
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }

  // The table object is dereferenced right below, so reject a package that
  // has none before any use.
  DMLTable* tablePtr = cpackage.get_Table();
  if (tablePtr == nullptr)
  {
    logging::Message::Args args;
    logging::Message message(7);
    args.add("Update Failed: ");
    args.add("no table found in the update package");
    args.add("");
    args.add("");
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    fSessionManager.rolledback(txnid);
    return result;
  }

  uint64_t tableLockId = 0;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = tablePtr->get_SchemaName();
  tableName.table = tablePtr->get_TableName();
  fWEClient->addQueue(uniqueId);
  execplan::CalpontSystemCatalog::ROPair roPair;

  try
  {
    {
      // Log the start of the statement.
      LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
      logging::Message::Args args1;
      logging::Message msg(1);
      args1.add("Start SQL statement: ");
      ostringstream oss;
      oss << cpackage.get_SQLStatement() << "|" << tablePtr->get_SchemaName() << "|";
      args1.add(oss.str());
      msg.format(args1);
      logging::Logger logger(logid.fSubsysID);
      logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
    }
    VERBOSE_INFO("The table name is:");
    VERBOSE_INFO(tablePtr->get_TableName());

    // get the row(s) from the table
    const RowList& rows = tablePtr->get_RowList();
    if (rows.size() == 0)
    {
      SUMMARY_INFO("No row to update!");
      fWEClient->removeQueue(uniqueId);
      return result;
    }

    roPair = systemCatalogPtr->tableRID(tableName);
    // check whether this table is locked already for this session
    tableLockId = tablelockData->getTablelockId(roPair.objnum);
    if (tableLockId == 0)
    {
      // Not cached for this session: request a LOADING lock from DBRM, passing
      // every module id known to OamCache.
      uint32_t processID = ::getpid();
      int32_t txnId = txnid.id;
      int32_t sessionId = fSessionID;
      std::string processName("DMLProc");
      const std::vector<int> pmList = oamcache()->getModuleIds();
      std::vector<uint32_t> pms(pmList.begin(), pmList.end());
      try
      {
        tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId, &txnId,
                                          BRM::LOADING);
      }
      catch (std::exception&)
      {
        throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
      }

      if (tableLockId == 0)
      {
        // Lock not granted: retry every 100 ms, 10 tries per unit of the
        // configured wait period (presumably seconds).
        const int sleepTimeMs = 100;
        int waitPeriod = Config::getWaitPeriod();
        int numTries = waitPeriod * 10;
        int i = 0;
        for (; i < numTries; i++)
        {
          sleepMillisecondsResumingOnSignal(sleepTimeMs);
          try
          {
            // These are passed by pointer and are reported as the lock
            // holder in ERR_TABLE_LOCKED below, so reset them to our own
            // identity before each attempt.
            processID = ::getpid();
            txnId = txnid.id;
            sessionId = fSessionID;
            processName = "DMLProc";
            tableLockId = fDbrm->getTableLock(pms, roPair.objnum, &processName, &processID, &sessionId,
                                              &txnId, BRM::LOADING);
          }
          catch (std::exception&)
          {
            throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
          }
          if (tableLockId > 0)
            break;
        }
        if (i >= numTries)  // error out
        {
          result.result = UPDATE_ERROR;
          logging::Message::Args args;
          string strOp("update");
          args.add(strOp);
          args.add(processName);
          args.add((uint64_t)processID);
          args.add(sessionId);
          throw std::runtime_error(IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
        }
      }
    }
    tablelockData->setTablelock(roPair.objnum, tableLockId);

    //@Bug 4491 start AI sequence for autoincrement column
    const CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName);
    CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
    CalpontSystemCatalog::ColType colType;
    while (rid_iterator != ridList.end())
    {
      // If user hit ctrl+c in the mysql console, this will be true.
      if (fRollbackPending)
      {
        result.result = JOB_CANCELED;
        break;
      }
      const CalpontSystemCatalog::ROPair colPair = *rid_iterator;
      colType = systemCatalogPtr->colType(colPair.objnum);
      if (colType.autoincrement)
      {
        try
        {
          uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
          fDbrm->startAISequence(colPair.objnum, nextVal, colType.colWidth, colType.colDataType);
          break;  // Only one autoincrement column per table
        }
        catch (std::exception& ex)
        {
          result.result = UPDATE_ERROR;
          throw std::runtime_error(ex.what());
        }
      }
      ++rid_iterator;
    }

    uint64_t rowsProcessed = 0;
    if (!fRollbackPending)
    {
      rowsProcessed = fixUpRows(cpackage, result, uniqueId, roPair.objnum);
    }
    //@Bug 4994 Cancelled job is not error
    if (result.result == JOB_CANCELED)
      throw std::runtime_error("Query execution was interrupted");
    if ((result.result != 0) && (result.result != DMLPackageProcessor::IDBRANGE_WARNING))
      throw std::runtime_error(result.message.msg());
    result.rowCount = rowsProcessed;

    // Log the update statement.
    LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
    logging::Message::Args args1;
    logging::Message msg(1);
    args1.add("End SQL statement");
    msg.format(args1);
    logging::Logger logger(logid.fSubsysID);
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
    logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_SQLStatement(),
                    cpackage.get_SchemaName());
  }
  catch (std::exception& ex)
  {
    if (checkPPLostConnection(ex))
    {
      result.result = PP_LOST_CONNECTION;
    }
    else
    {
      cerr << "UpdatePackageProcessor::processPackage:" << ex.what() << endl;
      if (result.result == 0)
      {
        result.result = UPDATE_ERROR;
      }
      result.message = Message(ex.what());
      result.rowCount = 0;
      LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
      logging::Message::Args args1;
      logging::Message msg(1);
      args1.add("End SQL statement with error");
      msg.format(args1);
      logging::Logger logger(logid.fSubsysID);
      logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
    }
  }
  catch (...)
  {
    cerr << "UpdatePackageProcessor::processPackage: caught unknown exception!" << endl;
    logging::Message::Args args;
    logging::Message message(7);
    args.add("Update Failed: ");
    args.add("encountered unkown exception");
    args.add("");
    args.add("");
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    result.rowCount = 0;
    LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
    logging::Message::Args args1;
    logging::Message msg(1);
    args1.add("End SQL statement with error");
    msg.format(args1);
    logging::Logger logger(logid.fSubsysID);
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
  }

  //@Bug 1886,2870 Flush VM cache only once per statement. send to all PMs.
  std::map<uint32_t, uint32_t> oids;
  int rc = 0;
  if (result.result == NO_ERROR || result.result == IDBRANGE_WARNING)
  {
    if ((rc = flushDataFiles(NO_ERROR, oids, uniqueId, txnid, roPair.objnum)) != NO_ERROR)
    {
      cerr << "UpdatePackageProcessor::processPackage: write data to disk failed" << endl;
      if (!fRollbackPending)
      {
        logging::Message::Args args;
        logging::Message message(7);
        args.add("Update Failed: ");
        args.add("error when writing data to disk");
        args.add("");
        args.add("");
        message.format(args);
        result.result = UPDATE_ERROR;
        result.message = message;
        result.rowCount = 0;
      }
      rc = endTransaction(uniqueId, txnid, false);
    }
    else
    {
      if (fRollbackPending)
        rc = endTransaction(uniqueId, txnid, false);
      else
        rc = endTransaction(uniqueId, txnid, true);
      if ((rc != NO_ERROR) && (!fRollbackPending))
      {
        logging::Message::Args args;
        logging::Message message(7);
        args.add("Update Failed: ");
        args.add("error when cleaning up data files");
        args.add("");
        args.add("");
        message.format(args);
        result.result = UPDATE_ERROR;
        result.message = message;
        result.rowCount = 0;
      }
    }
  }
  else
  {
    //@Bug 4563. Always flush. error is already set
    rc = flushDataFiles(result.result, oids, uniqueId, txnid, roPair.objnum);
    rc = endTransaction(uniqueId, txnid, false);
  }

  if (fRollbackPending)
  {
    result.result = JOB_CANCELED;
    logging::Message::Args args1;
    args1.add("Query execution was interrupted");
    result.message.format(args1);
  }
  fWEClient->removeQueue(uniqueId);
  VERBOSE_INFO("Finished Processing Update DML Package");
  return result;
}
/**
 * Drive the ExeMgr side of an UPDATE and forward the resulting rowgroups to
 * WriteEngine through processRowgroup().
 *
 * Protocol as implemented here:
 *  - Send a 4 to ExeMgr, followed by the execution plan.
 *  - Expect a 4-byte status of 0, then one more message.
 *  - The first rowgroup message is the RowGroup metadata. It is sent to every
 *    PM and then acknowledged to ExeMgr with 100.
 *  - Each following message carries rows and goes to the PM that owns the
 *    rowgroup's dbroot.
 *  - An empty rowgroup ends the stream. All outstanding WES replies are then
 *    collected and ExeMgr statistics are requested with 3.
 *
 * On any error or cancellation the remaining WES replies are drained and a
 * dummy message is written to ExeMgr (@Bug 4358) instead of the stats request.
 *
 * @param cpackage the update package (its execution plan is sent to ExeMgr)
 * @param result   receives error/warning status, message and statistics
 * @param uniqueId WriteEngine client queue id for this statement
 * @param tableOid OID of the table being updated (forwarded to receiveAll)
 * @return number of rows forwarded to WriteEngine
 */
uint64_t UpdatePackageProcessor::fixUpRows(dmlpackage::CalpontDMLPackage& cpackage, DMLResult& result,
                                           const uint64_t uniqueId, const uint32_t tableOid)
{
  ByteStream msg, msgBk, emsgBs;
  RGData rgData;
  uint32_t qb = 4;
  msg << qb;
  boost::scoped_ptr<rowgroup::RowGroup> rowGroup;
  uint64_t rowsProcessed = 0;
  uint32_t dbroot = 1;
  std::vector<int> pmIds = oamcache()->getModuleIds();
  // pmState[pm] is true while that PM has no outstanding request from us.
  std::map<unsigned, bool> pmState;
  string emsgStr;
  bool err = false;

  try
  {
    for (unsigned i = 0; i < pmIds.size(); i++)
    {
      pmState[pmIds[i]] = true;
    }

    fExeMgr->write(msg);
    fExeMgr->write(*(cpackage.get_ExecutionPlan()));
    msg.restart();
    emsgBs.restart();

    // ExeMgr answers with a 4-byte status; anything else is an error.
    msg = fExeMgr->read();
    if (msg.length() == 4)
    {
      msg >> qb;
      if (qb != 0)
        err = true;
    }
    else
    {
      qb = 999;
      err = true;
    }
    if (err)
    {
      logging::Message::Args args;
      logging::Message message(2);
      args.add("Update Failed: ExeMgr Error");
      args.add((int)qb);
      message.format(args);
      result.result = UPDATE_ERROR;
      result.message = message;
      return rowsProcessed;
    }

    emsgBs = fExeMgr->read();
    if (emsgBs.length() == 0)
    {
      logging::Message::Args args;
      logging::Message message(2);
      args.add("Update Failed: ");
      args.add("Lost connection to ExeMgr");
      message.format(args);
      result.result = UPDATE_ERROR;
      result.message = message;
      return rowsProcessed;
    }
    emsgBs >> emsgStr;

    // Stream rowgroups until the end marker, an error, or a user cancel.
    while (!fRollbackPending)
    {
      msg.restart();
      msgBk.restart();
      msg = fExeMgr->read();
      msgBk = msg;
      if (msg.length() == 0)
      {
        cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
        logging::Message::Args args;
        logging::Message message(2);
        args.add("Update Failed: ");
        args.add("Lost connection to ExeMgr");
        message.format(args);
        result.result = UPDATE_ERROR;
        result.message = message;
        // ExeMgr is gone: do not ask it for stats below, but still collect
        // the replies WriteEngine owes us.
        err = true;
        DMLResult tmpResult;
        receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid);
        break;
      }

      if (rowGroup.get() == NULL)
      {
        // The first message is the RowGroup metadata; it goes to all PMs.
        err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, true, dbroot);
        if (err)
        {
          // A PM rejected the metadata or could not be read from. Stop
          // instead of streaming rows that cannot be applied, and make sure
          // the failure is reported even if no message was recorded.
          if (result.result == NO_ERROR)
          {
            logging::Message::Args args;
            logging::Message message(2);
            args.add("Update Failed: ");
            args.add("Lost connection to Write Engine Server while updating");
            message.format(args);
            result.result = UPDATE_ERROR;
            result.message = message;
          }
          break;
        }
        rowGroup.reset(new rowgroup::RowGroup());
        rowGroup->deserialize(msg);
        qb = 100;
        msg.restart();
        msg << qb;
        fExeMgr->write(msg);
        continue;
      }

      rgData.deserialize(msg, true);
      rowGroup->setData(&rgData);
      err = (rowGroup->getStatus() != 0);
      if (err)
      {
        // ExeMgr reported an error; its text follows the rowgroup data.
        string errorMsg;
        msg >> errorMsg;
        logging::Message::Args args;
        logging::Message message(2);
        args.add("Update Failed: ");
        args.add(errorMsg);
        message.format(args);
        result.result = UPDATE_ERROR;
        result.message = message;
        DMLResult tmpResult;
        receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid);
        break;
      }
      if (rowGroup->getRGData() == NULL)
      {
        msg.restart();
      }
      if (rowGroup->getRowCount() == 0)  // done fetching
      {
        // need to receive all response
        err = receiveAll(result, uniqueId, pmIds, pmState, tableOid);
        break;
      }
      if (rowGroup->getBaseRid() == (uint64_t)(-1))
      {
        continue;  // @bug4247, not valid row ids, may from small side outer
      }
      dbroot = rowGroup->getDBRoot();
      err = processRowgroup(msgBk, result, uniqueId, cpackage, pmState, false, dbroot);
      if (err)
      {
        LoggingID logid(DMLLoggingId, fSessionID, cpackage.get_TxnID());
        logging::Message::Args args1;
        logging::Message msg1(1);
        args1.add("SQL statement erroring out, need to receive all messages from WES");
        msg1.format(args1);
        logging::Logger logger(logid.fSubsysID);
        logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
        DMLResult tmpResult;
        receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid);
        logging::Message::Args args2;
        logging::Message msg2(1);
        args2.add("SQL statement erroring out, received all messages from WES");
        msg2.format(args2);
        logger.logMessage(LOG_TYPE_DEBUG, msg2, logid);
        break;
      }
      rowsProcessed += rowGroup->getRowCount();
    }

    if (fRollbackPending)
    {
      err = true;
      // Response to user
      cerr << "UpdatePackageProcessor::processPackage::fixupRows Rollback Pending" << endl;
      //@Bug 4994 Cancelled job is not error
      result.result = JOB_CANCELED;
      // Log
      LoggingID logid(DMLLoggingId, fSessionID, cpackage.get_TxnID());
      logging::Message::Args args1;
      logging::Message msg1(1);
      args1.add("SQL statement canceled by user");
      msg1.format(args1);
      logging::Logger logger(logid.fSubsysID);
      logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
      // Clean out the pipe;
      DMLResult tmpResult;
      receiveAll(tmpResult, uniqueId, pmIds, pmState, tableOid);
    }

    if (!err)
    {
      // get stats from ExeMgr
      qb = 3;
      msg.restart();
      msg << qb;
      fExeMgr->write(msg);
      msg = fExeMgr->read();
      msg >> result.queryStats;
      msg >> result.extendedStats;
      msg >> result.miniStats;
      result.stats.unserialize(msg);
    }
    else
    {
      //@Bug 4358 get rid of broken pipe error by sending a dummy bs.
      msg.restart();
      msg << qb;
      fExeMgr->write(msg);
    }
  }
  catch (std::exception& ex)
  {
    cerr << "UpdatePackageProcessor::processPackage::fixupRows" << ex.what() << endl;
    logging::Message::Args args;
    logging::Message message(2);
    args.add("Update Failed: ");
    args.add(ex.what());
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    qb = 0;
    msg.restart();
    msg << qb;
    fExeMgr->write(msg);
  }
  catch (...)
  {
    cerr << "UpdatePackageProcessor::processPackage::fixupRows" << endl;
    logging::Message::Args args;
    logging::Message message(2);
    args.add("Update Failed: ");
    args.add("Unknown error caught when communicating with ExeMgr");
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    qb = 0;
    msg.restart();
    msg << qb;
    fExeMgr->write(msg);
  }
  return rowsProcessed;
}
/**
 * Forward one ExeMgr message (the RowGroup metadata or a rowgroup of rows to
 * update) to WriteEngine.
 *
 * Metadata (@p isMeta) is sent, together with the serialized package, to every
 * PM, and one reply per PM is read. Every reply is consumed even after a
 * failure, so that no stale replies stay queued under @p uniqueId for later
 * reads of this statement.
 *
 * Row data goes to the PM that owns @p dbroot. If that PM still has a request
 * outstanding (pmState[pm] == false), replies are read and their PMs marked
 * idle until the owning PM has answered; then the data is sent and the PM is
 * marked busy.
 *
 * @return true on error; result then holds UPDATE_ERROR and a message.
 *         A range warning from WES is recorded in result but is not an error,
 *         and it never replaces an error already stored there.
 */
bool UpdatePackageProcessor::processRowgroup(ByteStream& aRowGroup, DMLResult& result,
                                             const uint64_t uniqueId, dmlpackage::CalpontDMLPackage& cpackage,
                                             std::map<unsigned, bool>& pmState, bool isMeta, uint32_t dbroot)
{
  // Record an "Update Failed: <detail>" error in result.
  auto setUpdateError = [&result](const std::string& detail)
  {
    logging::Message::Args args;
    logging::Message message(2);
    args.add("Update Failed: ");
    args.add(detail);
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
  };

  bool rc = false;
  uint32_t pmNum = oamcache()->getOwnerPM(dbroot);
  ByteStream bytestream;
  bytestream << (uint8_t)WE_SVR_UPDATE;
  bytestream << uniqueId;
  bytestream << pmNum;
  bytestream << (uint32_t)cpackage.get_TxnID();
  bytestream += aRowGroup;
  uint32_t msgRecived = 0;
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  bsIn.reset(new ByteStream());
  ByteStream::byte tmp8;
  string errorMsg;
  uint32_t tmp32;
  uint64_t blocksChanged = 0;

  if (isMeta)  // send to all PMs
  {
    cpackage.write(bytestream);
    fWEClient->write_to_all(bytestream);
    while (msgRecived < fWEClient->getPmCount())
    {
      fWEClient->read(uniqueId, bsIn);
      if (bsIn->length() == 0)  // read error: no further replies can be expected
      {
        if (!rc)
          setUpdateError("Lost connection to Write Engine Server while updating");
        rc = true;
        break;
      }
      msgRecived++;
      *bsIn >> tmp8;
      if (tmp8 > 0)
      {
        *bsIn >> errorMsg;
        if (!rc)  // report the first failure, keep draining the rest
        {
          rc = true;
          setUpdateError(errorMsg);
        }
      }
    }
    return rc;
  }

  if (pmState[pmNum])
  {
    // The owning PM is idle: send right away.
    try
    {
      fWEClient->write(bytestream, (uint32_t)pmNum);
      pmState[pmNum] = false;
    }
    catch (runtime_error& ex)  // write error
    {
      rc = true;
      setUpdateError(ex.what());
    }
    catch (...)
    {
      rc = true;
      setUpdateError("Unknown error caught when communicating with WES");
    }
    return rc;
  }

  // The owning PM is busy: consume replies until it answers, then send.
  while (true)
  {
    bsIn.reset(new ByteStream());
    try
    {
      fWEClient->read(uniqueId, bsIn);
      if (bsIn->length() == 0)  // read error
      {
        rc = true;
        errorMsg = "Lost connection to Write Engine Server while updating";
        throw std::runtime_error(errorMsg);
      }
      *bsIn >> tmp8;
      *bsIn >> errorMsg;
      if (tmp8 == IDBRANGE_WARNING)
      {
        // Record the warning, but never mask an error already in result.
        if (result.result == NO_ERROR || result.result == IDBRANGE_WARNING)
        {
          logging::Message::Args args;
          logging::Message message(2);
          args.add(errorMsg);
          message.format(args);
          result.result = IDBRANGE_WARNING;
          result.message = message;
        }
      }
      else if (tmp8 > 0)
      {
        result.stats.fErrorNo = tmp8;
        rc = true;
      }
      *bsIn >> tmp32;  // id of the PM that sent this reply
      pmState[tmp32] = true;
      *bsIn >> blocksChanged;
      result.stats.fBlocksChanged += blocksChanged;
      if (rc)
      {
        throw std::runtime_error(errorMsg);
      }
      if (tmp32 == pmNum)
      {
        fWEClient->write(bytestream, (uint32_t)pmNum);
        pmState[pmNum] = false;
        break;
      }
    }
    catch (runtime_error& ex)  // read/WES error
    {
      rc = true;
      setUpdateError(ex.what());
      break;
    }
    catch (...)
    {
      rc = true;
      setUpdateError("Unknown error caught when communicating with WES");
      break;
    }
  }
  return rc;
}
/**
 * Collect the replies still owed by PMs with an outstanding request, i.e.
 * every pm in @p fPMs whose pmState[pm] is false. Each replying PM is marked
 * idle again.
 *
 * Blocks changed are accumulated into result.stats. A range warning is
 * recorded in result unless an error is already stored there.
 *
 * @param result   receives status, message and statistics
 * @param uniqueId WriteEngine client queue id for this statement
 * @param fPMs     module ids to check in pmState
 * @param pmState  per-PM idle flags, updated as replies arrive
 * @return true on error (result then holds UPDATE_ERROR and a message)
 */
bool UpdatePackageProcessor::receiveAll(DMLResult& result, const uint64_t uniqueId, std::vector<int>& fPMs,
                                        std::map<unsigned, bool>& pmState, const uint32_t /*tableOid*/)
{
  // check how many message we need to receive
  uint32_t messagesNotReceived = 0;
  for (unsigned i = 0; i < fPMs.size(); i++)
  {
    if (!pmState[fPMs[i]])
      messagesNotReceived++;
  }
  if (messagesNotReceived == 0)
    return false;

  bool err = false;
  // NOTE(review): no transaction id is available here, so the session id is
  // passed in the txn-id slot of the LoggingID.
  LoggingID logid(DMLLoggingId, fSessionID, fSessionID);
  if (messagesNotReceived > fWEClient->getPmCount())
  {
    logging::Message::Args args1;
    logging::Message msg(1);
    args1.add("Update outstanding messages exceed PM count , need to receive messages:PMcount = ");
    ostringstream oss;
    oss << messagesNotReceived << ":" << fWEClient->getPmCount();
    args1.add(oss.str());
    msg.format(args1);
    logging::Logger logger(logid.fSubsysID);
    logger.logMessage(LOG_TYPE_ERROR, msg, logid);
    err = true;
    logging::Message::Args args;
    logging::Message message(2);
    args.add("Update Failed: ");
    args.add("One of WriteEngineServer went away.");
    message.format(args);
    result.result = UPDATE_ERROR;
    result.message = message;
    return err;
  }

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  ByteStream::byte tmp8;
  string errorMsg;
  ByteStream::quadbyte tmp32;
  uint64_t blocksChanged = 0;
  uint32_t msgReceived = 0;
  while (msgReceived != messagesNotReceived)
  {
    bsIn.reset(new ByteStream());
    try
    {
      fWEClient->read(uniqueId, bsIn);
      if (bsIn->length() == 0)  // read error
      {
        err = true;
        errorMsg = "Lost connection to Write Engine Server while updating";
        throw std::runtime_error(errorMsg);
      }
      *bsIn >> tmp8;
      *bsIn >> errorMsg;
      if (tmp8 == IDBRANGE_WARNING)
      {
        // Record the warning, but never mask an error already in result.
        if (result.result == NO_ERROR || result.result == IDBRANGE_WARNING)
        {
          logging::Message::Args args;
          logging::Message message(2);
          args.add(errorMsg);
          message.format(args);
          result.result = IDBRANGE_WARNING;
          result.message = message;
        }
      }
      else
      {
        result.stats.fErrorNo = tmp8;
        err = (tmp8 != 0);
      }
      *bsIn >> tmp32;  // id of the PM that sent this reply
      *bsIn >> blocksChanged;
      pmState[tmp32] = true;
      if (err)
      {
        throw std::runtime_error(errorMsg);
      }
      msgReceived++;
      result.stats.fBlocksChanged += blocksChanged;
    }
    catch (runtime_error& ex)  // read/WES error
    {
      err = true;
      logging::Message::Args args;
      logging::Message message(2);
      args.add("Update Failed: ");
      args.add(ex.what());
      message.format(args);
      result.result = UPDATE_ERROR;
      result.message = message;
      break;
    }
    catch (...)
    {
      err = true;
      logging::Message::Args args;
      logging::Message message(2);
      args.add("Update Failed: ");
      args.add("Unknown error caught when communicating with WES");
      message.format(args);
      result.result = UPDATE_ERROR;
      result.message = message;
      break;
    }
  }
  return err;
}
} // namespace dmlpackageprocessor