1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-13 16:01:32 +03:00
Files
mariadb-columnstore-engine/primitives/blockcache/iomanager.cpp
Roman Nozdrin 4fe9cd64a3 Revert "No boost condition (#2822)" (#2828)
This reverts commit f916e64927.
2023-04-22 15:49:50 +03:00

1419 lines
38 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
//
// C++ Implementation: iomanager
//
// Description:
//
//
// Author: Jason Rodriguez <jrodriguez@calpont.com>
//
//
//
#include "mcsconfig.h"
#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE
#include <sys/mount.h>
#include <linux/fs.h>
#ifdef BLOCK_SIZE
#undef BLOCK_SIZE
#endif
#ifdef READ
#undef READ
#endif
#ifdef WRITE
#undef WRITE
#endif
#include <stdexcept>
#include <unistd.h>
#include <stdlib.h>
#include <string>
#include <sstream>
#include <tr1/unordered_map>
#include <tr1/unordered_set>
#include <set>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <errno.h>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <pthread.h>
//#define NDEBUG
#include <cassert>
using namespace std;
#include "configcpp.h"
using namespace config;
#include "messageobj.h"
#include "messageids.h"
using namespace logging;
#include "brmtypes.h"
#include "pp_logger.h"
#include "fsutils.h"
#include "rwlock_local.h"
#include "iomanager.h"
#include "liboamcpp.h"
#include "idbcompress.h"
using namespace compress;
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "IDBLogger.h"
using namespace idbdatafile;
#include "mcsconfig.h"
#include "threadnaming.h"
using USOID = tr1::unordered_set<BRM::OID_t>;

// Globals defined elsewhere in the primitive processor and read by the I/O manager.
namespace primitiveprocessor
{
extern Logger* mlp;       // logger used for the M00xx messages emitted below
extern int directIOFlag;  // non-zero: segment files are opened with IDBDataFile::USE_ODIRECT
extern int noVB;          // > 0: localLbidLookup() forces the version-buffer flag off
}  // namespace primitiveprocessor
#ifndef O_BINARY
#define O_BINARY 0
#endif
#ifndef O_LARGEFILE
#define O_LARGEFILE 0
#endif
#ifndef O_NOATIME
#define O_NOATIME 0
#endif
namespace
{
using namespace dbbc;
using namespace std;
// ANSI escape sequences used to embolden some diagnostic console output.
const std::string boldStart("\033[0;1m");
const std::string boldStop("\033[0;39m");
// Default FD-cache size limit, and default number of cached files closed once the
// limit is reached (both overridable from the DBBC configuration section).
constexpr uint32_t MAX_OPEN_FILES = 16384;
constexpr uint32_t DECREASE_OPEN_FILES = 4096;
// Stores (tv2 - tv1), expressed in seconds, into tm.
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
  const double fractionalSeconds = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);
  tm = wholeSeconds + fractionalSeconds;
}
// Argument bundle for a reader thread: owning ioManager plus a thread index.
// NOTE(review): not referenced in this file; thr_popper() receives the ioManager directly.
struct IOMThreadArg
{
  ioManager* iom;
  int32_t thdId;
};
using IOMThreadArg_t = IOMThreadArg;
/* structures shared across all iomanagers */
class FdEntry
{
public:
FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0), fp(0), c(0), inUse(0), compType(0)
{
cmpMTime = 0;
}
FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct,
IDBDataFile* f)
: oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
{
cmpMTime = 0;
if (oid >= 1000)
compType = ct;
}
~FdEntry()
{
delete fp;
fp = 0;
}
BRM::OID_t oid;
uint16_t dbroot;
uint32_t partNum;
uint16_t segNum;
IDBDataFile* fp;
uint32_t c;
int inUse;
CompChunkPtrList ptrList;
int compType;
bool isCompressed() const
{
return (oid >= 1000 && compType != 0);
}
time_t cmpMTime;
friend ostream& operator<<(ostream& out, const FdEntry& o)
{
out << " o: " << o.oid << " f: " << o.fp << " d: " << o.dbroot << " p: " << o.partNum
<< " s: " << o.segNum << " c: " << o.c << " t: " << o.compType << " m: " << o.cmpMTime;
return out;
}
};
// Lexicographic ordering on the cache key (oid, dbroot, partNum, segNum).
// The file handle and bookkeeping fields do not take part in the comparison.
struct fdCacheMapLessThan
{
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
  {
    if (lhs.oid != rhs.oid)
      return lhs.oid < rhs.oid;

    if (lhs.dbroot != rhs.dbroot)
      return lhs.dbroot < rhs.dbroot;

    if (lhs.partNum != rhs.partNum)
      return lhs.partNum < rhs.partNum;

    return lhs.segNum < rhs.segNum;
  }
};
// Key equality consistent with fdCacheMapLessThan: all four key fields must match.
struct fdMapEqual
{
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
  {
    if (lhs.oid != rhs.oid || lhs.dbroot != rhs.dbroot)
      return false;

    return lhs.partNum == rhs.partNum && lhs.segNum == rhs.segNum;
  }
};
// fdcache maps a key-only FdEntry (constructed with a NULL file pointer) to the
// shared entry that actually owns the open file.
using SPFdEntry_t = boost::shared_ptr<FdEntry>;
using FdCacheType_t = std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan>;
struct FdCountEntry
{
FdCountEntry()
{
}
FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
const FdCacheType_t::iterator it)
: oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it)
{
}
~FdCountEntry()
{
}
BRM::OID_t oid;
uint16_t dbroot;
uint32_t partNum;
uint16_t segNum;
uint32_t cnt;
FdCacheType_t::iterator fdit;
}; // FdCountEntry
typedef FdCountEntry FdCountEntry_t;
// Orders entries by descending reuse count; walking the multiset in reverse
// therefore visits the least-reused entries first.
struct fdCountCompare
{
  bool operator()(const FdCountEntry_t& lhs, const FdCountEntry_t& rhs) const
  {
    return rhs.cnt < lhs.cnt;
  }
};
using FdCacheCountType_t = multiset<FdCountEntry_t, fdCountCompare>;
// Cache of open segment files shared by every reader thread in this process.
// fdMapMutex serialises lookups/inserts/evictions on fdcache, the FdEntry
// bookkeeping (inUse, c) and header refreshes done through updateptrs().
FdCacheType_t fdcache;
boost::mutex fdMapMutex;
// Reader threads hold the read side while servicing a request; dropFDCache()
// and purgeFDCache() take the write side before erasing entries.
rwlock::RWLock_local localLock;
// Returns `in` rounded up to the next address that is a multiple of `av` bytes
// (unchanged if it already is). Works for any positive alignment, not only
// powers of two; an alignment of 0 or 1 (or negative) returns `in` unchanged.
// The caller must own at least av - 1 bytes of slack past `in`.
char* alignTo(const char* in, int av)
{
  if (av <= 1)
    return const_cast<char*>(in);

  // Unsigned arithmetic: no sign issues for high addresses, no power-of-two assumption.
  const uintptr_t addr = reinterpret_cast<uintptr_t>(in);
  const uintptr_t alignment = static_cast<uintptr_t>(av);
  const uintptr_t rem = addr % alignment;
  const uintptr_t aligned = (rem == 0) ? addr : addr + (alignment - rem);
  return reinterpret_cast<char*>(aligned);
}
// Linear back-off for the read-retry paths: sleeps 5 ms for every retry already made.
void waitForRetry(long count)
{
  const long delayMicros = 5000 * count;
  usleep(delayMicros);
}
// Must hold the FD cache lock!
//
// Re-reads the compression header of the file cached at `fdit` and rebuilds its
// chunk-pointer list (fdit->second->ptrList) and header mtime (cmpMTime).
//
// ptr  - scratch buffer of at least 3 * 4096 bytes; its contents are clobbered.
// fdit - fdcache entry of a compressed file.
//
// Returns 0 on success. Any non-zero value means the header could not be used
// right now and the caller should retry:
//   -1 null scratch buffer          -2 null cache entry
//   -3 invalid file handle          -4 short read of the first 3 header blocks
//   -5 pointer block unparsable     -6 pointer list empty
//   -7 extra headers unparsable     -8 short read of the extra header blocks
//   -9 first chunk offset lies inside the two fixed header blocks
//      (corrupt or partially written header)
static int updateptrs(char* ptr, FdCacheType_t::iterator fdit)
{
  ssize_t i;
  uint32_t progress;

  // ptr is taken from buffer, already been checked: realbuff.get() == 0
  if (ptr == 0)
    return -1;

  // already checked before: fdit->second->isCompressed()
  if (fdit->second.get() == 0)
    return -2;

  IDBDataFile* fp = fdit->second->fp;

  if (fp == INVALID_HANDLE_VALUE)
  {
    Message::Args args;
    args.add("updateptrs got invalid fp.");
    primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
    return -3;
  }

  // We need to read one extra block because we need the first ptr in the 3rd block
  // to know if we're done.
  // FIXME: re-work all of this so we don't have to re-read the 3rd block.
  progress = 0;

  while (progress < 4096 * 3)
  {
    i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);

    if (i <= 0)
      break;

    progress += i;
  }

  if (progress != 4096 * 3)
    return -4;  // let it retry. Not likely, but ...

  fdit->second->cmpMTime = 0;
  time_t mtime = fp->mtime();

  if (mtime != (time_t)-1)
    fdit->second->cmpMTime = mtime;

  // The second 4 KiB block holds the chunk pointers.
  int gplRc = compress::CompressInterface::getPtrList(&ptr[4096], 4096, fdit->second->ptrList);

  if (gplRc != 0)
    return -5;  // go for a retry.

  if (fdit->second->ptrList.size() == 0)
    return -6;  // go for a retry.

  // The first chunk must start at or after the two fixed header blocks. A smaller
  // offset would make numHdrs below wrap around to a huge value and the buffer
  // allocation throw, so treat it as a not-yet-valid header and let the caller retry.
  const uint64_t firstChunkOffset = fdit->second->ptrList[0].first;

  if (firstChunkOffset < 4096ULL * 2ULL)
    return -9;

  // Header blocks between the fixed two and the first chunk hold further pointers.
  uint64_t numHdrs = firstChunkOffset / 4096ULL - 2ULL;

  if (numHdrs > 0)
  {
    const uint64_t hdrBytes = numHdrs * 4096;
    boost::scoped_array<char> nextHdrBufsa(new char[hdrBytes + 4095]);
    char* nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);
    uint64_t hdrProgress = 0;

    while (hdrProgress < hdrBytes)
    {
      i = fp->pread(&nextHdrBufPtr[hdrProgress], (4096 * 2) + hdrProgress, hdrBytes - hdrProgress);

      if (i <= 0)
        break;

      hdrProgress += i;
    }

    if (hdrProgress != hdrBytes)
      return -8;

    CompChunkPtrList nextPtrList;
    gplRc = compress::CompressInterface::getPtrList(&nextHdrBufPtr[0], hdrBytes, nextPtrList);

    if (gplRc != 0)
      return -7;  // go for a retry.

    fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
  }

  return 0;
}
//------------------------------------------------------------------------------
// Body of each I/O reader thread. Loops forever:
//  - pops a fileRequest from the ioManager queue;
//  - resolves its LBID to (oid, dbroot, partition, segment, block offset)
//    through BRM;
//  - opens the segment file through the shared fdcache;
//  - reads the requested blocks, decompressing them for compressed files;
//  - inserts them into the block cache when the request allows it;
//  - marks the request COMPLETE and wakes its waiter.
//
// While a request is processed:
//  - the read side of localLock is held, so dropFDCache()/purgeFDCache()
//    cannot erase fdcache entries underneath us;
//  - the LBID range is locked with lockLBIDRange(). It is released by
//    handleBlockReadError(), after a successful read, or at the top of the loop;
//  - the entry's inUse count (changed under fdMapMutex) keeps it from being
//    evicted by other readers.
//
// Only returns if the read buffer cannot be allocated; reaching the end of the
// function is an error.
//------------------------------------------------------------------------------
void* thr_popper(ioManager* arg)
{
  utils::setThreadName("thr_popper");
  ioManager* iom = arg;
  FileBufferMgr* fbm;
  fileRequest* fr = 0;
  BRM::LBID_t lbid = 0;
  BRM::OID_t oid = 0;
  BRM::VER_t ver = 0;
  BRM::QueryContext qc;
  BRM::VER_t txn = 0;
  int compType = 0;
  int blocksLoaded = 0;
  int blocksRead = 0;
  const unsigned pageSize = 4096;
  fbm = &iom->fileBufferManager();
  char fileName[WriteEngine::FILE_NAME_SIZE];
  const uint64_t fileBlockSize = BLOCK_SIZE;
  bool flg = false;
  bool useCache;
  uint16_t dbroot = 0;
  uint32_t partNum = 0;
  uint16_t segNum = 0;
  uint32_t offset = 0;
  char* fileNamePtr = fileName;
  uint64_t longSeekOffset = 0;
  int err;
  uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
  uint32_t blocksRequested = 0;
  ssize_t i;
  char* alignedbuff = 0;
  boost::scoped_array<char> realbuff;
  pthread_t threadId = 0;
  ostringstream iomLogFileName;
  ofstream lFile;
  struct timespec rqst1;
  struct timespec rqst2;
  struct timespec tm;
  struct timespec tm2;
  double tm3;
  double rqst3;
  bool locked = false;
  SPFdEntry_t fe;
  vector<CacheInsert_t> cacheInsertOps;
  bool copyLocked = false;

  if (iom->IOTrace())
  {
    threadId = pthread_self();
    iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
    lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
  }

  FdCacheType_t::iterator fdit;
  IDBDataFile* fp = 0;
  // The read buffer must hold the largest compressed chunk for one blocksPerRead-sized
  // read, plus one page of slack so it can be page-aligned.
  size_t maxCompSz =
      compress::CompressInterface::getMaxCompressedSizeGeneric(iom->blocksPerRead * BLOCK_SIZE);
  size_t readBufferSz = maxCompSz + pageSize;
  realbuff.reset(new char[readBufferSz]);

  if (realbuff.get() == 0)
  {
    cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
    return 0;
  }

  alignedbuff = alignTo(realbuff.get(), 4096);

  if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
      (((ptrdiff_t)alignedbuff % pageSize) != 0))
    throw runtime_error("aligned buffer size is not matching the page size.");

  // Decompression target: one 4 MiB chunk (+4 bytes), matching blen below.
  uint8_t* uCmpBuf = 0;
  uCmpBuf = new uint8_t[4 * 1024 * 1024 + 4];

  for (;;)
  {
    // Release whatever the previous request still holds.
    if (copyLocked)
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }

    if (locked)
    {
      localLock.read_unlock();
      locked = false;
    }

    fr = iom->getNextRequest();

    // getNextRequest() returns 0 when popping the queue threw; dereferencing it would
    // crash the reader thread. Back off briefly and try again.
    if (fr == 0)
    {
      waitForRetry(1);
      continue;
    }

    localLock.read_lock();
    locked = true;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &rqst1);

    lbid = fr->Lbid();
    qc = fr->Ver();
    txn = fr->Txn();
    flg = fr->Flg();
    compType = fr->CompType();
    useCache = fr->useCache();
    blocksLoaded = 0;
    blocksRead = 0;
    dlen = fr->BlocksRequested();
    blocksRequested = fr->BlocksRequested();
    oid = 0;
    dbroot = 0;
    partNum = 0;
    segNum = 0;
    offset = 0;

    // Lock the LBID range for the duration of the read; every exit path below
    // releases it (see the comment block above this function).
    iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
    copyLocked = true;

    // special case for getBlock.
    if (blocksRequested == 1)
    {
      BRM::VER_t outVer;
      iom->dbrm()->vssLookup((BRM::LBID_t)lbid, qc, txn, &outVer, &flg);
      ver = outVer;
      fr->versioned(flg);
    }
    else
    {
      fr->versioned(false);
      ver = qc.currentScn;
    }

    err = iom->localLbidLookup(lbid, ver, flg, oid, dbroot, partNum, segNum, offset);

    if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: version " << ver << " of LBID " << lbid << "is too old";
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
      continue;
    }
    else if (err < 0)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: BRM lookup failure; lbid=" << lbid << "; ver=" << ver
             << "; flg=" << (flg ? 1 : 0);
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, fileRequest::BRM_LOOKUP_ERROR);
      continue;
    }

