From 60694a7b72e8fd9330ad48f4a45b10000fadf023 Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Wed, 21 Jun 2017 22:39:59 +0100 Subject: [PATCH] MCOL-769 Add WE command to get LBIDs written to --- writeengine/server/we_dmlcommandproc.cpp | 41 ++++++++++++++++++++++++ writeengine/server/we_dmlcommandproc.h | 1 + writeengine/server/we_messages.h | 1 + writeengine/server/we_readthread.cpp | 9 ++++-- 4 files changed, 50 insertions(+), 2 deletions(-) diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index c57faa0e3..50b9e5a9f 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -3410,6 +3410,47 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, return rc; } +uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId) +{ + uint8_t rc = 0; + uint32_t txnId; + vector lbidList; + + bs >> txnId; + std::tr1::unordered_map::iterator mapIter; + std::tr1::unordered_map m_txnLBIDMap = fWEWrapper.getTxnMap(); + try + { + mapIter = m_txnLBIDMap.find(txnId); + if (mapIter != m_txnLBIDMap.end()) + { + SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second; + std::tr1::unordered_map ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin(); + while (listIter != spTxnLBIDRec->m_LBIDMap.end()) + { + lbidList.push_back(listIter->first); + listIter++; + } + } + } + catch(...) {} + bs.restart(); + try + { + serializeInlineVector (bs, lbidList); + } + catch (exception& ex) + { + // Append to errmsg in case we already have an error + if (err.length() > 0) + err += "; "; + err += ex.what(); + rc = 1; + return rc; + } + return rc; +} + uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string & err) { uint8_t rc = 0; diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index e5f991c94..ccef3ef58 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -96,6 +96,7 @@ class WE_DMLCommandProc EXPORT uint8_t processPurgeFDCache(ByteStream& bs, std::string & err); EXPORT uint8_t processEndTransaction(ByteStream& bs, std::string & err); EXPORT uint8_t processFixRows(ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); + EXPORT uint8_t getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); int validateColumnHWMs( execplan::CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, diff --git a/writeengine/server/we_messages.h b/writeengine/server/we_messages.h index 19a61cf04..93eafa21d 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -83,6 +83,7 @@ enum ServerMessages WE_SRV_FIX_ROWS, WE_SVR_WRITE_CREATE_SYSCOLUMN, WE_SVR_BATCH_INSERT_BINARY, + WE_SVR_GET_WRITTEN_LBIDS, WE_CLT_SRV_DATA=100, WE_CLT_SRV_EOD, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 77beeb581..8f1bb86a9 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -149,11 +149,16 @@ void DmlReadThread::operator()() //cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl; break; } - case WE_SVR_BATCH_INSERT_BINARY: + case WE_SVR_BATCH_INSERT_BINARY: { rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); break; } + case WE_SVR_GET_WRITTEN_LBIDS: + { + rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId); + break; + } case WE_SVR_BATCH_INSERT_END: { rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg); @@ -383,7 +388,7 @@ void DmlReadThread::operator()() obs << errMsg; } - if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS)) + if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS)) { obs += ibs; //cout << " sending back hwm info with ibs length " << endl;