/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or modify it under
   the terms of the GNU General Public License as published by the Free Software
   Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful, but WITHOUT
   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
   FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along with
   this program; if not, write to the Free Software Foundation, Inc.,
   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */

/***************************************************************************
 *
 *   $Id: filebuffermgr.cpp 2045 2013-01-30 20:26:59Z pleblanc $
 *
 *   jrodriguez@calpont.com
 *
 ***************************************************************************/

/**
 * InitialDBBCSize - the starting number of elements the unordered set used to store disk blocks.
This does not instantiate InitialDBBCSize disk blocks but only the initial size of the unordered_set **/ //#define NDEBUG #include #include #include #include #include "stats.h" #include "configcpp.h" #include "filebuffermgr.h" #include "mcsconfig.h" using namespace config; using namespace boost; using namespace std; using namespace BRM; extern dbbc::Stats* gPMStatsPtr; extern bool gPMProfOn; extern uint32_t gSession; namespace dbbc { const uint32_t gReportingFrequencyMin(32768); FileBufferMgr::FileBufferMgr(const uint32_t numBlcks, const uint32_t blkSz, const uint32_t deleteBlocks) : fMaxNumBlocks(numBlcks) , fBlockSz(blkSz) , fWLock() , fbSet() , fbList() , fCacheSize(0) , fFBPool() , fDeleteBlocks(deleteBlocks) , fEmptyPoolSlots() , fReportFrequency(0) { fFBPool.reserve(numBlcks); fConfig = Config::makeConfig(); setReportingFrequency(0); fLog.open(string(MCSLOGDIR) + "/trace/bc", ios_base::app | ios_base::ate); } FileBufferMgr::~FileBufferMgr() { flushCache(); } // param d is used as a togle only void FileBufferMgr::setReportingFrequency(const uint32_t d) { if (d == 0) { fReportFrequency = 0; return; } const string val = fConfig->getConfig("DBBC", "ReportFrequency"); uint32_t temp = 0; if (val.length() > 0) temp = static_cast(Config::fromText(val)); if (temp > 0 && temp <= gReportingFrequencyMin) fReportFrequency = gReportingFrequencyMin; else fReportFrequency = temp; } void FileBufferMgr::flushCache() { boost::mutex::scoped_lock lk(fWLock); { filebuffer_uset_t sEmpty; filebuffer_list_t lEmpty; emptylist_t vEmpty; fbList.swap(lEmpty); fbSet.swap(sEmpty); fEmptyPoolSlots.swap(vEmpty); } fCacheSize = 0; // the block pool should not be freed in the above block to allow us // to continue doing concurrent unprotected-but-"safe" memcpys // from that memory if (fReportFrequency) { fLog << "Clearing entire cache" << endl; } fFBPool.clear(); // fFBPool.reserve(fMaxNumBlocks); } void FileBufferMgr::flushOne(const BRM::LBID_t lbid, const BRM::VER_t ver) { // similar in 
function to depleteCache() boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t iter = fbSet.find(HashObject_t(lbid, ver, 0)); if (iter != fbSet.end()) { // remove it from fbList uint32_t idx = iter->poolIdx; fbList.erase(fFBPool[idx].listLoc()); // add to fEmptyPoolSlots fEmptyPoolSlots.push_back(idx); // remove it from fbSet fbSet.erase(iter); // adjust fCacheSize fCacheSize--; } } void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt) { boost::mutex::scoped_lock lk(fWLock); BRM::LBID_t lbid; BRM::VER_t ver; filebuffer_uset_iter_t iter; if (fReportFrequency) { fLog << "flushMany " << cnt << " items: "; for (uint32_t j = 0; j < cnt; j++) { fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", "; } fLog << endl; } for (uint32_t j = 0; j < cnt; j++) { lbid = static_cast(laVptr->LBID); ver = static_cast(laVptr->Ver); iter = fbSet.find(HashObject_t(lbid, ver, 0)); if (iter != fbSet.end()) { if (fReportFrequency) { fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl; } // remove it from fbList uint32_t idx = iter->poolIdx; fbList.erase(fFBPool[idx].listLoc()); // add to fEmptyPoolSlots fEmptyPoolSlots.push_back(idx); // remove it from fbSet fbSet.erase(iter); // adjust fCacheSize fCacheSize--; } ++laVptr; } } void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt) { filebuffer_uset_t::iterator it, tmpIt; tr1::unordered_set uniquer; tr1::unordered_set::iterator uit; boost::mutex::scoped_lock lk(fWLock); if (fReportFrequency) { fLog << "flushManyAllversion " << cnt << " items: "; for (uint32_t i = 0; i < cnt; i++) { fLog << laVptr[i] << ", "; } fLog << endl; } if (fCacheSize == 0 || cnt == 0) return; for (uint32_t i = 0; i < cnt; i++) uniquer.insert(laVptr[i]); for (it = fbSet.begin(); it != fbSet.end();) { if (uniquer.find(it->lbid) != uniquer.end()) { if (fReportFrequency) { fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl; } const uint32_t idx = 
it->poolIdx; fbList.erase(fFBPool[idx].listLoc()); fEmptyPoolSlots.push_back(idx); tmpIt = it; ++it; fbSet.erase(tmpIt); fCacheSize--; } else ++it; } } void FileBufferMgr::flushOIDs(const uint32_t* oids, uint32_t count) { DBRM dbrm; uint32_t i; vector extents; int err; uint32_t currentExtent; LBID_t currentLBID; typedef tr1::unordered_multimap byLBID_t; byLBID_t byLBID; pair itList; filebuffer_uset_t::iterator it; if (fReportFrequency) { fLog << "flushOIDs " << count << " items: "; for (uint32_t i = 0; i < count; i++) { fLog << oids[i] << ", "; } fLog << endl; } // If there are more than this # of extents to drop, the whole cache will be cleared const uint32_t clearThreshold = 50000; boost::mutex::scoped_lock lk(fWLock); if (fCacheSize == 0 || count == 0) return; /* Index the cache by LBID */ for (it = fbSet.begin(); it != fbSet.end(); it++) byLBID.insert(pair(it->lbid, it)); for (i = 0; i < count; i++) { extents.clear(); err = dbrm.getExtents(oids[i], extents, true, true, true); // @Bug 3838 Include outofservice extents if (err < 0 || (i == 0 && (extents.size() * count) > clearThreshold)) { // (The i == 0 should ensure it's not a dictionary column) lk.unlock(); flushCache(); return; } for (currentExtent = 0; currentExtent < extents.size(); currentExtent++) { EMEntry& range = extents[currentExtent]; LBID_t lastLBID = range.range.start + (range.range.size * 1024); for (currentLBID = range.range.start; currentLBID < lastLBID; currentLBID++) { itList = byLBID.equal_range(currentLBID); for (byLBID_t::iterator tmpIt = itList.first; tmpIt != itList.second; tmpIt++) { fbList.erase(fFBPool[tmpIt->second->poolIdx].listLoc()); fEmptyPoolSlots.push_back(tmpIt->second->poolIdx); fbSet.erase(tmpIt->second); fCacheSize--; } } } } } void FileBufferMgr::flushPartition(const vector& oids, const set& partitions) { DBRM dbrm; uint32_t i; vector extents; int err; uint32_t currentExtent; LBID_t currentLBID; typedef tr1::unordered_multimap byLBID_t; byLBID_t byLBID; pair itList; 
filebuffer_uset_t::iterator it; uint32_t count = oids.size(); boost::mutex::scoped_lock lk(fWLock); if (fReportFrequency) { std::set::iterator sit; fLog << "flushPartition oids: "; for (uint32_t i = 0; i < count; i++) { fLog << oids[i] << ", "; } fLog << "flushPartition partitions: "; for (sit = partitions.begin(); sit != partitions.end(); ++sit) { fLog << (*sit).toString() << ", "; } fLog << endl; } if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0) return; /* Index the cache by LBID */ for (it = fbSet.begin(); it != fbSet.end(); it++) byLBID.insert(pair(it->lbid, it)); for (i = 0; i < count; i++) { extents.clear(); err = dbrm.getExtents(oids[i], extents, true, true, true); // @Bug 3838 Include outofservice extents if (err < 0) { lk.unlock(); flushCache(); // better than returning an error code to the user return; } for (currentExtent = 0; currentExtent < extents.size(); currentExtent++) { EMEntry& range = extents[currentExtent]; LogicalPartition logicalPartNum(range.dbRoot, range.partitionNum, range.segmentNum); if (partitions.find(logicalPartNum) == partitions.end()) continue; LBID_t lastLBID = range.range.start + (range.range.size * 1024); for (currentLBID = range.range.start; currentLBID < lastLBID; currentLBID++) { itList = byLBID.equal_range(currentLBID); for (byLBID_t::iterator tmpIt = itList.first; tmpIt != itList.second; tmpIt++) { fbList.erase(fFBPool[tmpIt->second->poolIdx].listLoc()); fEmptyPoolSlots.push_back(tmpIt->second->poolIdx); fbSet.erase(tmpIt->second); fCacheSize--; } } } } } bool FileBufferMgr::exists(const BRM::LBID_t& lbid, const BRM::VER_t& ver) const { const HashObject_t fb(lbid, ver, 0); const bool b = exists(fb); return b; } FileBuffer* FileBufferMgr::findPtr(const HashObject_t& keyFb) { boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t it = fbSet.find(keyFb); if (fbSet.end() != it) { FileBuffer* fb = &(fFBPool[it->poolIdx]); fFBPool[it->poolIdx].listLoc()->hits++; fbList.splice(fbList.begin(), fbList, 
(fFBPool[it->poolIdx]).listLoc()); return fb; } return NULL; } bool FileBufferMgr::find(const HashObject_t& keyFb, FileBuffer& fb) { bool ret = false; boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t it = fbSet.find(keyFb); if (fbSet.end() != it) { fFBPool[it->poolIdx].listLoc()->hits++; fbList.splice(fbList.begin(), fbList, (fFBPool[it->poolIdx]).listLoc()); fb = fFBPool[it->poolIdx]; ret = true; } return ret; } bool FileBufferMgr::find(const HashObject_t& keyFb, void* bufferPtr) { bool ret = false; if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'L'); boost::mutex::scoped_lock lk(fWLock); if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'M'); filebuffer_uset_iter_t it = fbSet.find(keyFb); if (fbSet.end() != it) { uint32_t idx = it->poolIdx; //@bug 669 LRU cache, move block to front of list as last recently used. fFBPool[idx].listLoc()->hits++; fbList.splice(fbList.begin(), fbList, (fFBPool[idx]).listLoc()); lk.unlock(); memcpy(bufferPtr, (fFBPool[idx]).getData(), 8192); if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(keyFb.lbid, pthread_self(), gSession, 'U'); ret = true; } return ret; } uint32_t FileBufferMgr::bulkFind(const BRM::LBID_t* lbids, const BRM::VER_t* vers, uint8_t** buffers, bool* wasCached, uint32_t count) { uint32_t i, ret = 0; filebuffer_uset_iter_t* it = (filebuffer_uset_iter_t*)alloca(count * sizeof(filebuffer_uset_iter_t)); uint32_t* indexes = (uint32_t*)alloca(count * 4); if (gPMProfOn && gPMStatsPtr) { for (i = 0; i < count; i++) { gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'L'); } } boost::mutex::scoped_lock lk(fWLock); if (gPMProfOn && gPMStatsPtr) { for (i = 0; i < count; i++) { gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'M'); } } for (i = 0; i < count; i++) { new ((void*)&it[i]) filebuffer_uset_iter_t(); it[i] = fbSet.find(HashObject_t(lbids[i], vers[i], 0)); if (it[i] != fbSet.end()) { indexes[i] = 
it[i]->poolIdx; wasCached[i] = true; fFBPool[it[i]->poolIdx].listLoc()->hits++; fbList.splice(fbList.begin(), fbList, (fFBPool[it[i]->poolIdx]).listLoc()); } else { wasCached[i] = false; indexes[i] = 0; } } lk.unlock(); for (i = 0; i < count; i++) { if (wasCached[i]) { memcpy(buffers[i], fFBPool[indexes[i]].getData(), 8192); ret++; if (gPMProfOn && gPMStatsPtr) { gPMStatsPtr->markEvent(lbids[i], pthread_self(), gSession, 'U'); } } it[i].filebuffer_uset_iter_t::~filebuffer_uset_iter_t(); } return ret; } bool FileBufferMgr::exists(const HashObject_t& fb) const { bool find_bool = false; boost::mutex::scoped_lock lk(fWLock); filebuffer_uset_iter_t it = fbSet.find(fb); if (it != fbSet.end()) { find_bool = true; fFBPool[it->poolIdx].listLoc()->hits++; fbList.splice(fbList.begin(), fbList, (fFBPool[it->poolIdx]).listLoc()); } return find_bool; } // default insert operation. // add a new fb into fbMgr and to fbList // add to the front and age out from the back // so add new fbs to the front of the list //@bug 665: keep filebuffer in a vector. HashObject keeps the index of the filebuffer int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const uint8_t* data) { int ret = 0; if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'I'); boost::mutex::scoped_lock lk(fWLock); HashObject_t fbIndex(lbid, ver, 0); filebuffer_pair_t pr = fbSet.insert(fbIndex); if (pr.second) { // It was inserted (it wasn't there before) // Right now we have an invalid cache: we have inserted an entry with a -1 index. // We need to fix this quickly... 
fCacheSize++; FBData_t fbdata = {lbid, ver, 0}; fbList.push_front(fbdata); fBlksLoaded++; if (fReportFrequency && (fBlksLoaded % fReportFrequency) == 0) { struct timespec tm; clock_gettime(CLOCK_MONOTONIC, &tm); fLog << "insert: " << left << fixed << ((double)(tm.tv_sec + (1.e-9 * tm.tv_nsec))) << " " << right << setw(12) << fBlksLoaded << " " << right << setw(12) << fBlksNotUsed << endl; } } else { // if it's a duplicate there's nothing to do if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'D'); return ret; } uint32_t pi = numeric_limits::max(); if (fCacheSize > maxCacheSize()) { // If the insert above caused the cache to exceed its max size, find the lru block in // the cache and use its pool index to store the block data. FBData_t& fbdata = fbList.back(); // the lru block HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0); filebuffer_uset_iter_t iter = fbSet.find(lastFB); // should be there idbassert(iter != fbSet.end()); pi = iter->poolIdx; idbassert(pi < maxCacheSize()); idbassert(pi < fFBPool.size()); // set iters are always const. We are not changing the hash here, and this gets us // the pointer we need cheaply... 
HashObject_t& ref = const_cast(*pr.first); ref.poolIdx = pi; // replace the lru block with this block FileBuffer fb(lbid, ver, NULL, 0); fFBPool[pi] = fb; fFBPool[pi].setData(data, 8192); fbSet.erase(iter); if (fbList.back().hits == 0) fBlksNotUsed++; fbList.pop_back(); fCacheSize--; depleteCache(); ret = 1; } else { if (!fEmptyPoolSlots.empty()) { pi = fEmptyPoolSlots.front(); fEmptyPoolSlots.pop_front(); FileBuffer fb(lbid, ver, NULL, 0); fFBPool[pi] = fb; fFBPool[pi].setData(data, 8192); } else { pi = fFBPool.size(); FileBuffer fb(lbid, ver, NULL, 0); fFBPool.push_back(fb); fFBPool[pi].setData(data, 8192); } // See comment above HashObject_t& ref = const_cast(*pr.first); ref.poolIdx = pi; ret = 1; } idbassert(pi < fFBPool.size()); fFBPool[pi].listLoc(fbList.begin()); if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(lbid, pthread_self(), gSession, 'J'); idbassert(fCacheSize <= maxCacheSize()); // idbassert(fCacheSize == fbSet.size()); // idbassert(fCacheSize == fbList.size()); return ret; } void FileBufferMgr::depleteCache() { for (uint32_t i = 0; i < fDeleteBlocks && !fbList.empty(); ++i) { FBData_t fbdata(fbList.back()); // the lru block HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0); filebuffer_uset_iter_t iter = fbSet.find(lastFB); idbassert(iter != fbSet.end()); uint32_t idx = iter->poolIdx; idbassert(idx < fFBPool.size()); // Save position in FileBuffer pool for reuse. 
fEmptyPoolSlots.push_back(idx); fbSet.erase(iter); if (fbList.back().hits == 0) fBlksNotUsed++; fbList.pop_back(); fCacheSize--; } } ostream& FileBufferMgr::formatLRUList(ostream& os) const { filebuffer_list_t::const_iterator iter = fbList.begin(); filebuffer_list_t::const_iterator end = fbList.end(); while (iter != end) { os << iter->lbid << '\t' << iter->ver << endl; ++iter; } return os; } // puts the new entry at the front of the list void FileBufferMgr::updateLRU(const FBData_t& f) { if (fCacheSize > maxCacheSize()) { list::iterator last = fbList.end(); last--; FBData_t& fbdata = *last; HashObject_t lastFB(fbdata.lbid, fbdata.ver, 0); filebuffer_uset_iter_t iter = fbSet.find(lastFB); fEmptyPoolSlots.push_back(iter->poolIdx); if (fbdata.hits == 0) fBlksNotUsed++; fbSet.erase(iter); fbList.splice(fbList.begin(), fbList, last); fbdata = f; fCacheSize--; // cout << "booted an entry\n"; } else { // cout << "new entry\n"; fbList.push_front(f); } } uint32_t FileBufferMgr::doBlockCopy(const BRM::LBID_t& lbid, const BRM::VER_t& ver, const uint8_t* data) { uint32_t poolIdx; if (!fEmptyPoolSlots.empty()) { poolIdx = fEmptyPoolSlots.front(); fEmptyPoolSlots.pop_front(); } else { poolIdx = fFBPool.size(); fFBPool.resize(poolIdx + 1); // shouldn't trigger a 'real' resize b/c of the reserve call } fFBPool[poolIdx].Lbid(lbid); fFBPool[poolIdx].Verid(ver); fFBPool[poolIdx].setData(data); return poolIdx; } int FileBufferMgr::bulkInsert(const vector& ops) { uint32_t i; int32_t pi; int ret = 0; boost::mutex::scoped_lock lk(fWLock); if (fReportFrequency) { fLog << "bulkInsert: "; } for (i = 0; i < ops.size(); i++) { const CacheInsert_t& op = ops[i]; if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'I'); HashObject_t fbIndex(op.lbid, op.ver, 0); filebuffer_pair_t pr = fbSet.insert(fbIndex); if (!pr.second) { if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'D'); continue; } if (fReportFrequency) { fLog << 
op.lbid << " " << op.ver << ", "; } fCacheSize++; fBlksLoaded++; FBData_t fbdata = {op.lbid, op.ver, 0}; updateLRU(fbdata); pi = doBlockCopy(op.lbid, op.ver, op.data); HashObject_t& ref = const_cast(*pr.first); ref.poolIdx = pi; fFBPool[pi].listLoc(fbList.begin()); if (gPMProfOn && gPMStatsPtr) gPMStatsPtr->markEvent(op.lbid, pthread_self(), gSession, 'J'); ret++; } if (fReportFrequency) { fLog << endl; } idbassert(fCacheSize <= maxCacheSize()); return ret; } } // namespace dbbc