#ifdef IDB_COMP_POC_DEBUG
    {
      boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);

      if (compType != 0)
        cout << boldStart;

      cout << "fileRequest: " << *fr << endl;

      if (compType != 0)
        cout << boldStop;
    }
#endif
    const uint32_t extentSize = iom->getExtentRows();
    FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
    // cout << "Looking for " << fdKey << endl
    //   << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;

    fdMapMutex.lock();
    fdit = fdcache.find(fdKey);

    if (fdit == fdcache.end())
    {
      // Cache miss: build the file name, make room if needed, open the file and cache it.
      try
      {
        iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
      }
      catch (exception& exc)
      {
        fdMapMutex.unlock();
        Message::Args args;
        args.add(oid);
        args.add(exc.what());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error building filename for OID " << oid << "; " << exc.what();
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }

#ifdef IDB_COMP_USE_CMP_SUFFIX
      if (compType != 0)
      {
        char* ptr = strrchr(fileNamePtr, '.');
        idbassert(ptr);
        strcpy(ptr, ".cmp");
      }
#endif

      if (oid > 3000)
      {
        // TODO: should syscat columns be considered when reducing open file count
        // They are always needed why should they be closed?
        if (fdcache.size() >= iom->MaxOpenFiles())
        {
          // Cache is full: rank entries by reuse count and close up to
          // DecreaseOpenFilesCount() of the least-reused entries that are not in use.
          FdCacheCountType_t fdCountSort;

          for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
          {
            struct FdCountEntry fdc(it->second->oid, it->second->dbroot, it->second->partNum,
                                    it->second->segNum, it->second->c, it);
            fdCountSort.insert(fdc);
          }

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "Before flushing sz: " << fdcache.size()
                               << " delCount: " << iom->DecreaseOpenFilesCount() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
              iom->FDTraceFile() << *(*it).second << endl;

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          // TODO: should we consider a minimum number of open files
          // currently, there is nothing to prevent all open files
          // from being closed by the IOManager.

          uint32_t delCount = 0;

          for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
               rit != fdCountSort.rend() && fdcache.size() > 0 && delCount < iom->DecreaseOpenFilesCount();
               rit++)
          {
            FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
            FdCacheType_t::iterator it = fdcache.find(oldfdKey);

            if (it != fdcache.end())
            {
              if (iom->FDCacheTrace())
              {
                if (!rit->fdit->second->inUse)
                  iom->FDTraceFile() << "Removing dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
                else
                  iom->FDTraceFile() << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
              }

              if (rit->fdit->second->inUse <= 0)
              {
                fdcache.erase(it);
                delCount++;
              }
            }
          }  // for (FdCacheCountType_t...

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
            {
              iom->FDTraceFile() << *(*it).second << endl;
            }

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          fdCountSort.clear();
        }  // if (fdcache.size()...
      }  // if (oid > 3000)

      int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
      fp = NULL;
      uint32_t openRetries = 0;
      int saveErrno = 0;

      // Up to 5 open attempts, one second apart (fdMapMutex is still held).
      while (fp == NULL && openRetries++ < 5)
      {
        fp = IDBDataFile::open(IDBPolicy::getType(fileNamePtr, IDBPolicy::PRIMPROC), fileNamePtr, "r", opts);
        saveErrno = errno;

        if (fp == NULL)
          sleep(1);
      }

      if (fp == NULL)
      {
        Message::Args args;

        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error opening file for OID " << oid << "; " << fileNamePtr << "; "
               << strerror(saveErrno);
        int errorCode = fileRequest::FAILED;

        if (saveErrno == EINVAL)
          errorCode = fileRequest::FS_EINVAL;
        else if (saveErrno == ENOENT)
          errorCode = fileRequest::FS_ENOENT;

        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, errorCode);
        continue;
      }

      fe.reset(new FdEntry(oid, dbroot, partNum, segNum, compType, fp));
      fe->inUse++;
      fdcache[fdKey] = fe;
      fdit = fdcache.find(fdKey);
      fe.reset();
    }
    else
    {
      if (fdit->second.get())
      {
        fdit->second->c++;
        fdit->second->inUse++;
        fp = fdit->second->fp;
      }
      else
      {
        Message::Args args;

        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        ostringstream errMsg;
        errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = (" << dbroot << ", " << partNum
               << ", " << segNum << ", " << compType << ")";
        args.add(errMsg.str());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }
    }

    fdMapMutex.unlock();

#ifdef SHARED_NOTHING_DEMO_2
    // change offset if it's shared nothing
    /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
       add extent offset */
    if (oid >= 10000)
      offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);
