From 24c5e937565f044919b18662309db4a133ed9291 Mon Sep 17 00:00:00 2001
From: Andrew Hutchings
Date: Fri, 21 Sep 2018 09:50:10 +0100
Subject: [PATCH] MCOL-1737 Add debug logging options for LRU cache

This adds user-enabled options for debugging the LRU cache inside
ColumnStore, specifically cache flushing.

It adds the following:

* PrimProc flush information when SIGUSR2 mode is enabled
* cpimport dictionary flush information when -d2 is used
* WriteEngineServer DML flush information to STDERR
---
 primitives/blockcache/filebuffermgr.cpp   | 74 ++++++++++++++++++++++-
 writeengine/bulk/we_tableinfo.cpp         | 11 ++++
 writeengine/server/we_dmlcommandproc.cpp  |  3 +
 writeengine/shared/we_bulkrollbackmgr.cpp |  3 +
 4 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/primitives/blockcache/filebuffermgr.cpp b/primitives/blockcache/filebuffermgr.cpp
index e6cab58ac..eb5f9a2a9 100644
--- a/primitives/blockcache/filebuffermgr.cpp
+++ b/primitives/blockcache/filebuffermgr.cpp
@@ -117,7 +117,10 @@ void FileBufferMgr::flushCache()
     // the block pool should not be freed in the above block to allow us
     // to continue doing concurrent unprotected-but-"safe" memcpys
     // from that memory
-
+    if (fReportFrequency)
+    {
+        fLog << "Clearing entire cache" << endl;
+    }
     fFBPool.clear();
     //  fFBPool.reserve(fMaxNumBlocks);
 }
@@ -150,6 +153,15 @@
     BRM::LBID_t lbid;
     BRM::VER_t ver;
     filebuffer_uset_iter_t iter;
