diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 9015decff..0fda76182 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -2027,6 +2027,22 @@ uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::s if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size()>0) ) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); TableMetaData::removeTableMetaData(tableOid); + + // MCOL-1160 For API bulk insert flush the PrimProc cached dictionary + // blocks tounched + std::tr1::unordered_map::iterator mapIter; + mapIter = fWEWrapper.getDictMap().find(txnID); + if (mapIter != fWEWrapper.getDictMap().end()) + { + std::set::iterator lbidIter; + std::vector dictFlushBlks; + for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++) + { + dictFlushBlks.push_back((*lbidIter)); + } + cacheutils::flushPrimProcAllverBlocks(dictFlushBlks); + fWEWrapper.getDictMap().erase(txnID); + } return rc; } diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 8da33caad..e2d785021 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -1485,6 +1485,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, bool newExtent = false; RIDList ridList; ColumnOp* colOp = NULL; + std::vector dictLbids; // Set tmp file suffix to modify HDFS db file bool useTmpSuffix = false; @@ -1898,6 +1899,7 @@ timer.start("tokenize"); timer.stop("tokenize"); #endif memcpy(colValPtr, &dctTuple.token, 8); + dictLbids.push_back(dctTuple.token.fbo); } dctStr_iter++; @@ -1946,6 +1948,7 @@ timer.start("tokenize"); timer.stop("tokenize"); #endif memcpy(colValPtr, &dctTuple.token, 8); + dictLbids.push_back(dctTuple.token.fbo); } dctStr_iter++; } @@ -2100,6 +2103,7 @@ timer.start("writeColumnRec"); // Write row(s) to database file(s) //---------------------------------------------------------------------- bool versioning = !(isAutoCommitOn && insertSelect); + AddDictToList(txnid, dictLbids); rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file } return rc; @@ -5177,6 +5181,7 @@ int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId) // BUG 4312 RemoveTxnFromLBIDMap(txnid); + RemoveTxnFromDictMap(txnid); config::Config *config = config::Config::makeConfig(); prefix = config->getConfig("SystemConfig", "DBRMRoot"); @@ -5401,6 +5406,7 @@ int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId) { // BUG 4312 RemoveTxnFromLBIDMap(txnid); + RemoveTxnFromDictMap(txnid); return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId); } @@ -5492,6 +5498,7 @@ int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map & columnOids) { RemoveTxnFromLBIDMap(txnId); + RemoveTxnFromDictMap(txnId); for (int i = 0; i < TOTAL_COMPRESS_OP; i++) { @@ -5506,6 +5513,27 @@ int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map& lbids) +{ + std::tr1::unordered_map::iterator mapIter; + + mapIter = m_dictLBIDMap.find(txnid); + if (mapIter == m_dictLBIDMap.end()) + { + dictLBIDRec_t tempRecord; + tempRecord.insert(lbids.begin(), lbids.end()); + m_dictLBIDMap[txnid] = tempRecord; + return; + } + else + { + dictLBIDRec_t &txnRecord = mapIter->second; + txnRecord.insert(lbids.begin(), lbids.end()); + } + +} + /*********************************************************** * DESCRIPTION: * Add an lbid to a list of lbids for sending to markExtentsInvalid. @@ -5598,6 +5626,16 @@ int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid, return rtn; } +void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid) +{ + std::tr1::unordered_map::iterator mapIter; + + mapIter = m_dictLBIDMap.find(txnid); + if (mapIter != m_dictLBIDMap.end()) + { + m_dictLBIDMap.erase(txnid); + } +} /*********************************************************** * DESCRIPTION: diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index 7c7862d5f..f1783ac8e 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -85,6 +85,7 @@ struct TxnLBIDRec }; typedef boost::shared_ptr SP_TxnLBIDRec_t; +typedef std::set dictLBIDRec_t; /** Class WriteEngineWrapper */ class WriteEngineWrapper : public WEObj @@ -408,6 +409,10 @@ public: return m_txnLBIDMap; }; + std::tr1::unordered_map& getDictMap() + { + return m_dictLBIDMap; + }; /** * @brief Flush the ChunkManagers. */ @@ -686,6 +691,9 @@ private: const RID filesPerColumnPartition, const RID extentsPerSegmentFile, const RID extentRows, uint16_t startDBRoot, unsigned dbrootCnt); + void AddDictToList(const TxnID txnid, std::vector& lbids); + void RemoveTxnFromDictMap(const TxnID txnid); + // Bug 4312: We use a hash set to hold the set of starting LBIDS for a given // txn so that we don't waste time marking the same extent as invalid. This // list should be trimmed if it gets too big. @@ -703,6 +711,10 @@ private: // This is a Map of sets of LBIDS for each transaction. A Transaction's list will be removed upon commit or rollback. std::tr1::unordered_map m_txnLBIDMap; + // MCOL-1160: We need to track dictionary LBIDs so we can tell PrimProc + // to flush the blocks after an API bulk-write. + std::tr1::unordered_map m_dictLBIDMap; + ColumnOp* m_colOp[TOTAL_COMPRESS_OP]; // column operations Dctnry* m_dctnry[TOTAL_COMPRESS_OP]; // dictionary operations OpType m_opType; // operation type