#endif

    longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
    // For compressed files: quot is the index of the 4 MiB chunk holding the first requested
    // block (used as the ptrList index), rem is that block's byte offset in the decompressed chunk.
    lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));
    uint32_t readCount = 0;
    uint32_t bytesRead = 0;
    uint32_t compressedBytesRead =
        0;  // @Bug 3149. IOMTrace was not reporting bytesRead correctly for compressed columns.
    uint32_t jend = blocksRequested / iom->blocksPerRead;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &tm);

    ostringstream errMsg;
    bool errorOccurred = false;
    string errorString;
#ifdef IDB_COMP_POC_DEBUG
    bool debugWrite = false;
#endif

#ifdef EM_AS_A_TABLE_POC__
    dlen = 1;
#endif

    if (blocksRequested % iom->blocksPerRead)
      jend++;

    // One iteration per blocksPerRead-sized read.
    // NOTE(review): cmpOffFact is computed once per request, so for compressed files every
    // iteration reads and decompresses the same chunk. This presumably relies on compressed
    // requests never needing more than one iteration; confirm.
    for (j = 0; j < jend; j++)
    {
      int decompRetryCount = 0;
      int retryReadHeadersCount = 0;

    decompRetry:
      blocksThisRead = std::min(dlen, iom->blocksPerRead);
      readSize = blocksThisRead * BLOCK_SIZE;
      acc = 0;
      // Location (file offset, size) of the compressed chunk to read. It is copied out of
      // ptrList while fdMapMutex is held, because another reader may rebuild ptrList in
      // updateptrs() as soon as the mutex is released.
      uint64_t chunkOffset = 0;
      uint64_t chunkSize = 0;

      while (acc < readSize)
      {
#if defined(EM_AS_A_TABLE_POC__)
        if (oid == 1084)
        {
          uint32_t h;
          int32_t o = 0;
          int32_t* ip;
          ip = (int32_t*)(&alignedbuff[acc]);

          for (o = 0; o < 2048; o++)
          {
            if (iom->dbrm()->getHWM(o + 3000, h) == 0)
              *ip++ = h;
            else
              *ip++ = numeric_limits<int32_t>::min() + 1;
          }

          i = BLOCK_SIZE;
        }
        else
          // NOTE(review): `fd` is not declared in this function; this POC branch does not compile as-is.
          i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);
#else
        if (fdit->second->isCompressed())
        {
        retryReadHeaders:
          // hdrs may have been modified since we cached them in fdit->second...
          time_t cur_mtime = numeric_limits<time_t>::max();
          int updatePtrsRc = 0;
          int idx = cmpOffFact.quot;
          size_t ptrListSize = 0;
          fdMapMutex.lock();
          time_t fp_mtime = fp->mtime();

          if (fp_mtime != (time_t)-1)
            cur_mtime = fp_mtime;

          if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
            updatePtrsRc = updateptrs(&alignedbuff[0], fdit);

          ptrListSize = fdit->second->ptrList.size();

          if (updatePtrsRc == 0 && idx < static_cast<int>(ptrListSize))
          {
            chunkOffset = fdit->second->ptrList[idx].first;
            chunkSize = fdit->second->ptrList[idx].second;
          }

          fdMapMutex.unlock();

          if (updatePtrsRc != 0 || idx >= static_cast<int>(ptrListSize))
          {
            // Due to race condition, the header on disk may not be updated yet.
            // Log an info message and retry.
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << "retry updateptrs for " << fileNamePtr << ". rc=" << updatePtrsRc << ", idx=" << idx
                      << ", ptr.size=" << ptrListSize;
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              // Force a header refresh; cmpMTime is shared state, so update it under the lock.
              fdMapMutex.lock();
              fdit->second->cmpMTime = 0;
              fdMapMutex.unlock();
              goto retryReadHeaders;
            }
            else
            {
              // still fail after all the retries.
              errorOccurred = true;
              errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx=" << idx
                     << ", ptr.size=" << ptrListSize;
              errorString = errMsg.str();
              break;
            }
          }

          // alignedbuff only guarantees maxCompSz bytes; refuse chunks that would overflow it.
          if (chunkSize > maxCompSz)
          {
            errorOccurred = true;
            errMsg << "aligned buff too small. dataSize=" << chunkSize << ", buffSize=" << maxCompSz;
            errorString = errMsg.str();
            break;
          }

          i = fp->pread(&alignedbuff[0], chunkOffset, chunkSize);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec
                 << ", " << chunkSize << ", " << chunkOffset << ") = " << i << ' ' << cmpOffFact.quot << ' '
                 << cmpOffFact.rem << boldStop << endl;
          }
