From d61780cab1a3e8a5e29ce66bbc45a4bf6ff21abf Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Wed, 7 Dec 2022 17:24:45 +0300 Subject: [PATCH] MCOL-5263 Add support for ROLLBACK when PP was restarted. DMLProc starts a ROLLBACK when the SELECT part of an UPDATE fails because the EM facility in PP was restarted. Unfortunately, this ROLLBACK gets stuck if EM/PP are not yet available. DMLProc must use a timeout and retry the ROLLBACK. --- .../commandpackageprocessor.cpp | 6 +- dbcon/dmlpackageproc/dmlpackageprocessor.cpp | 21 + dbcon/dmlpackageproc/dmlpackageprocessor.h | 3 + dbcon/joblist/distributedenginecomm.cpp | 63 ++- dbcon/joblist/distributedenginecomm.h | 4 +- writeengine/client/we_clients.cpp | 1 - writeengine/server/we_dmlcommandproc.cpp | 1 - writeengine/shared/we_brm.cpp | 402 +++++++++--------- 8 files changed, 280 insertions(+), 221 deletions(-) diff --git a/dbcon/dmlpackageproc/commandpackageprocessor.cpp b/dbcon/dmlpackageproc/commandpackageprocessor.cpp index cb2dbd608..764ecc609 100644 --- a/dbcon/dmlpackageproc/commandpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/commandpackageprocessor.cpp @@ -37,6 +37,8 @@ #include "we_ddlcommandclient.h" #include "oamcache.h" #include "liboamcpp.h" +#include "resourcemanager.h" + using namespace std; using namespace WriteEngine; using namespace dmlpackage; @@ -370,7 +372,7 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage( int weRc = 0; // version rollback, Bulkrollback - weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); + weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); if (weRc == 0) { @@ -413,7 +415,7 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage( { std::string errorMsg(""); logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;"); - int weRc = rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); + int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg); if (weRc != 0) { diff --git 
a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp index 78b19b392..5ca7028a2 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.cpp +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.cpp @@ -266,6 +266,27 @@ int DMLPackageProcessor::commitTransaction(uint64_t uniqueId, BRM::TxnID txnID) return rc; } +// Tries to rollback transaction, if network error tries one more time +// MCOL-5263. +int32_t DMLPackageProcessor::tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, + string& errorMsg) +{ + auto weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg); + if (weRc) + { + weRc = rollBackTransaction(uniqueId, txnID, sessionID, errorMsg); + if (weRc == 0) + { + // Setup connection in WE with PS. + joblist::ResourceManager* rm = joblist::ResourceManager::instance(true); + joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm); + weRc = fEc->Setup(); + } + } + + return weRc; +} + int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, std::string& errorMsg) { diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index f0c640a63..07180ac8b 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -249,6 +249,9 @@ class DMLPackageProcessor EXPORT int rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, std::string& errorMsg); + EXPORT int32_t tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, + std::string& errorMsg); + EXPORT int rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID, const uint32_t tableOid, std::string& errorMsg); /** diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index 2505ed497..87ddc9a6b 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ 
b/dbcon/joblist/distributedenginecomm.cpp @@ -221,7 +221,7 @@ DistributedEngineComm::~DistributedEngineComm() fInstance = 0; } -void DistributedEngineComm::Setup() +int32_t DistributedEngineComm::Setup() { // This is here to ensure that this function does not get invoked multiple times simultaneously. boost::mutex::scoped_lock setupLock(fSetupMutex); @@ -309,10 +309,9 @@ void DistributedEngineComm::Setup() "Could not connect to PMS" + std::to_string(connectionId) + ": " + ex.what(), LOG_TYPE_ERROR); if (newPmCount == 0) - { writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); - break; - } + + return 1; } catch (...) { @@ -322,10 +321,8 @@ void DistributedEngineComm::Setup() writeToLog(__FILE__, __LINE__, "Could not connect to PMS" + std::to_string(connectionId), LOG_TYPE_ERROR); if (newPmCount == 0) - { writeToLog(__FILE__, __LINE__, "No more PMs to try to connect to", LOG_TYPE_ERROR); - break; - } + return 1; } } @@ -362,6 +359,7 @@ void DistributedEngineComm::Setup() newLocks.clear(); newClients.clear(); + return 0; } int DistributedEngineComm::Close() @@ -428,8 +426,8 @@ Error: decltype(pmCount) originalPMCount = pmCount; // Re-establish if a remote PM restarted. 
std::this_thread::sleep_for(std::chrono::seconds(3)); - Setup(); - if (originalPMCount != pmCount) + auto rc = Setup(); + if (rc || originalPMCount != pmCount) { ostringstream os; os << "DEC: lost connection to " << client->addr2String(); @@ -889,7 +887,7 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos writeToClient(localConnectionId, msg); } -void DistributedEngineComm::write(uint32_t senderID, const SBS& msg) +int32_t DistributedEngineComm::write(uint32_t senderID, const SBS& msg) { ISMPacketHeader* ism = (ISMPacketHeader*)msg->buf(); uint32_t dest; @@ -914,6 +912,7 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg) entries in the config file point to unique PMs */ { uint32_t localConnectionId = std::numeric_limits::max(); + int32_t rc = 0; for (uint32_t i = 0; i < pmCount; ++i) { @@ -922,21 +921,25 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg) localConnectionId = i; continue; } - writeToClient(i, msg, senderID); + + rc =writeToClient(i, msg, senderID); + if (rc) + return rc; } if (localConnectionId < fPmConnections.size()) - writeToClient(localConnectionId, msg); + rc = writeToClient(localConnectionId, msg); + return rc; } - return; case BATCH_PRIMITIVE_RUN: case DICT_TOKEN_BY_SCAN_COMPARE: + { // for efficiency, writeToClient() grabs the interleaving factor for the caller, // and decides the final connection index because it already grabs the // caller's queue information dest = ism->Interleave; - writeToClient(dest, msg, senderID, true); - break; + return writeToClient(dest, msg, senderID, true); + } default: idbassert_s(0, "Unknown message type"); } @@ -946,6 +949,7 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg) writeToLog(__FILE__, __LINE__, "No PrimProcs are running", LOG_TYPE_DEBUG); throw IDBExcept(ERR_NO_PRIMPROC); } + return 0; } void DistributedEngineComm::write(messageqcpp::ByteStream& msg, uint32_t connection) @@ -1135,16 +1139,14 @@ int 
DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ } } + ClientList::value_type client = fPmConnections[connectionId]; try { - ClientList::value_type client = fPmConnections[connectionId]; - if (!client->isAvailable()) return 0; std::lock_guard lk(*(fWlock[connectionId])); client->write(bs, NULL, senderStats); - return 0; } catch (...) { @@ -1163,7 +1165,30 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ map_tok->second->queue.push(sbs); } + int tries = 0; + // Try to setup connection with PS, it could be a situation that PS is starting. + // MCOL-5263. + while (tries < 10 && Setup()) + { + ++tries; + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + lk.unlock(); + + if (tries == 10) + { + ostringstream os; + os << "DEC: lost connection to " << client->addr2String(); + writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_ERROR); + if (!fIsExeMgr) + abort(); + else + throw runtime_error("DistributedEngineComm::write: Broken Pipe error"); + } + + // Connection was established. + return 1; /* // reconfig the connection array ClientList tempConns; @@ -1195,8 +1220,8 @@ int DistributedEngineComm::writeToClient(size_t aPMIndex, const SBS& bs, uint32_ alarmItem.append(" PrimProc"); alarmMgr.sendAlarmReport(alarmItem.c_str(), oam::CONN_FAILURE, SET); */ - throw runtime_error("DistributedEngineComm::write: Broken Pipe error"); } + return 0; } uint32_t DistributedEngineComm::size(uint32_t key) diff --git a/dbcon/joblist/distributedenginecomm.h b/dbcon/joblist/distributedenginecomm.h index 361180a35..26b5869b7 100644 --- a/dbcon/joblist/distributedenginecomm.h +++ b/dbcon/joblist/distributedenginecomm.h @@ -146,7 +146,7 @@ class DistributedEngineComm * Writes a primitive message to a primitive server. Msg needs to conatin an ISMPacketHeader. The * LBID is extracted from the ISMPacketHeader and used to determine the actual P/M to send to. 
*/ - EXPORT void write(uint32_t key, const messageqcpp::SBS& msg); + EXPORT int32_t write(uint32_t key, const messageqcpp::SBS& msg); // EXPORT void throttledWrite(const messageqcpp::ByteStream& msg); @@ -188,7 +188,7 @@ class DistributedEngineComm */ EXPORT uint32_t size(uint32_t key); - EXPORT void Setup(); + EXPORT int32_t Setup(); EXPORT void addDECEventListener(DECEventListener*); EXPORT void removeDECEventListener(DECEventListener*); diff --git a/writeengine/client/we_clients.cpp b/writeengine/client/we_clients.cpp index d8eb01b2c..e64ebd321 100644 --- a/writeengine/client/we_clients.cpp +++ b/writeengine/client/we_clients.cpp @@ -497,7 +497,6 @@ void WEClients::write_to_all(const messageqcpp::ByteStream& msg) } ClientList::iterator itor = fPmConnections.begin(); - while (itor != fPmConnections.end()) { if (itor->second != NULL) diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index cdd1d75f3..1d9fe41a5 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -817,7 +817,6 @@ uint8_t WE_DMLCommandProc::rollbackBlocks(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; - ; int txnID; bs >> sessionID; bs >> tmp32; diff --git a/writeengine/shared/we_brm.cpp b/writeengine/shared/we_brm.cpp index fb5d88fac..4f1697352 100644 --- a/writeengine/shared/we_brm.cpp +++ b/writeengine/shared/we_brm.cpp @@ -43,6 +43,7 @@ using namespace boost; #include "we_dctnrycompress.h" #include "we_simplesyslog.h" #include "we_config.h" +#include "exceptclasses.h" #include "IDBDataFile.h" #include "IDBPolicy.h" using namespace idbdatafile; @@ -1134,234 +1135,243 @@ int BRMWrapper::rollBackBlocks(const VER_t transID, int sessionId) std::vector files; - for (i = 0; i < lbidList.size(); i++) + try { - verID = (VER_t)transID; - // timer.start("vssLookup"); - // get version id - - verID = blockRsltnMgrPtr->getHighestVerInVB(lbidList[i], transID); - - if (verID < 0) + for (i 
= 0; i < lbidList.size(); i++) { - std::ostringstream oss; - BRM::errString(verID, errorMsg); - oss << "vssLookup error encountered while looking up lbid " << lbidList[i] << " and error code is " - << verID << " with message " << errorMsg; - throw std::runtime_error(oss.str()); - } + verID = (VER_t)transID; + // timer.start("vssLookup"); + // get version id - // timer.stop("vssLookup"); - // copy buffer back - // look for the block in extentmap - // timer.start("lookupLocalEX"); - rc = blockRsltnMgrPtr->lookupLocal(lbidList[i], /*transID*/ verID, false, weOid, weDbRoot, wePartitionNum, - weSegmentNum, weFbo); + verID = blockRsltnMgrPtr->getHighestVerInVB(lbidList[i], transID); - if (rc != 0) - { - std::ostringstream oss; - BRM::errString(rc, errorMsg); - oss << "lookupLocal from extent map error encountered while looking up lbid:verID " << lbidList[i] - << ":" << (uint32_t)verID << " and error code is " << rc << " with message " << errorMsg; - throw std::runtime_error(oss.str()); - } + if (verID < 0) + { + std::ostringstream oss; + BRM::errString(verID, errorMsg); + oss << "vssLookup error encountered while looking up lbid " << lbidList[i] << " and error code is " + << verID << " with message " << errorMsg; + throw std::runtime_error(oss.str()); + } - // Check whether this lbid is on this PM. 
- dbrootPmMapItor = dbrootPmMap.find(weDbRoot); + // timer.stop("vssLookup"); + // copy buffer back + // look for the block in extentmap + // timer.start("lookupLocalEX"); + rc = blockRsltnMgrPtr->lookupLocal(lbidList[i], /*transID*/ verID, false, weOid, weDbRoot, + wePartitionNum, weSegmentNum, weFbo); - if (dbrootPmMapItor == dbrootPmMap.end()) - continue; + if (rc != 0) + { + std::ostringstream oss; + BRM::errString(rc, errorMsg); + oss << "lookupLocal from extent map error encountered while looking up lbid:verID " << lbidList[i] + << ":" << (uint32_t)verID << " and error code is " << rc << " with message " << errorMsg; + throw std::runtime_error(oss.str()); + } - // timer.stop("lookupLocalEX"); - Column column; - execplan::CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(weOid); - columnOids[weOid] = weOid; + // Check whether this lbid is on this PM. + dbrootPmMapItor = dbrootPmMap.find(weDbRoot); - // This must be a dict oid - if (colType.columnOID == 0) - { - colType = systemCatalogPtr->colTypeDct(weOid); + if (dbrootPmMapItor == dbrootPmMap.end()) + continue; - idbassert(colType.columnOID != 0); - idbassert(colType.ddn.dictOID == weOid); - } + // timer.stop("lookupLocalEX"); + Column column; + execplan::CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(weOid); - CalpontSystemCatalog::ColDataType colDataType = colType.colDataType; - ColType weColType; - Convertor::convertColType(colDataType, colType.colWidth, weColType); - column.colWidth = Convertor::getCorrectRowWidth(colDataType, colType.colWidth); - column.colType = weColType; - column.colDataType = colDataType; - column.dataFile.fid = weOid; - column.dataFile.fDbRoot = weDbRoot; - column.dataFile.fPartition = wePartitionNum; - column.dataFile.fSegment = weSegmentNum; - column.compressionType = colType.compressionType; + columnOids[weOid] = weOid; - BRM::FileInfo aFile; - aFile.oid = weOid; - aFile.partitionNum = wePartitionNum; - aFile.dbRoot = weDbRoot; - 
aFile.segmentNum = weSegmentNum; - aFile.compType = colType.compressionType; - files.push_back(aFile); + // This must be a dict oid + if (colType.columnOID == 0) + { + colType = systemCatalogPtr->colTypeDct(weOid); - if (colType.compressionType == 0) - fileOp.chunkManager(NULL); - else - fileOp.chunkManager(&chunkManager); + idbassert(colType.columnOID != 0); + idbassert(colType.ddn.dictOID == weOid); + } - if (isDebug(DEBUG_3)) + CalpontSystemCatalog::ColDataType colDataType = colType.colDataType; + ColType weColType; + Convertor::convertColType(colDataType, colType.colWidth, weColType); + column.colWidth = Convertor::getCorrectRowWidth(colDataType, colType.colWidth); + column.colType = weColType; + column.colDataType = colDataType; + column.dataFile.fid = weOid; + column.dataFile.fDbRoot = weDbRoot; + column.dataFile.fPartition = wePartitionNum; + column.dataFile.fSegment = weSegmentNum; + column.compressionType = colType.compressionType; + + BRM::FileInfo aFile; + aFile.oid = weOid; + aFile.partitionNum = wePartitionNum; + aFile.dbRoot = weDbRoot; + aFile.segmentNum = weSegmentNum; + aFile.compType = colType.compressionType; + files.push_back(aFile); + + if (colType.compressionType == 0) + fileOp.chunkManager(NULL); + else + fileOp.chunkManager(&chunkManager); + + if (isDebug(DEBUG_3)) #ifndef __LP64__ - printf("\n\tuncommitted lbid - lbidList[i]=%lld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i], - weOid, weFbo, verID, weDbRoot); + printf("\n\tuncommitted lbid - lbidList[i]=%lld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", + lbidList[i], weOid, weFbo, verID, weDbRoot); #else - printf("\n\tuncommitted lbid - lbidList[i]=%ld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i], - weOid, weFbo, verID, weDbRoot); + printf("\n\tuncommitted lbid - lbidList[i]=%ld weOid =%d weFbo=%d verID=%d, weDbRoot=%d", lbidList[i], + weOid, weFbo, verID, weDbRoot); #endif - // look for the block in the version buffer - // timer.start("lookupLocalVB"); - rc = 
blockRsltnMgrPtr->lookupLocal(lbidList[i], verID, true, vbOid, vbDbRoot, vbPartitionNum, - vbSegmentNum, vbFbo); + // look for the block in the version buffer + // timer.start("lookupLocalVB"); + rc = blockRsltnMgrPtr->lookupLocal(lbidList[i], verID, true, vbOid, vbDbRoot, vbPartitionNum, + vbSegmentNum, vbFbo); - if (rc != 0) - { - std::ostringstream oss; - BRM::errString(rc, errorMsg); - oss << "lookupLocal from version buffer error encountered while looking up lbid:verID " << lbidList[i] - << ":" << (uint32_t)verID << " and error code is " << rc << " with message " << errorMsg; - throw std::runtime_error(oss.str()); - } + if (rc != 0) + { + std::ostringstream oss; + BRM::errString(rc, errorMsg); + oss << "lookupLocal from version buffer error encountered while looking up lbid:verID " << lbidList[i] + << ":" << (uint32_t)verID << " and error code is " << rc << " with message " << errorMsg; + throw std::runtime_error(oss.str()); + } - if (pSourceFile == 0) //@Bug 2314. Optimize the version buffer open times. - { - currentVbOid = vbOid; - sourceFileInfo.oid = currentVbOid; - sourceFileInfo.fPartition = 0; - sourceFileInfo.fSegment = 0; - sourceFileInfo.fDbRoot = weDbRoot; - errno = 0; - pSourceFile = openFile(sourceFileInfo, "r+b"); + if (pSourceFile == 0) //@Bug 2314. Optimize the version buffer open times. 
+ { + currentVbOid = vbOid; + sourceFileInfo.oid = currentVbOid; + sourceFileInfo.fPartition = 0; + sourceFileInfo.fSegment = 0; + sourceFileInfo.fDbRoot = weDbRoot; + errno = 0; + pSourceFile = openFile(sourceFileInfo, "r+b"); - if (pSourceFile == NULL) + if (pSourceFile == NULL) + { + std::ostringstream oss; + Convertor::mapErrnoToString(errno, errorMsg); + oss << "Error encountered while opening version buffer file oid:dbroot = " << currentVbOid << ":" + << weDbRoot << " and error message:" << errorMsg; + throw std::runtime_error(oss.str()); + } + } + + // timer.stop("lookupLocalVB"); + if (isDebug(DEBUG_3)) +#ifndef __LP64__ + printf("\n\tuncommitted lbid - lbidList[i]=%lld vbOid =%d vbFbo=%d\n", lbidList[i], vbOid, vbFbo); + +#else + printf("\n\tuncommitted lbid - lbidList[i]=%ld vbOid =%d vbFbo=%d\n", lbidList[i], vbOid, vbFbo); +#endif + + //@Bug 2293 Version buffer file information cannot be obtained from lookupLocal + if (vbOid != currentVbOid) + { + currentVbOid = vbOid; + // cout << "VB file changed to " << vbOid << endl; + delete pSourceFile; + sourceFileInfo.oid = currentVbOid; + sourceFileInfo.fPartition = 0; + sourceFileInfo.fSegment = 0; + sourceFileInfo.fDbRoot = weDbRoot; + errno = 0; + pSourceFile = openFile(sourceFileInfo, "r+b"); + + if (pSourceFile == NULL) + { + std::ostringstream oss; + Convertor::mapErrnoToString(errno, errorMsg); + oss << "Error encountered while opening version buffer file oid:dbroot = " << currentVbOid << ":" + << weDbRoot << " and error message:" << errorMsg; + throw std::runtime_error(oss.str()); + } + } + + targetFileInfo.oid = weOid; + targetFileInfo.fPartition = wePartitionNum; + targetFileInfo.fSegment = weSegmentNum; + targetFileInfo.fDbRoot = weDbRoot; + // printf("\n\tsource file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", + // sourceFileInfo.oid, sourceFileInfo.fPartition, sourceFileInfo.fSegment, sourceFileInfo.fDbRoot); + // printf("\n\ttarget file info - oid =%d fPartition=%d fSegment=%d, 
fDbRoot=%d", weOid, + // wePartitionNum, weSegmentNum, weDbRoot); + // Check whether the file is on this pm. + + if (column.compressionType != 0) + { + pTargetFile = fileOp.getFilePtr(column, false); // @bug 5572 HDFS tmp file + } + else if (fileOpenList.find(targetFileInfo) != fileOpenList.end()) + { + pTargetFile = fileOpenList[targetFileInfo]; + } + else + { + pTargetFile = openFile(targetFileInfo, "r+b"); + + if (pTargetFile != NULL) + fileOpenList[targetFileInfo] = pTargetFile; + } + + if (pTargetFile == NULL) { std::ostringstream oss; Convertor::mapErrnoToString(errno, errorMsg); - oss << "Error encountered while opening version buffer file oid:dbroot = " << currentVbOid << ":" - << weDbRoot << " and error message:" << errorMsg; - throw std::runtime_error(oss.str()); + oss << "Error encountered while opening source file oid:dbroot:partition:segment = " << weOid << ":" + << weDbRoot << ":" << wePartitionNum << ":" << weSegmentNum << " and error message:" << errorMsg; + errorMsg = oss.str(); + goto cleanup; } - } - // timer.stop("lookupLocalVB"); - if (isDebug(DEBUG_3)) -#ifndef __LP64__ - printf("\n\tuncommitted lbid - lbidList[i]=%lld vbOid =%d vbFbo=%d\n", lbidList[i], vbOid, vbFbo); + // timer.start("copyVBBlock"); + std::vector lbidRangeList; + BRM::LBIDRange range; + range.start = lbidList[i]; + range.size = 1; + lbidRangeList.push_back(range); + rc = blockRsltnMgrPtr->dmlLockLBIDRanges(lbidRangeList, transID); -#else - printf("\n\tuncommitted lbid - lbidList[i]=%ld vbOid =%d vbFbo=%d\n", lbidList[i], vbOid, vbFbo); -#endif + if (rc != 0) + { + BRM::errString(rc, errorMsg); + goto cleanup; + } - //@Bug 2293 Version buffer file information cannot be obtained from lookupLocal - if (vbOid != currentVbOid) - { - currentVbOid = vbOid; - // cout << "VB file changed to " << vbOid << endl; - delete pSourceFile; - sourceFileInfo.oid = currentVbOid; - sourceFileInfo.fPartition = 0; - sourceFileInfo.fSegment = 0; - sourceFileInfo.fDbRoot = weDbRoot; - errno = 0; - 
pSourceFile = openFile(sourceFileInfo, "r+b"); + rc = copyVBBlock(pSourceFile, pTargetFile, vbFbo, weFbo, &fileOp, column); - if (pSourceFile == NULL) + // cout << "WES rolled block " << lbidList[i] << endl; + if (rc != 0) { std::ostringstream oss; - Convertor::mapErrnoToString(errno, errorMsg); - oss << "Error encountered while opening version buffer file oid:dbroot = " << currentVbOid << ":" - << weDbRoot << " and error message:" << errorMsg; - throw std::runtime_error(oss.str()); + oss << "Error encountered while copying lbid " << lbidList[i] + << " to source file oid:dbroot:partition:segment = " << weOid << ":" << weDbRoot << ":" + << wePartitionNum << ":" << weSegmentNum; + errorMsg = oss.str(); + goto cleanup; } + + pTargetFile->flush(); + rc = blockRsltnMgrPtr->dmlReleaseLBIDRanges(lbidRangeList); + + if (rc != 0) + { + BRM::errString(rc, errorMsg); + goto cleanup; + } + + // timer.stop("copyVBBlock"); + if (rc != NO_ERROR) + goto cleanup; } - - targetFileInfo.oid = weOid; - targetFileInfo.fPartition = wePartitionNum; - targetFileInfo.fSegment = weSegmentNum; - targetFileInfo.fDbRoot = weDbRoot; - // printf("\n\tsource file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", sourceFileInfo.oid, - // sourceFileInfo.fPartition, sourceFileInfo.fSegment, sourceFileInfo.fDbRoot); printf("\n\ttarget - // file info - oid =%d fPartition=%d fSegment=%d, fDbRoot=%d", weOid, wePartitionNum, weSegmentNum, - // weDbRoot); - // Check whether the file is on this pm. 
- - if (column.compressionType != 0) - { - pTargetFile = fileOp.getFilePtr(column, false); // @bug 5572 HDFS tmp file - } - else if (fileOpenList.find(targetFileInfo) != fileOpenList.end()) - { - pTargetFile = fileOpenList[targetFileInfo]; - } - else - { - pTargetFile = openFile(targetFileInfo, "r+b"); - - if (pTargetFile != NULL) - fileOpenList[targetFileInfo] = pTargetFile; - } - - if (pTargetFile == NULL) - { - std::ostringstream oss; - Convertor::mapErrnoToString(errno, errorMsg); - oss << "Error encountered while opening source file oid:dbroot:partition:segment = " << weOid << ":" - << weDbRoot << ":" << wePartitionNum << ":" << weSegmentNum << " and error message:" << errorMsg; - errorMsg = oss.str(); - goto cleanup; - } - - // timer.start("copyVBBlock"); - std::vector lbidRangeList; - BRM::LBIDRange range; - range.start = lbidList[i]; - range.size = 1; - lbidRangeList.push_back(range); - rc = blockRsltnMgrPtr->dmlLockLBIDRanges(lbidRangeList, transID); - - if (rc != 0) - { - BRM::errString(rc, errorMsg); - goto cleanup; - } - - rc = copyVBBlock(pSourceFile, pTargetFile, vbFbo, weFbo, &fileOp, column); - - // cout << "WES rolled block " << lbidList[i] << endl; - if (rc != 0) - { - std::ostringstream oss; - oss << "Error encountered while copying lbid " << lbidList[i] - << " to source file oid:dbroot:partition:segment = " << weOid << ":" << weDbRoot << ":" - << wePartitionNum << ":" << weSegmentNum; - errorMsg = oss.str(); - goto cleanup; - } - - pTargetFile->flush(); - rc = blockRsltnMgrPtr->dmlReleaseLBIDRanges(lbidRangeList); - - if (rc != 0) - { - BRM::errString(rc, errorMsg); - goto cleanup; - } - - // timer.stop("copyVBBlock"); - if (rc != NO_ERROR) - goto cleanup; + } + // MCOL-5263. + catch (logging::IDBExcept&) + { + return ERR_BRM_NETWORK; } // timer.start("vbRollback");