+    if (fReportFrequency)
+    {
+        fLog << "flushMany " << cnt << " items: ";
+        for (uint32_t j = 0; j < cnt; j++)
+        {
+            fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", ";
+        }
+        fLog << endl;
+    }
     for (uint32_t j = 0; j < cnt; j++)
     {
         lbid = static_cast<BRM::LBID_t>(laVptr->LBID);
@@ -157,6 +169,10 @@
         iter = fbSet.find(HashObject_t(lbid, ver, 0));
         if (iter != fbSet.end())
         {
+            if (fReportFrequency)
+            {
+                fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl;
+            }
             //remove it from fbList
             uint32_t idx = iter->poolIdx;
             fbList.erase(fFBPool[idx].listLoc());
@@ -179,6 +195,16 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt)

     mutex::scoped_lock lk(fWLock);

+    if (fReportFrequency)
+    {
+        fLog << "flushManyAllversion " << cnt << " items: ";
+        for (uint32_t i = 0; i < cnt; i++)
+        {
+            fLog << laVptr[i] << ", ";
+        }
+        fLog << endl;
+    }
+
     if (fCacheSize == 0 || cnt == 0)
         return;

@@ -187,6 +213,10 @@
     for (it = fbSet.begin(); it != fbSet.end();)
     {
         if (uniquer.find(it->lbid) != uniquer.end())
         {
+            if (fReportFrequency)
+            {
+                fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl;
+            }
             const uint32_t idx = it->poolIdx;
             fbList.erase(fFBPool[idx].listLoc());
             fEmptyPoolSlots.push_back(idx);
@@ -213,6 +243,16 @@ void FileBufferMgr::flushOIDs(const uint32_t *oids, uint32_t count)
     pair<byLBID_t::iterator, byLBID_t::iterator> itList;
     filebuffer_uset_t::iterator it;

+    if (fReportFrequency)
+    {
+        fLog << "flushOIDs " << count << " items: ";
+        for (uint32_t i = 0; i < count; i++)
+        {
+            fLog << oids[i] << ", ";
+        }
+        fLog << endl;
+    }
+
     // If there are more than this # of extents to drop, the whole cache will be cleared
     const uint32_t clearThreshold = 50000;

@@ -269,6 +309,22 @@ void FileBufferMgr::flushPartition(const vector<OID_t> &oids, const set<BRM::Log

     mutex::scoped_lock lk(fWLock);

+    if (fReportFrequency)
+    {
+        std::set<BRM::LogicalPartition>::iterator sit;
+        fLog << "flushPartition oids: ";
+        for (uint32_t i = 0; i < count; i++)
+        {
+            fLog << oids[i] << ", ";
+        }
+        fLog << "flushPartition partitions: ";
+        for (sit = partitions.begin(); sit != partitions.end(); ++sit)
+        {
+            fLog << (*sit).toString() << ", ";
+        }
+        fLog << endl;
+    }
+
     if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0)
         return;

@@ -496,7 +552,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui
     if (fReportFrequency && (fBlksLoaded%fReportFrequency)==0) {
         struct timespec tm;
         clock_gettime(CLOCK_MONOTONIC, &tm);
-        fLog
+        fLog << "insert: "
             << left << fixed << ((double)(tm.tv_sec+(1.e-9*tm.tv_nsec))) << " "
             << right << setw(12) << fBlksLoaded << " "
             << right << setw(12) << fBlksNotUsed << endl;
@@ -671,6 +727,11 @@ int FileBufferMgr::bulkInsert(const vector<CacheInsert_t> &ops)

     mutex::scoped_lock lk(fWLock);

+    if (fReportFrequency)
+    {
+        fLog << "bulkInsert: ";
+    }
+
     for (i = 0; i < ops.size(); i++)
     {
         const CacheInsert_t &op = ops[i];
@@ -694,7 +755,10 @@
             continue;
         }

-        //cout << "FBM: inserting <" << op.lbid << ", " << op.ver << endl;
+        if (fReportFrequency)
+        {
+            fLog << op.lbid << " " << op.ver << ", ";
+        }
         fCacheSize++;
         fBlksLoaded++;
         FBData_t fbdata = {op.lbid, op.ver, 0};
@@ -712,6 +776,10 @@
 #endif
         ret++;
     }
+    if (fReportFrequency)
+    {
+        fLog << endl;
+    }
     idbassert(fCacheSize <= maxCacheSize());

     return ret;
diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp
index 0e651b55d..922e0a24b 100644
--- a/writeengine/bulk/we_tableinfo.cpp
+++ b/writeengine/bulk/we_tableinfo.cpp
@@ -709,6 +709,17 @@ int TableInfo::setParseComplete(const int &columnId,
 #ifdef PROFILE
             Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
 #endif
+            if (fLog->isDebug(DEBUG_2))
+            {
+                ostringstream oss;
+                oss << "Dictionary cache flush: ";
+                for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
+                {
+                    oss << fDictFlushBlks[i] << ", ";
+                }
+                oss << endl;
+                fLog->logMsg( oss.str(), MSGLVL_INFO1 );
+            }
             cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
 #ifdef PROFILE
             Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp
index 0fda76182..90831afd6 100644
--- a/writeengine/server/we_dmlcommandproc.cpp
+++ b/writeengine/server/we_dmlcommandproc.cpp
@@ -2036,10 +2036,13 @@ uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::s
     {
         std::set<BRM::LBID_t>::iterator lbidIter;
         std::vector<BRM::LBID_t> dictFlushBlks;
+        cerr << "API Flushing blocks: ";
         for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++)
         {
+            cerr << *lbidIter << ", ";
             dictFlushBlks.push_back((*lbidIter));
         }
+        cerr << endl;
         cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
         fWEWrapper.getDictMap().erase(txnID);
     }
diff --git a/writeengine/shared/we_bulkrollbackmgr.cpp b/writeengine/shared/we_bulkrollbackmgr.cpp
index 2c41734e0..6f8c986b8 100644
--- a/writeengine/shared/we_bulkrollbackmgr.cpp
+++ b/writeengine/shared/we_bulkrollbackmgr.cpp
@@ -195,13 +195,16 @@ int BulkRollbackMgr::rollback ( bool keepMetaFile )
     // the user but keep going.
     std::vector<BRM::OID_t> allOIDs;
     std::set<OID>::const_iterator iter=fAllColDctOIDs.begin();
+    cerr << "Rollback flushing: ";
     while (iter != fAllColDctOIDs.end())
     {
+        cerr << *iter << ", ";
         //std::cout << "Flushing OID from PrimProc cache " << *iter <<
         //  std::endl;
         allOIDs.push_back(*iter);
         ++iter;
     }
+    cerr << endl;
     int cache_rc = cacheutils::flushOIDsFromCache( allOIDs );

     if (cache_rc != 0)