#endif

          // @bug3407, give it some retries if pread failed.
          if (i != (ssize_t)chunkSize)
          {
            // log an info message
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
                      << ". (offset, size, actuall read) = (" << chunkOffset << ", " << chunkSize << ", " << i
                      << ")";
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              fdMapMutex.lock();
              fdit->second->cmpMTime = 0;
              fdMapMutex.unlock();
              goto retryReadHeaders;
            }
            else
            {
              errorOccurred = true;
              errMsg << "Error reading chunk " << idx;
              errorString = errMsg.str();
              break;
            }
          }

          compressedBytesRead += i;  // @Bug 3149.
          // The whole chunk is in hand; count it as the full (uncompressed) read size.
          i = readSize;
        }
        else
        {
          i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", "
                 << (readSize - acc) << ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' '
                 << cmpOffFact.rem << endl;
          }
#endif
        }
#endif

        if (i < 0 && errno == EINTR)
        {
          continue;
        }
        else if (i < 0)
        {
          // Capture errno before buildOidFileName(), which could overwrite it.
          int readErrno = errno;

          try
          {
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          }
          catch (exception& exc)
          {
            cerr << "FileName Err:" << exc.what() << endl;
            strcpy(fileNamePtr, "unknown filename");
          }

          errorString = strerror(readErrno);
          errorOccurred = true;
          errMsg << "thr_popper: Error reading file for OID " << oid << "; "
                 << " fp " << fp << "; offset " << longSeekOffset << "; fileName " << fileNamePtr << "; "
                 << errorString;
          break;  // break from "while(acc..." loop
        }
        else if (i == 0)
        {
          iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          errorString = "early EOF";
          errorOccurred = true;
          errMsg << "thr_popper: Early EOF reading file for OID " << oid << "; " << fileNamePtr;
          break;  // break from "while(acc..." loop
        }

        acc += i;
        longSeekOffset += (uint64_t)i;
        readCount++;
        bytesRead += i;
      }  // while(acc...

      //..Break out of "for (j..." read loop if error occurred
      if (errorOccurred)
      {
        Message::Args args;
        args.add(oid);
        args.add(errorString);
        primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        break;
      }

      blocksRead += blocksThisRead;

      if (iom->IOTrace())
        clock_gettime(CLOCK_REALTIME, &tm2);

      /* New bulk VSS lookup code */
      {
        vector<BRM::LBID_t> lbids;
        vector<BRM::VER_t> versions;
        vector<bool> isLocked;

        for (i = 0; (uint32_t)i < blocksThisRead; i++)
          lbids.push_back((BRM::LBID_t)(lbid + i) + (j * iom->blocksPerRead));

        if (blocksRequested > 1 || !flg)  // prefetch, or an unversioned single-block read
          iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
        else  // a single-block read that was versioned
        {
          versions.push_back(ver);
          isLocked.push_back(false);
        }

        uint8_t* ptr = (uint8_t*)&alignedbuff[0];

        if (blocksThisRead > 0 && fdit->second->isCompressed())
        {
          size_t blen = 4 * 1024 * 1024 + 4;
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", " << chunkSize << ", 0x"
                 << hex << (ptrdiff_t)uCmpBuf << dec << ", " << blen << ")" << endl;
          }
#endif
          std::unique_ptr<compress::CompressInterface> decompressor(
              compress::getCompressInterfaceByType(static_cast<uint32_t>(fdit->second->compType)));

          if (!decompressor)
          {
            // Use default?
            decompressor.reset(new compress::CompressInterfaceSnappy());
          }

          int dcrc = decompressor->uncompressBlock(&alignedbuff[0], chunkSize, uCmpBuf, blen);

          if (dcrc != 0)
          {
#ifdef IDB_COMP_POC_DEBUG
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
#endif

            if (++decompRetryCount < 30)
            {
              blocksRead -= blocksThisRead;
              waitForRetry(decompRetryCount);

              // log an info message on the first retry only
              if (decompRetryCount == 1)
              {
                Message::Args args;
                args.add(oid);
                ostringstream infoMsg;
                iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
                infoMsg << "decompress retry for " << fileNamePtr << " decompRetry chunk " << cmpOffFact.quot
                        << " code=" << dcrc;
                args.add(infoMsg.str());
                primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
              }

              goto decompRetry;
            }

            cout << boldStart << "decomp returned " << dcrc << boldStop << endl;
            errorOccurred = true;
            Message::Args args;
            args.add(oid);
            errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc
                   << " part=" << partNum << " seg=" << segNum;
            args.add(errMsg.str());
            primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
            iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
            break;
          }

          // FIXME: why doesn't this work??? (See later for why)
          // ptr = &uCmpBuf[cmpOffFact.rem];
          memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);

          // log the retries, if any
          if (retryReadHeadersCount > 0 || decompRetryCount > 0)
          {
            Message::Args args;
            args.add(oid);
            ostringstream infoMsg;
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
            infoMsg << "Successfully uncompress " << fileNamePtr << " chunk " << cmpOffFact.quot << " @";

            if (retryReadHeadersCount > 0)
              infoMsg << " HeaderRetry:" << retryReadHeadersCount;

            if (decompRetryCount > 0)
              infoMsg << " UncompressRetry:" << decompRetryCount;

            args.add(infoMsg.str());
            primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
          }
        }

        for (i = 0; useCache && (uint32_t)i < lbids.size(); i++)
        {
          if (!isLocked[i])
          {
#ifdef IDB_COMP_POC_DEBUG
            {
              if (debugWrite)
              {
                boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
                cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE] << dec
                     << boldStop << endl;
                cout << boldStart;
#if 0
                int32_t* i32p;
                i32p = (int32_t*)&ptr[i * BLOCK_SIZE];

                for (int iy = 0; iy < 2; iy++)
                {
                  for (int ix = 0; ix < 8; ix++, i32p++)
                    cout << *i32p << ' ';

                  cout << endl;
                }

#else
                int64_t* i64p;
                i64p = (int64_t*)&ptr[i * BLOCK_SIZE];

                for (int iy = 0; iy < 2; iy++)
                {
                  for (int ix = 0; ix < 8; ix++, i64p++)
                    cout << *i64p << ' ';

                  cout << endl;
                }

#endif
                cout << boldStop << endl;
              }
            }
#endif
            cacheInsertOps.push_back(
                CacheInsert_t(lbids[i], versions[i], (uint8_t*)&alignedbuff[i * BLOCK_SIZE]));
          }
        }

        if (useCache)
        {
          blocksLoaded += fbm->bulkInsert(cacheInsertOps);
          cacheInsertOps.clear();
        }
      }

      dlen -= blocksThisRead;
    }  // for (j...

    fdMapMutex.lock();

    if (fdit->second.get())
      fdit->second->inUse--;

    fdit = fdcache.end();
    fdMapMutex.unlock();

    if (errorOccurred)
      continue;

    try
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }
    catch (exception& e)
    {
      cout << "releaseRange: " << e.what() << endl;
    }

    fr->BlocksRead(blocksRead);
    fr->BlocksLoaded(blocksLoaded);

    // FIXME: This is why some code above doesn't work...
    if (fr->data != 0 && blocksRequested == 1)
      memcpy(fr->data, alignedbuff, BLOCK_SIZE);

    fr->frMutex().lock();
    fr->SetPredicate(fileRequest::COMPLETE);
    fr->frCond().notify_one();
    fr->frMutex().unlock();

    if (iom->IOTrace())
    {
      clock_gettime(CLOCK_REALTIME, &rqst2);
      timespec_sub(tm, tm2, tm3);
      timespec_sub(rqst1, rqst2, rqst3);

      // @Bug 3149. IOMTrace was not reporting bytesRead correctly for compressed columns.
      uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;
      // Average 8 KiB units per read; a request that issued no reads would divide by zero.
      uint32_t avgUnitsPerRead = (readCount > 0) ? bytesRead / (readCount << 13) : 0;
      lFile << left << setw(5) << setfill(' ') << oid << right << setw(5) << setfill(' ')
            << offset / extentSize << " " << right << setw(11) << setfill(' ') << lbid << " " << right
            << setw(9) << avgUnitsPerRead << " " << right << setw(9) << blocksRequested << " "
            << right << setw(10) << fixed << tm3 << " " << right << fixed
            << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " " << right << setw(10) << fixed
            << rqst3 << " " << right << setw(10) << fixed << longSeekOffset << " " << right << setw(10)
            << fixed << reportBytesRead << " " << right << setw(3) << fixed << dbroot << " " << right
            << setw(3) << fixed << partNum << " " << right << setw(3) << fixed << segNum << " " << endl;
    }
  }  // for(;;)

  delete[] uCmpBuf;
  lFile.close();
  // reaching here is an error...
  return 0;
}  // end thr_popper
} // anonymous namespace
namespace dbbc
{
// Acquire the shared (read) side of the FD-cache lock on behalf of an external caller.
void setReadLock()
{
  localLock.read_lock();
}

// Release the shared (read) side taken by setReadLock().
void releaseReadLock()
{
  localLock.read_unlock();
}
// Empties the FD cache. The write side of localLock is taken first, so this
// cannot run while a reader thread holds the read side for a request.
void dropFDCache()
{
  localLock.write_lock();
  fdcache.clear();
  localLock.write_unlock();
}
// Removes the FD-cache entry (if any) for each file listed in `files`, under the
// write side of localLock. Keys that are not cached are ignored.
void purgeFDCache(std::vector<BRM::FileInfo>& files)
{
  localLock.write_lock();

  for (const BRM::FileInfo& file : files)
  {
    const FdEntry key(file.oid, file.dbRoot, file.partitionNum, file.segmentNum, file.compType, NULL);
    fdcache.erase(key);
  }

  localLock.write_unlock();
}
//------------------------------------------------------------------------------
// Reads the DBBC tuning settings (IOMTracing, MaxOpenFiles, DecreaseOpenFilesCount,
// FDCacheTrace) and starts the reader threads via go(). thrCount is clamped
// to [1, 256].
//------------------------------------------------------------------------------
ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount, int bsPerRead)
 : blocksPerRead(bsPerRead), fIOMfbMgr(fbm), fIOMRequestQueue(fbrq), fFileOp(false)
{
  if (thrCount <= 0)
    thrCount = 1;

  if (thrCount > 256)
    thrCount = 256;

  fConfig = Config::makeConfig();

  // Integer value of DBBC.<name>, or 0 when the entry is missing or empty.
  auto dbbcSetting = [this](const char* name) -> int
  {
    const string val = fConfig->getConfig("DBBC", name);
    return val.empty() ? 0 : static_cast<int>(Config::fromText(val));
  };

  fIOTrace = dbbcSetting("IOMTracing") > 0;

  fMaxOpenFiles = MAX_OPEN_FILES;
  const int maxOpenFiles = dbbcSetting("MaxOpenFiles");

  if (maxOpenFiles > 0)
    fMaxOpenFiles = maxOpenFiles;

  fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;
  const int decreaseOpenFiles = dbbcSetting("DecreaseOpenFilesCount");

  if (decreaseOpenFiles > 0)
    fDecreaseOpenFilesCount = decreaseOpenFiles;

  // limit the number of files closed
  if (fDecreaseOpenFilesCount > (uint32_t)(0.75 * fMaxOpenFiles))
    fDecreaseOpenFilesCount = (uint32_t)(0.75 * fMaxOpenFiles);

  // ...but always close at least one: with a tiny MaxOpenFiles (e.g. 1) the limit
  // above rounds to 0, and the reader threads would then never evict anything.
  if (fDecreaseOpenFilesCount == 0)
    fDecreaseOpenFilesCount = 1;

  fFDCacheTrace = false;

  if (dbbcSetting("FDCacheTrace") > 0)
  {
    fFDCacheTrace = true;
    FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
  }

  fThreadCount = thrCount;
  go();
}
// Writes the on-disk segment file name for (oid, dbRoot, partNum, segNum) into file_name.
// Version-buffer requests arrive with dbRoot 0 for legacy reasons; for oid < 1000 the real
// DBRoot is then fetched from BRM first.
void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum,
                                 const uint16_t segNum, char* file_name)
{
  const bool needsVBRoot = (oid < 1000) && (dbRoot == 0);

  if (needsVBRoot)
    dbRoot = fdbrm.getDBRootOfVBOID(oid);

  fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
}
// Maps an LBID/version to its file location through BRM's local lookup.
// When primitiveprocessor::noVB is positive, the version-buffer flag is forced off.
int ioManager::localLbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid,
                               uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum,
                               uint32_t& fileBlockOffset)
{
  const bool useVB = vbFlag && !(primitiveprocessor::noVB > 0);
  return fdbrm.lookupLocal(lbid, verid, useVB, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
}
// Callable handed to fThreadArr.create_thread(): runs one reader loop for `iom`.
struct LambdaKludge
{
  LambdaKludge(ioManager* i) : iom(i)
  {
  }

  ~LambdaKludge()
  {
    iom = nullptr;
  }

  void operator()()
  {
    thr_popper(iom);
  }

  ioManager* iom;
};
// Starts fThreadCount reader threads. A failed create_thread() is reported and
// retried after one second, indefinitely, so this returns only once all are running.
void ioManager::createReaders()
{
  int started = 0;

  while (started < fThreadCount)
  {
    try
    {
      fThreadArr.create_thread(LambdaKludge(this));
      ++started;
    }
    catch (exception& e)
    {
      cerr << "IOM::createReaders() caught " << e.what() << endl;
      sleep(1);
    }
  }
}
// Tears the manager down through stop().
ioManager::~ioManager()
{
  this->stop();
}
void ioManager::go(void)
{
createReaders();
}
// FIXME: is this right? what does this method do?
// Currently a no-op: reader threads are neither detached nor joined here
// (the per-thread pthread_detach() call is disabled).
void ioManager::stop()
{
  for (int n = 0; n < fThreadCount; ++n)
  {
    // pthread_detach(fThreadArr[n]);  -- intentionally disabled
  }
}
// Pops the next fileRequest from the request queue (presumably blocking until one is
// available). Returns 0 if the pop throws.
fileRequest* ioManager::getNextRequest()
{
  try
  {
    return fIOMRequestQueue.pop();
  }
  catch (exception&)
  {
    cerr << "ioManager::getNextRequest() ERROR " << endl;
  }

  return 0;
}
//------------------------------------------------------------------------------
// Fails a fileRequest:
//  - releases its LBID range; on success *copyLocked is cleared, and a release
//    error is only printed;
//  - prints errMsg to stderr;
//  - stores errorCode/errMsg on the request;
//  - marks the request COMPLETE and notifies the waiting thread.
//------------------------------------------------------------------------------
void ioManager::handleBlockReadError(fileRequest* fr, const string& errMsg, bool* copyLocked, int errorCode)
{
  try
  {
    dbrm()->releaseLBIDRange(fr->Lbid(), fr->BlocksRequested());
    *copyLocked = false;
  }
  catch (exception& e)
  {
    cout << "releaseRange on read error: " << e.what() << endl;
  }

  cerr << errMsg << endl;

  // Record why the request failed...
  fr->RequestStatus(errorCode);
  fr->RequestStatusStr(errMsg);

  // ...then publish completion under the request's mutex and wake the waiter.
  auto& doneMutex = fr->frMutex();
  doneMutex.lock();
  fr->SetPredicate(fileRequest::COMPLETE);
  fr->frCond().notify_one();
  doneMutex.unlock();
}
} // namespace dbbc