Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
MCOL-1737 Add debug logging options for LRU cache
This adds user-enabled options for debugging the LRU cache inside ColumnStore, specifically cache flushing. It adds the following:
* PrimProc flush information when SIGUSR2 mode is enabled
* cpimport dictionary flush information when -d2 is used
* WriteEngineServer DML flush information to STDERR
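The three switches named above all reduce to the same idea: verbose flush tracing that is compiled in but silent until a user turns it on at runtime. As a rough illustration of that shape (and only an illustration — the handler, flag, and output below are assumptions, not PrimProc's actual SIGUSR2 plumbing or the fReportFrequency member the diff uses), a minimal C++ sketch might look like this; the reconstructed diff hunks follow.

// Minimal sketch only -- not ColumnStore code.  It shows the general shape of
// the feature: flush tracing compiled in, silent by default, toggled at
// runtime by the user (PrimProc uses SIGUSR2 for this; the flag and handler
// names here are illustrative assumptions).
#include <csignal>
#include <iostream>
#include <vector>

static volatile std::sig_atomic_t gFlushLogging = 0;   // stand-in for a report flag

void toggleFlushLogging(int)
{
    gFlushLogging = !gFlushLogging;     // async-signal-safe: flips one flag
}

// Stand-in for a cache flush routine; logs only while tracing is enabled.
void flushMany(const std::vector<long>& lbids)
{
    if (gFlushLogging)
    {
        std::cerr << "flushMany " << lbids.size() << " items: ";
        for (long lbid : lbids)
            std::cerr << lbid << ", ";
        std::cerr << std::endl;
    }
    // ... real eviction work would happen here ...
}

int main()
{
    std::signal(SIGUSR2, toggleFlushLogging);   // `kill -USR2 <pid>` flips the flag
    flushMany({1, 2, 3});                       // silent until the signal arrives
    return 0;
}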
@@ -117,7 +117,10 @@ void FileBufferMgr::flushCache()
     // the block pool should not be freed in the above block to allow us
     // to continue doing concurrent unprotected-but-"safe" memcpys
     // from that memory
+    if (fReportFrequency)
+    {
+        fLog << "Clearing entire cache" << endl;
+    }
     fFBPool.clear();
     // fFBPool.reserve(fMaxNumBlocks);
 }
@@ -150,6 +153,15 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt)
     BRM::LBID_t lbid;
     BRM::VER_t ver;
     filebuffer_uset_iter_t iter;
+    if (fReportFrequency)
+    {
+        fLog << "flushMany " << cnt << " items: ";
+        for (uint32_t j = 0; j < cnt; j++)
+        {
+            fLog << "lbid: " << laVptr[j].LBID << " ver: " << laVptr[j].Ver << ", ";
+        }
+        fLog << endl;
+    }
     for (uint32_t j = 0; j < cnt; j++)
     {
         lbid = static_cast<BRM::LBID_t>(laVptr->LBID);
@@ -157,6 +169,10 @@ void FileBufferMgr::flushMany(const LbidAtVer* laVptr, uint32_t cnt)
         iter = fbSet.find(HashObject_t(lbid, ver, 0));
         if (iter != fbSet.end())
         {
+            if (fReportFrequency)
+            {
+                fLog << "flushMany hit, lbid: " << lbid << " index: " << iter->poolIdx << endl;
+            }
             //remove it from fbList
             uint32_t idx = iter->poolIdx;
             fbList.erase(fFBPool[idx].listLoc());
@@ -179,6 +195,16 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt)
 
     mutex::scoped_lock lk(fWLock);
 
+    if (fReportFrequency)
+    {
+        fLog << "flushManyAllversion " << cnt << " items: ";
+        for (uint32_t i = 0; i < cnt; i++)
+        {
+            fLog << laVptr[i] << ", ";
+        }
+        fLog << endl;
+    }
+
     if (fCacheSize == 0 || cnt == 0)
         return;
 
@@ -187,6 +213,10 @@ void FileBufferMgr::flushManyAllversion(const LBID_t* laVptr, uint32_t cnt)
 
     for (it = fbSet.begin(); it != fbSet.end();) {
         if (uniquer.find(it->lbid) != uniquer.end()) {
+            if (fReportFrequency)
+            {
+                fLog << "flushManyAllversion hit: " << it->lbid << " index: " << it->poolIdx << endl;
+            }
             const uint32_t idx = it->poolIdx;
             fbList.erase(fFBPool[idx].listLoc());
             fEmptyPoolSlots.push_back(idx);
@@ -213,6 +243,16 @@ void FileBufferMgr::flushOIDs(const uint32_t *oids, uint32_t count)
     pair<byLBID_t::iterator, byLBID_t::iterator> itList;
     filebuffer_uset_t::iterator it;
 
+    if (fReportFrequency)
+    {
+        fLog << "flushOIDs " << count << " items: ";
+        for (uint32_t i = 0; i < count; i++)
+        {
+            fLog << oids[i] << ", ";
+        }
+        fLog << endl;
+    }
+
     // If there are more than this # of extents to drop, the whole cache will be cleared
     const uint32_t clearThreshold = 50000;
 
@@ -269,6 +309,22 @@ void FileBufferMgr::flushPartition(const vector<OID_t> &oids, const set<BRM::Log
 
     mutex::scoped_lock lk(fWLock);
 
+    if (fReportFrequency)
+    {
+        std::set<BRM::LogicalPartition>::iterator sit;
+        fLog << "flushPartition oids: ";
+        for (uint32_t i = 0; i < count; i++)
+        {
+            fLog << oids[i] << ", ";
+        }
+        fLog << "flushPartition partitions: ";
+        for (sit = partitions.begin(); sit != partitions.end(); ++sit)
+        {
+            fLog << (*sit).toString() << ", ";
+        }
+        fLog << endl;
+    }
+
     if (fCacheSize == 0 || oids.size() == 0 || partitions.size() == 0)
         return;
 
@@ -496,7 +552,7 @@ int FileBufferMgr::insert(const BRM::LBID_t lbid, const BRM::VER_t ver, const ui
     if (fReportFrequency && (fBlksLoaded%fReportFrequency)==0) {
         struct timespec tm;
         clock_gettime(CLOCK_MONOTONIC, &tm);
-        fLog
+        fLog << "insert: "
             << left << fixed << ((double)(tm.tv_sec+(1.e-9*tm.tv_nsec))) << " "
             << right << setw(12) << fBlksLoaded << " "
             << right << setw(12) << fBlksNotUsed << endl;
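The insert() hunk above only labels an existing report line, but it is worth spelling out the throttling pattern that line lives in: nothing is logged unless a report frequency is configured, and then only every Nth loaded block, stamped with a monotonic clock so the samples stay cheap and immune to wall-clock jumps. A minimal standalone sketch of that pattern follows; the struct and member names are stand-ins for fReportFrequency, fBlksLoaded, and fBlksNotUsed, not the real class.

// Sketch of modulo-throttled progress logging with a monotonic timestamp,
// in the spirit of FileBufferMgr::insert(); names are illustrative only.
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <time.h>

struct CacheStats
{
    uint64_t blksLoaded = 0;
    uint64_t blksNotUsed = 0;
    uint32_t reportFrequency = 0;   // 0 disables reporting entirely

    void onBlockLoaded()
    {
        ++blksLoaded;
        // Emit one sample every reportFrequency-th block, not on every call.
        if (reportFrequency && (blksLoaded % reportFrequency) == 0)
        {
            struct timespec tm;
            clock_gettime(CLOCK_MONOTONIC, &tm);    // monotonic: safe for deltas
            std::cerr << "insert: "
                      << std::left << std::fixed << (tm.tv_sec + 1.e-9 * tm.tv_nsec) << " "
                      << std::right << std::setw(12) << blksLoaded << " "
                      << std::right << std::setw(12) << blksNotUsed << std::endl;
        }
    }
};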
@@ -671,6 +727,11 @@ int FileBufferMgr::bulkInsert(const vector<CacheInsert_t> &ops)
 
     mutex::scoped_lock lk(fWLock);
 
+    if (fReportFrequency)
+    {
+        fLog << "bulkInsert: ";
+    }
+
     for (i = 0; i < ops.size(); i++) {
         const CacheInsert_t &op = ops[i];
 
@@ -694,7 +755,10 @@ int FileBufferMgr::bulkInsert(const vector<CacheInsert_t> &ops)
             continue;
         }
 
-        //cout << "FBM: inserting <" << op.lbid << ", " << op.ver << endl;
+        if (fReportFrequency)
+        {
+            fLog << op.lbid << " " << op.ver << ", ";
+        }
         fCacheSize++;
         fBlksLoaded++;
         FBData_t fbdata = {op.lbid, op.ver, 0};
@@ -712,6 +776,10 @@ int FileBufferMgr::bulkInsert(const vector<CacheInsert_t> &ops)
 #endif
         ret++;
     }
+    if (fReportFrequency)
+    {
+        fLog << endl;
+    }
     idbassert(fCacheSize <= maxCacheSize());
 
     return ret;
@@ -709,6 +709,17 @@ int TableInfo::setParseComplete(const int &columnId,
 #ifdef PROFILE
             Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
 #endif
+            if (fLog->isDebug(DEBUG_2))
+            {
+                ostringstream oss;
+                oss << "Dictionary cache flush: ";
+                for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
+                {
+                    oss << fDictFlushBlks[i] << ", ";
+                }
+                oss << endl;
+                fLog->logMsg( oss.str(), MSGLVL_INFO1 );
+            }
             cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
 #ifdef PROFILE
             Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
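The cpimport change above takes a slightly different route from the PrimProc ones: the trace is gated on the importer's -d2 debug level (fLog->isDebug(DEBUG_2)) and the whole block list is assembled in an ostringstream before a single logMsg() call, so the list is not sliced up among other log lines. Below is a condensed sketch of that pattern with an assumed stand-in logger, not the real WriteEngine Log class.

// Sketch of level-gated, buffered debug logging, modeled loosely on the
// cpimport dictionary-flush trace; DebugLogger is an assumed stand-in.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

struct DebugLogger
{
    int debugLevel = 0;                         // e.g. 2 when cpimport runs with -d2
    bool isDebug(int lvl) const { return debugLevel >= lvl; }
    void logMsg(const std::string& msg) { std::cout << msg; }
};

void reportDictFlush(DebugLogger& log, const std::vector<int64_t>& blocks)
{
    if (!log.isDebug(2))
        return;                                 // silent unless -d2 or higher

    std::ostringstream oss;                     // build the whole line first,
    oss << "Dictionary cache flush: ";          // then hand it over in one call
    for (size_t i = 0; i < blocks.size(); i++)
        oss << blocks[i] << ", ";
    oss << std::endl;
    log.logMsg(oss.str());
}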
@@ -2036,10 +2036,13 @@ uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::s
         {
             std::set<BRM::LBID_t>::iterator lbidIter;
             std::vector<BRM::LBID_t> dictFlushBlks;
+            cerr << "API Flushing blocks: ";
             for (lbidIter = (*mapIter).second.begin(); lbidIter != (*mapIter).second.end(); lbidIter++)
             {
+                cerr << *lbidIter << ", ";
                 dictFlushBlks.push_back((*lbidIter));
             }
+            cerr << endl;
             cacheutils::flushPrimProcAllverBlocks(dictFlushBlks);
             fWEWrapper.getDictMap().erase(txnID);
         }
@@ -195,13 +195,16 @@ int BulkRollbackMgr::rollback ( bool keepMetaFile )
     // the user but keep going.
     std::vector<BRM::OID_t> allOIDs;
     std::set<OID>::const_iterator iter=fAllColDctOIDs.begin();
+    cerr << "Rollback flushing: ";
     while (iter != fAllColDctOIDs.end())
     {
+        cerr << *iter << ", ";
         //std::cout << "Flushing OID from PrimProc cache " << *iter <<
         //    std::endl;
         allOIDs.push_back(*iter);
         ++iter;
     }
+    cerr << endl;
 
     int cache_rc = cacheutils::flushOIDsFromCache( allOIDs );
     if (cache_rc != 0)