You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-06-13 16:01:32 +03:00
1419 lines
38 KiB
C++
1419 lines
38 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
|
|
|
|
// $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
//
// C++ Implementation: iomanager
//
// Description:
//
//
// Author: Jason Rodriguez <jrodriguez@calpont.com>
//
//
//
|
|
|
|
#include "mcsconfig.h"
|
|
|
|
#define _FILE_OFFSET_BITS 64
|
|
#define _LARGEFILE64_SOURCE
|
|
#include <sys/mount.h>
|
|
#include <linux/fs.h>
|
|
#ifdef BLOCK_SIZE
|
|
#undef BLOCK_SIZE
|
|
#endif
|
|
#ifdef READ
|
|
#undef READ
|
|
#endif
|
|
#ifdef WRITE
|
|
#undef WRITE
|
|
#endif
|
|
#include <stdexcept>
|
|
#include <unistd.h>
|
|
#include <stdlib.h>
|
|
#include <string>
|
|
#include <sstream>
|
|
#include <tr1/unordered_map>
|
|
#include <tr1/unordered_set>
|
|
#include <set>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/time.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <boost/shared_ptr.hpp>
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/thread.hpp>
|
|
#include <boost/thread/condition.hpp>
|
|
#include <pthread.h>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
|
|
using namespace std;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "messageobj.h"
|
|
#include "messageids.h"
|
|
using namespace logging;
|
|
|
|
#include "brmtypes.h"
|
|
|
|
#include "pp_logger.h"
|
|
|
|
#include "fsutils.h"
|
|
|
|
#include "rwlock_local.h"
|
|
|
|
#include "iomanager.h"
|
|
#include "liboamcpp.h"
|
|
|
|
#include "idbcompress.h"
|
|
using namespace compress;
|
|
|
|
#include "IDBDataFile.h"
|
|
#include "IDBPolicy.h"
|
|
#include "IDBLogger.h"
|
|
using namespace idbdatafile;
|
|
|
|
#include "mcsconfig.h"
|
|
#include "threadnaming.h"
|
|
|
|
typedef tr1::unordered_set<BRM::OID_t> USOID;
|
|
|
|
namespace primitiveprocessor
|
|
{
|
|
extern Logger* mlp;
|
|
extern int directIOFlag;
|
|
extern int noVB;
|
|
} // namespace primitiveprocessor
|
|
|
|
#ifndef O_BINARY
|
|
#define O_BINARY 0
|
|
#endif
|
|
#ifndef O_LARGEFILE
|
|
#define O_LARGEFILE 0
|
|
#endif
|
|
#ifndef O_NOATIME
|
|
#define O_NOATIME 0
|
|
#endif
|
|
|
|
namespace
|
|
{
|
|
using namespace dbbc;
|
|
using namespace std;
|
|
|
|
const std::string boldStart = "\033[0;1m";
|
|
const std::string boldStop = "\033[0;39m";
|
|
|
|
const uint32_t MAX_OPEN_FILES = 16384;
|
|
const uint32_t DECREASE_OPEN_FILES = 4096;
|
|
|
|
// Stores in `tm` the elapsed time, in seconds, from `tv1` to `tv2`
// (i.e. tv2 - tv1), combining the whole-second and nanosecond parts.
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
  const double fractionalSeconds = 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec);

  tm = wholeSeconds + fractionalSeconds;
}
|
|
|
|
struct IOMThreadArg
|
|
{
|
|
ioManager* iom;
|
|
int32_t thdId;
|
|
};
|
|
|
|
typedef IOMThreadArg IOMThreadArg_t;
|
|
|
|
/* structures shared across all iomanagers */
|
|
class FdEntry
|
|
{
|
|
public:
|
|
FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0), fp(0), c(0), inUse(0), compType(0)
|
|
{
|
|
cmpMTime = 0;
|
|
}
|
|
|
|
FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct,
|
|
IDBDataFile* f)
|
|
: oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
|
|
{
|
|
cmpMTime = 0;
|
|
|
|
if (oid >= 1000)
|
|
compType = ct;
|
|
}
|
|
|
|
~FdEntry()
|
|
{
|
|
delete fp;
|
|
fp = 0;
|
|
}
|
|
|
|
BRM::OID_t oid;
|
|
uint16_t dbroot;
|
|
uint32_t partNum;
|
|
uint16_t segNum;
|
|
IDBDataFile* fp;
|
|
uint32_t c;
|
|
int inUse;
|
|
|
|
CompChunkPtrList ptrList;
|
|
|
|
int compType;
|
|
bool isCompressed() const
|
|
{
|
|
return (oid >= 1000 && compType != 0);
|
|
}
|
|
time_t cmpMTime;
|
|
friend ostream& operator<<(ostream& out, const FdEntry& o)
|
|
{
|
|
out << " o: " << o.oid << " f: " << o.fp << " d: " << o.dbroot << " p: " << o.partNum
|
|
<< " s: " << o.segNum << " c: " << o.c << " t: " << o.compType << " m: " << o.cmpMTime;
|
|
return out;
|
|
}
|
|
};
|
|
|
|
struct fdCacheMapLessThan
|
|
{
|
|
bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
|
|
{
|
|
if (lhs.oid < rhs.oid)
|
|
return true;
|
|
|
|
if (lhs.oid == rhs.oid && lhs.dbroot < rhs.dbroot)
|
|
return true;
|
|
|
|
if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum < rhs.partNum)
|
|
return true;
|
|
|
|
if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum &&
|
|
lhs.segNum < rhs.segNum)
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
};
|
|
|
|
struct fdMapEqual
|
|
{
|
|
bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
|
|
{
|
|
return (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum &&
|
|
lhs.segNum == rhs.segNum);
|
|
}
|
|
};
|
|
|
|
typedef boost::shared_ptr<FdEntry> SPFdEntry_t;
|
|
typedef std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan> FdCacheType_t;
|
|
|
|
struct FdCountEntry
|
|
{
|
|
FdCountEntry()
|
|
{
|
|
}
|
|
FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
|
|
const FdCacheType_t::iterator it)
|
|
: oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it)
|
|
{
|
|
}
|
|
~FdCountEntry()
|
|
{
|
|
}
|
|
|
|
BRM::OID_t oid;
|
|
uint16_t dbroot;
|
|
uint32_t partNum;
|
|
uint16_t segNum;
|
|
uint32_t cnt;
|
|
FdCacheType_t::iterator fdit;
|
|
}; // FdCountEntry
|
|
|
|
typedef FdCountEntry FdCountEntry_t;
|
|
|
|
struct fdCountCompare
|
|
{
|
|
bool operator()(const FdCountEntry_t& lhs, const FdCountEntry_t& rhs) const
|
|
{
|
|
return lhs.cnt > rhs.cnt;
|
|
}
|
|
};
|
|
|
|
typedef multiset<FdCountEntry_t, fdCountCompare> FdCacheCountType_t;
|
|
|
|
FdCacheType_t fdcache;
|
|
boost::mutex fdMapMutex;
|
|
rwlock::RWLock_local localLock;
|
|
|
|
// Returns `in` rounded up to the next multiple of `av`; an address that is
// already a multiple of `av` is returned unchanged.  The rounding masks with
// (av - 1), so it is only a true round-up when `av` is a power of two.
char* alignTo(const char* in, int av)
{
  const ptrdiff_t boundary = static_cast<ptrdiff_t>(av);
  ptrdiff_t address = reinterpret_cast<ptrdiff_t>(in);

  const bool misaligned = (address % boundary) != 0;

  if (misaligned)
    address = (address & ~(boundary - 1)) + boundary;  // clear the low bits, step to the next boundary

  return reinterpret_cast<char*>(address);
}
|
|
|
|
// Sleeps for 5000 * count microseconds (5 ms per retry already attempted).
void waitForRetry(long count)
{
  const long delayMicros = 5000 * count;
  usleep(delayMicros);
}
|
|
|
|
// Must hold the FD cache lock!
|
|
static int updateptrs(char* ptr, FdCacheType_t::iterator fdit)
|
|
{
|
|
ssize_t i;
|
|
uint32_t progress;
|
|
|
|
// ptr is taken from buffer, already been checked: realbuff.get() == 0
|
|
if (ptr == 0)
|
|
return -1;
|
|
|
|
// already checked before: fdit->second->isCompressed()
|
|
if (fdit->second.get() == 0)
|
|
return -2;
|
|
|
|
IDBDataFile* fp = fdit->second->fp;
|
|
|
|
if (fp == INVALID_HANDLE_VALUE)
|
|
{
|
|
Message::Args args;
|
|
args.add("updateptrs got invalid fp.");
|
|
primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
|
|
return -3;
|
|
}
|
|
|
|
// We need to read one extra block because we need the first ptr in the 3rd block
|
|
// to know if we're done.
|
|
// FIXME: re-work all of this so we don't have to re-read the 3rd block.
|
|
progress = 0;
|
|
|
|
while (progress < 4096 * 3)
|
|
{
|
|
i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);
|
|
|
|
if (i <= 0)
|
|
break;
|
|
|
|
progress += i;
|
|
}
|
|
|
|
if (progress != 4096 * 3)
|
|
return -4; // let it retry. Not likely, but ...
|
|
|
|
fdit->second->cmpMTime = 0;
|
|
time_t mtime = fp->mtime();
|
|
|
|
if (mtime != (time_t)-1)
|
|
fdit->second->cmpMTime = mtime;
|
|
|
|
int gplRc = 0;
|
|
gplRc = compress::CompressInterface::getPtrList(&ptr[4096], 4096, fdit->second->ptrList);
|
|
|
|
if (gplRc != 0)
|
|
return -5; // go for a retry.
|
|
|
|
if (fdit->second->ptrList.size() == 0)
|
|
return -6; // go for a retry.
|
|
|
|
uint64_t numHdrs = fdit->second->ptrList[0].first / 4096ULL - 2ULL;
|
|
|
|
if (numHdrs > 0)
|
|
{
|
|
boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + 4095]);
|
|
char* nextHdrBufPtr = 0;
|
|
|
|
nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);
|
|
|
|
progress = 0;
|
|
|
|
while (progress < numHdrs * 4096)
|
|
{
|
|
i = fp->pread(&nextHdrBufPtr[progress], (4096 * 2) + progress, (numHdrs * 4096) - progress);
|
|
|
|
if (i <= 0)
|
|
break;
|
|
|
|
progress += i;
|
|
}
|
|
|
|
if (progress != numHdrs * 4096)
|
|
return -8;
|
|
|
|
CompChunkPtrList nextPtrList;
|
|
gplRc = compress::CompressInterface::getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
|
|
|
|
if (gplRc != 0)
|
|
return -7; // go for a retry.
|
|
|
|
fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
// Worker loop of an I/O thread.
//
// Repeatedly takes the next fileRequest from `arg`, resolves the request's
// LBID to a file (oid/dbroot/partition/segment), opens that file through the
// shared FD cache, reads the requested blocks (decompressing the chunk first
// when the cache entry is marked compressed), optionally inserts the blocks
// into the file buffer manager, and finally marks the request COMPLETE and
// notifies its condition variable.  On failure the request is handed to
// ioManager::handleBlockReadError() and the loop moves on to the next one.
//
// The loop never terminates on its own; the statements after it are only
// reached in error.  Always returns 0.
void* thr_popper(ioManager* arg)
{
  utils::setThreadName("thr_popper");
  ioManager* iom = arg;
  FileBufferMgr* fbm;
  fileRequest* fr = 0;
  BRM::LBID_t lbid = 0;
  BRM::OID_t oid = 0;
  BRM::VER_t ver = 0;
  BRM::QueryContext qc;
  BRM::VER_t txn = 0;
  int compType = 0;
  int blocksLoaded = 0;
  int blocksRead = 0;
  const unsigned pageSize = 4096;
  fbm = &iom->fileBufferManager();
  char fileName[WriteEngine::FILE_NAME_SIZE];
  const uint64_t fileBlockSize = BLOCK_SIZE;
  bool flg = false;
  bool useCache;
  uint16_t dbroot = 0;
  uint32_t partNum = 0;
  uint16_t segNum = 0;
  uint32_t offset = 0;
  char* fileNamePtr = fileName;
  uint64_t longSeekOffset = 0;
  int err;
  uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
  uint32_t blocksRequested = 0;
  ssize_t i;
  char* alignedbuff = 0;
  boost::scoped_array<char> realbuff;
  pthread_t threadId = 0;
  ostringstream iomLogFileName;
  ofstream lFile;
  // Trace timestamps are zero-initialized so the trace line never reads
  // indeterminate values when a request performs no read.
  struct timespec rqst1 = {};
  struct timespec rqst2 = {};
  struct timespec tm = {};
  struct timespec tm2 = {};
  double tm3 = 0.0;
  double rqst3 = 0.0;
  bool locked = false;
  SPFdEntry_t fe;
  vector<CacheInsert_t> cacheInsertOps;
  bool copyLocked = false;

  if (iom->IOTrace())
  {
    threadId = pthread_self();
    iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
    lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
  }

  FdCacheType_t::iterator fdit;
  IDBDataFile* fp = 0;
  size_t maxCompSz =
      compress::CompressInterface::getMaxCompressedSizeGeneric(iom->blocksPerRead * BLOCK_SIZE);
  // One extra page so the read buffer can be aligned to a page boundary.
  size_t readBufferSz = maxCompSz + pageSize;

  realbuff.reset(new char[readBufferSz]);

  if (realbuff.get() == 0)
  {
    cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
    return 0;
  }

  alignedbuff = alignTo(realbuff.get(), 4096);

  if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
      (((ptrdiff_t)alignedbuff % pageSize) != 0))
    throw runtime_error("aligned buffer size is not matching the page size.");

  // Destination buffer for decompressed chunks.  Owned by a scoped_array so
  // it is released if an exception propagates out of this function.
  boost::scoped_array<uint8_t> uCmpBufSa(new uint8_t[4 * 1024 * 1024 + 4]);
  uint8_t* uCmpBuf = uCmpBufSa.get();

  for (;;)
  {
    // Release whatever the previous iteration still holds (error paths leave
    // the loop body early via `continue`).
    if (copyLocked)
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }

    if (locked)
    {
      localLock.read_unlock();
      locked = false;
    }

    fr = iom->getNextRequest();

    localLock.read_lock();
    locked = true;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &rqst1);

    lbid = fr->Lbid();
    qc = fr->Ver();
    txn = fr->Txn();
    flg = fr->Flg();
    compType = fr->CompType();
    useCache = fr->useCache();
    blocksLoaded = 0;
    blocksRead = 0;
    dlen = fr->BlocksRequested();
    blocksRequested = fr->BlocksRequested();
    oid = 0;
    dbroot = 0;
    partNum = 0;
    segNum = 0;
    offset = 0;

    // Lock the requested LBID range for the duration of the read.
    iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
    copyLocked = true;

    // special case for getBlock.
    if (blocksRequested == 1)
    {
      BRM::VER_t outVer;
      iom->dbrm()->vssLookup((BRM::LBID_t)lbid, qc, txn, &outVer, &flg);
      ver = outVer;
      fr->versioned(flg);
    }
    else
    {
      fr->versioned(false);
      ver = qc.currentScn;
    }

    err = iom->localLbidLookup(lbid, ver, flg, oid, dbroot, partNum, segNum, offset);

    if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: version " << ver << " of LBID " << lbid << "is too old";
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
      continue;
    }
    else if (err < 0)
    {
      ostringstream errMsg;
      errMsg << "thr_popper: BRM lookup failure; lbid=" << lbid << "; ver=" << ver
             << "; flg=" << (flg ? 1 : 0);
      iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, fileRequest::BRM_LOOKUP_ERROR);
      continue;
    }

#ifdef IDB_COMP_POC_DEBUG
    {
      boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);

      if (compType != 0)
        cout << boldStart;

      cout << "fileRequest: " << *fr << endl;

      if (compType != 0)
        cout << boldStop;
    }
#endif
    const uint32_t extentSize = iom->getExtentRows();
    FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
    // cout << "Looking for " << fdKey << endl
    //   << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;

    fdMapMutex.lock();
    fdit = fdcache.find(fdKey);

    if (fdit == fdcache.end())
    {
      // Cache miss: build the file name, make room in the cache if needed,
      // open the file and insert a new entry.
      try
      {
        iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
      }
      catch (exception& exc)
      {
        fdMapMutex.unlock();
        Message::Args args;
        args.add(oid);
        args.add(exc.what());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error building filename for OID " << oid << "; " << exc.what();
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }

#ifdef IDB_COMP_USE_CMP_SUFFIX

      if (compType != 0)
      {
        char* ptr = strrchr(fileNamePtr, '.');
        idbassert(ptr);
        strcpy(ptr, ".cmp");
      }

#endif

      if (oid > 3000)
      {
        // TODO: should syscat columns be considered when reducing open file count
        //  They are always needed why should they be closed?
        if (fdcache.size() >= iom->MaxOpenFiles())
        {
          // Sort a snapshot of the cache by hit count (descending) so the
          // least-used entries can be evicted first.
          FdCacheCountType_t fdCountSort;

          for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
          {
            struct FdCountEntry fdc(it->second->oid, it->second->dbroot, it->second->partNum,
                                    it->second->segNum, it->second->c, it);

            fdCountSort.insert(fdc);
          }

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "Before flushing sz: " << fdcache.size()
                               << " delCount: " << iom->DecreaseOpenFilesCount() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
              iom->FDTraceFile() << *(*it).second << endl;

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          // TODO: should we consider a minimum number of open files
          //       currently, there is nothing to prevent all open files
          //       from being closed by the IOManager.

          uint32_t delCount = 0;

          // Walk from the lowest hit count upwards, skipping entries in use.
          for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
               rit != fdCountSort.rend() && fdcache.size() > 0 && delCount < iom->DecreaseOpenFilesCount();
               rit++)
          {
            FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
            FdCacheType_t::iterator it = fdcache.find(oldfdKey);

            if (it != fdcache.end())
            {
              if (iom->FDCacheTrace())
              {
                if (!rit->fdit->second->inUse)
                  iom->FDTraceFile() << "Removing dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
                else
                  iom->FDTraceFile() << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
              }

              if (rit->fdit->second->inUse <= 0)
              {
                fdcache.erase(it);
                delCount++;
              }
            }
          }  // for (FdCacheCountType_t...

          if (iom->FDCacheTrace())
          {
            iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;

            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
            {
              iom->FDTraceFile() << *(*it).second << endl;
            }

            iom->FDTraceFile() << "==================" << endl << endl;
          }

          fdCountSort.clear();

        }  // if (fdcache.size()...
      }    // if (oid > 3000)

      int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
      fp = NULL;
      uint32_t openRetries = 0;
      int saveErrno = 0;

      // Up to five attempts, one second apart.
      while (fp == NULL && openRetries++ < 5)
      {
        fp = IDBDataFile::open(IDBPolicy::getType(fileNamePtr, IDBPolicy::PRIMPROC), fileNamePtr, "r", opts);
        saveErrno = errno;

        if (fp == NULL)
          sleep(1);
      }

      if (fp == NULL)
      {
        Message::Args args;
        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        ostringstream errMsg;
        errMsg << "thr_popper: Error opening file for OID " << oid << "; " << fileNamePtr << "; "
               << strerror(saveErrno);
        int errorCode = fileRequest::FAILED;

        if (saveErrno == EINVAL)
          errorCode = fileRequest::FS_EINVAL;
        else if (saveErrno == ENOENT)
          errorCode = fileRequest::FS_ENOENT;

        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked, errorCode);
        continue;
      }

      fe.reset(new FdEntry(oid, dbroot, partNum, segNum, compType, fp));
      fe->inUse++;
      fdcache[fdKey] = fe;
      fdit = fdcache.find(fdKey);
      fe.reset();
    }
    else
    {
      // Cache hit: bump the hit and in-use counters and reuse the handle.
      if (fdit->second.get())
      {
        fdit->second->c++;
        fdit->second->inUse++;
        fp = fdit->second->fp;
      }
      else
      {
        Message::Args args;
        fdit = fdcache.end();
        fdMapMutex.unlock();
        args.add(oid);
        ostringstream errMsg;
        errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = (" << dbroot << ", " << partNum
               << ", " << segNum << ", " << compType << ")";
        args.add(errMsg.str());
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        continue;
      }
    }

    fdMapMutex.unlock();

#ifdef SHARED_NOTHING_DEMO_2

    // change offset if it's shared nothing
    /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
       add extent offset */
    if (oid >= 10000)
      offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);

#endif

    longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
    // quot = index of the 4 MiB chunk holding the offset, rem = byte offset
    // inside that chunk (used for compressed files).
    lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));

    uint32_t readCount = 0;
    uint32_t bytesRead = 0;
    uint32_t compressedBytesRead =
        0;  // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
    uint32_t jend = blocksRequested / iom->blocksPerRead;

    if (iom->IOTrace())
      clock_gettime(CLOCK_REALTIME, &tm);

    ostringstream errMsg;
    bool errorOccurred = false;
    string errorString;
#ifdef IDB_COMP_POC_DEBUG
    bool debugWrite = false;
#endif

#ifdef EM_AS_A_TABLE_POC__
    dlen = 1;
#endif

    // Round the number of reads up when the request is not a whole multiple
    // of blocksPerRead.
    if (blocksRequested % iom->blocksPerRead)
      jend++;

    for (j = 0; j < jend; j++)
    {
      int decompRetryCount = 0;
      int retryReadHeadersCount = 0;

    decompRetry:
      blocksThisRead = std::min(dlen, iom->blocksPerRead);
      readSize = blocksThisRead * BLOCK_SIZE;

      acc = 0;

      while (acc < readSize)
      {
#if defined(EM_AS_A_TABLE_POC__)

        if (oid == 1084)
        {
          uint32_t h;
          int32_t o = 0;
          int32_t* ip;
          ip = (int32_t*)(&alignedbuff[acc]);

          for (o = 0; o < 2048; o++)
          {
            if (iom->dbrm()->getHWM(o + 3000, h) == 0)
              *ip++ = h;
            else
              *ip++ = numeric_limits<int32_t>::min() + 1;
          }

          i = BLOCK_SIZE;
        }
        else
          i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);

#else

        if (fdit->second->isCompressed())
        {
        retryReadHeaders:
          // hdrs may have been modified since we cached them in fdit->second...
          time_t cur_mtime = numeric_limits<time_t>::max();
          int updatePtrsRc = 0;
          fdMapMutex.lock();
          time_t fp_mtime = fp->mtime();

          if (fp_mtime != (time_t)-1)
            cur_mtime = fp_mtime;

          // Refresh the chunk pointers on any retry, or when the file is
          // newer than the cached headers.
          if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
            updatePtrsRc = updateptrs(&alignedbuff[0], fdit);

          fdMapMutex.unlock();

          int idx = cmpOffFact.quot;

          if (updatePtrsRc != 0 || idx >= (signed)fdit->second->ptrList.size())
          {
            // Due to a race condition, the header on disk may not be updated yet.
            // Log an info message and retry.
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << "retry updateptrs for " << fileNamePtr << ". rc=" << updatePtrsRc << ", idx=" << idx
                      << ", ptr.size=" << fdit->second->ptrList.size();
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              fdit->second->cmpMTime = 0;
              goto retryReadHeaders;
            }
            else
            {
              // still fail after all the retries.
              errorOccurred = true;
              errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx=" << idx
                     << ", ptr.size=" << fdit->second->ptrList.size();
              errorString = errMsg.str();
              break;
            }
          }

          // FIXME: make sure alignedbuff can hold fdit->second->ptrList[idx].second bytes
          if (fdit->second->ptrList[idx].second > maxCompSz)
          {
            errorOccurred = true;
            errMsg << "aligned buff too small. dataSize=" << fdit->second->ptrList[idx].second
                   << ", buffSize=" << maxCompSz;
            errorString = errMsg.str();
            break;
          }

          // Read the whole compressed chunk: .first is its file offset,
          // .second its length.
          i = fp->pread(&alignedbuff[0], fdit->second->ptrList[idx].first, fdit->second->ptrList[idx].second);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec
                 << ", " << fdit->second->ptrList[idx].second << ", " << fdit->second->ptrList[idx].first
                 << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << boldStop << endl;
          }
#endif

          // @bug3407, give it some retries if pread failed.
          if (i != (ssize_t)fdit->second->ptrList[idx].second)
          {
            // log an info message
            if (retryReadHeadersCount == 0)
            {
              Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
              infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
                      << ". (offset, size, actuall read) = (" << fdit->second->ptrList[idx].first << ", "
                      << fdit->second->ptrList[idx].second << ", " << i << ")";
              args.add(infoMsg.str());
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
            }

            if (++retryReadHeadersCount < 30)
            {
              waitForRetry(retryReadHeadersCount);
              fdit->second->cmpMTime = 0;
              goto retryReadHeaders;
            }
            else
            {
              errorOccurred = true;
              errMsg << "Error reading chunk " << idx;
              errorString = errMsg.str();
              break;
            }
          }

          compressedBytesRead += i;  // @Bug 3149.
          // Report the uncompressed size so the accounting below treats the
          // read as complete.
          i = readSize;
        }
        else
        {
          i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", "
                 << (readSize - acc) << ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' '
                 << cmpOffFact.rem << endl;
          }
#endif
        }

#endif

        if (i < 0 && errno == EINTR)
        {
          continue;
        }
        else if (i < 0)
        {
          try
          {
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          }
          catch (exception& exc)
          {
            cerr << "FileName Err:" << exc.what() << endl;
            strcpy(fileNamePtr, "unknown filename");
          }

          errorString = strerror(errno);
          errorOccurred = true;
          errMsg << "thr_popper: Error reading file for OID " << oid << "; "
                 << " fp " << fp << "; offset " << longSeekOffset << "; fileName " << fileNamePtr << "; "
                 << errorString;
          break;  // break from "while(acc..." loop
        }
        else if (i == 0)
        {
          iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
          errorString = "early EOF";
          errorOccurred = true;
          errMsg << "thr_popper: Early EOF reading file for OID " << oid << "; " << fileNamePtr;
          break;  // break from "while(acc..." loop
        }

        acc += i;
        longSeekOffset += (uint64_t)i;
        readCount++;
        bytesRead += i;
      }  // while(acc...

      //..Break out of "for (j..." read loop if error occurred
      if (errorOccurred)
      {
        Message::Args args;
        args.add(oid);
        args.add(errorString);
        primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
        iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
        break;
      }

      blocksRead += blocksThisRead;

      if (iom->IOTrace())
        clock_gettime(CLOCK_REALTIME, &tm2);

      /* New bulk VSS lookup code */
      {
        vector<BRM::LBID_t> lbids;
        vector<BRM::VER_t> versions;
        vector<bool> isLocked;

        for (i = 0; (uint32_t)i < blocksThisRead; i++)
          lbids.push_back((BRM::LBID_t)(lbid + i) + (j * iom->blocksPerRead));

        if (blocksRequested > 1 || !flg)  // prefetch, or an unversioned single-block read
          iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
        else  // a single-block read that was versioned
        {
          versions.push_back(ver);
          isLocked.push_back(false);
        }

        uint8_t* ptr = (uint8_t*)&alignedbuff[0];

        if (blocksThisRead > 0 && fdit->second->isCompressed())
        {
          size_t blen = 4 * 1024 * 1024 + 4;
#ifdef IDB_COMP_POC_DEBUG
          {
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
            cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", "
                 << fdit->second->ptrList[cmpOffFact.quot].second << ", 0x" << hex << (ptrdiff_t)uCmpBuf
                 << dec << ", " << blen << ")" << endl;
          }
#endif

          std::unique_ptr<compress::CompressInterface> decompressor(
              compress::getCompressInterfaceByType(static_cast<uint32_t>(fdit->second->compType)));
          if (!decompressor)
          {
            // Use default?
            decompressor.reset(new compress::CompressInterfaceSnappy());
          }

          int dcrc = decompressor->uncompressBlock(
              &alignedbuff[0], fdit->second->ptrList[cmpOffFact.quot].second, uCmpBuf, blen);

          if (dcrc != 0)
          {
#ifdef IDB_COMP_POC_DEBUG
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
#endif

            if (++decompRetryCount < 30)
            {
              blocksRead -= blocksThisRead;
              waitForRetry(decompRetryCount);

              // log an info message on the first retry only
              if (decompRetryCount == 1)
              {
                Message::Args args;
                args.add(oid);
                ostringstream infoMsg;
                iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
                infoMsg << "decompress retry for " << fileNamePtr << " decompRetry chunk " << cmpOffFact.quot
                        << " code=" << dcrc;
                args.add(infoMsg.str());
                primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
              }

              goto decompRetry;
            }

            cout << boldStart << "decomp returned " << dcrc << boldStop << endl;

            errorOccurred = true;
            Message::Args args;
            args.add(oid);
            errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc
                   << " part=" << partNum << " seg=" << segNum;
            args.add(errMsg.str());
            primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
            iom->handleBlockReadError(fr, errMsg.str(), &copyLocked);
            break;
          }

          // FIXME: why doesn't this work??? (See later for why)
          // ptr = &uCmpBuf[cmpOffFact.rem];
          // Copy the requested blocks out of the decompressed chunk back into
          // the aligned buffer, which the code below reads from.
          memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);

          // log the retries, if any
          if (retryReadHeadersCount > 0 || decompRetryCount > 0)
          {
            Message::Args args;
            args.add(oid);
            ostringstream infoMsg;
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
            infoMsg << "Successfully uncompress " << fileNamePtr << " chunk " << cmpOffFact.quot << " @";

            if (retryReadHeadersCount > 0)
              infoMsg << " HeaderRetry:" << retryReadHeadersCount;

            if (decompRetryCount > 0)
              infoMsg << " UncompressRetry:" << decompRetryCount;

            args.add(infoMsg.str());
            primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
          }
        }

        // Queue every block that is not reported as locked for insertion
        // into the buffer cache.
        for (i = 0; useCache && (uint32_t)i < lbids.size(); i++)
        {
          if (!isLocked[i])
          {
#ifdef IDB_COMP_POC_DEBUG
            {
              if (debugWrite)
              {
                boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
                cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE]
                     << dec << boldStop << endl;
                cout << boldStart;
#if 0
                int32_t* i32p;
                i32p = (int32_t*)&ptr[i * BLOCK_SIZE];

                for (int iy = 0; iy < 2; iy++)
                {
                  for (int ix = 0; ix < 8; ix++, i32p++)
                    cout << *i32p << ' ';

                  cout << endl;
                }

#else
                int64_t* i64p;
                i64p = (int64_t*)&ptr[i * BLOCK_SIZE];

                for (int iy = 0; iy < 2; iy++)
                {
                  for (int ix = 0; ix < 8; ix++, i64p++)
                    cout << *i64p << ' ';

                  cout << endl;
                }

#endif
                cout << boldStop << endl;
              }
            }
#endif
            cacheInsertOps.push_back(
                CacheInsert_t(lbids[i], versions[i], (uint8_t*)&alignedbuff[i * BLOCK_SIZE]));
          }
        }

        if (useCache)
        {
          blocksLoaded += fbm->bulkInsert(cacheInsertOps);
          cacheInsertOps.clear();
        }
      }

      dlen -= blocksThisRead;

    }  // for (j...

    // Done with the file: drop the in-use count taken when it was looked up.
    fdMapMutex.lock();

    if (fdit->second.get())
      fdit->second->inUse--;

    fdit = fdcache.end();
    fdMapMutex.unlock();

    if (errorOccurred)
      continue;

    try
    {
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
      copyLocked = false;
    }
    catch (exception& e)
    {
      cout << "releaseRange: " << e.what() << endl;
    }

    fr->BlocksRead(blocksRead);
    fr->BlocksLoaded(blocksLoaded);

    // FIXME: This is why some code above doesn't work...
    if (fr->data != 0 && blocksRequested == 1)
      memcpy(fr->data, alignedbuff, BLOCK_SIZE);

    // Publish completion to whoever is waiting on the request.
    fr->frMutex().lock();
    fr->SetPredicate(fileRequest::COMPLETE);
    fr->frCond().notify_one();
    fr->frMutex().unlock();

    if (iom->IOTrace())
    {
      clock_gettime(CLOCK_REALTIME, &rqst2);
      timespec_sub(tm, tm2, tm3);
      timespec_sub(rqst1, rqst2, rqst3);

      // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
      uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;

      // readCount is 0 when no read was issued (e.g. a request for zero
      // blocks); avoid dividing by zero in that case.
      const uint32_t traceDivisor = readCount << 13;
      const uint32_t traceReadRatio = (traceDivisor != 0) ? (bytesRead / traceDivisor) : 0;

      lFile << left << setw(5) << setfill(' ') << oid << right << setw(5) << setfill(' ')
            << offset / extentSize << " " << right << setw(11) << setfill(' ') << lbid << " " << right
            << setw(9) << traceReadRatio << " " << right << setw(9) << blocksRequested << " "
            << right << setw(10) << fixed << tm3 << " " << right << fixed
            << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " " << right << setw(10) << fixed
            << rqst3 << " " << right << setw(10) << fixed << longSeekOffset << " " << right << setw(10)
            << fixed << reportBytesRead << " " << right << setw(3) << fixed << dbroot << " " << right
            << setw(3) << fixed << partNum << " " << right << setw(3) << fixed << segNum << " " << endl;
    }

  }  // for(;;)

  lFile.close();

  // reaching here is an error...

  return 0;
}  // end thr_popper
|
|
|
|
} // anonymous namespace
|
|
|
|
namespace dbbc
|
|
{
|
|
void setReadLock()
|
|
{
|
|
localLock.read_lock();
|
|
}
|
|
|
|
void releaseReadLock()
|
|
{
|
|
localLock.read_unlock();
|
|
}
|
|
|
|
void dropFDCache()
|
|
{
|
|
localLock.write_lock();
|
|
fdcache.clear();
|
|
localLock.write_unlock();
|
|
}
|
|
// Removes from fdcache the entry matching each element of `files`, when one
// exists; elements without a cached entry are ignored. The write lock on
// localLock is held across the whole pass.
void purgeFDCache(std::vector<BRM::FileInfo>& files)
{
  localLock.write_lock();

  for (std::vector<BRM::FileInfo>::iterator file = files.begin(); file != files.end(); ++file)
  {
    // Lookup key built from the file's identifying fields; the final
    // constructor argument is passed as NULL.
    FdEntry lookupKey(file->oid, file->dbRoot, file->partitionNum, file->segmentNum, file->compType, NULL);
    FdCacheType_t::iterator cached = fdcache.find(lookupKey);

    if (cached != fdcache.end())
      fdcache.erase(cached);
  }

  localLock.write_unlock();
}
|
|
|
|
// Constructs the I/O manager: binds the buffer manager and request queue,
// reads the DBBC settings from the configuration, and finally launches the
// reader threads through go().
//
// fbm        buffer manager stored in fIOMfbMgr
// fbrq       request queue stored in fIOMRequestQueue
// thrCount   requested number of reader threads; clamped to [1, 256]
// bsPerRead  stored in blocksPerRead
ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount, int bsPerRead)
 : blocksPerRead(bsPerRead), fIOMfbMgr(fbm), fIOMRequestQueue(fbrq), fFileOp(false)
{
  // Keep the thread count inside the supported range.
  if (thrCount < 1)
    thrCount = 1;
  else if (thrCount > 256)
    thrCount = 256;

  fConfig = Config::makeConfig();

  // For every setting below an empty config string is treated as 0, and only
  // a positive parsed value overrides the default.

  // DBBC/IOMTracing: a positive value turns fIOTrace on.
  const string ioTraceText = fConfig->getConfig("DBBC", "IOMTracing");
  const int ioTraceValue = ioTraceText.empty() ? 0 : static_cast<int>(Config::fromText(ioTraceText));
  fIOTrace = (ioTraceValue > 0);

  // DBBC/MaxOpenFiles: defaults to MAX_OPEN_FILES.
  const string maxOpenText = fConfig->getConfig("DBBC", "MaxOpenFiles");
  const int maxOpenValue = maxOpenText.empty() ? 0 : static_cast<int>(Config::fromText(maxOpenText));
  fMaxOpenFiles = MAX_OPEN_FILES;

  if (maxOpenValue > 0)
    fMaxOpenFiles = maxOpenValue;

  // DBBC/DecreaseOpenFilesCount: defaults to DECREASE_OPEN_FILES.
  const string decreaseText = fConfig->getConfig("DBBC", "DecreaseOpenFilesCount");
  const int decreaseValue = decreaseText.empty() ? 0 : static_cast<int>(Config::fromText(decreaseText));
  fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;

  if (decreaseValue > 0)
    fDecreaseOpenFilesCount = decreaseValue;

  // limit the number of files closed: never more than 75% of fMaxOpenFiles
  const uint32_t closeLimit = static_cast<uint32_t>(0.75 * fMaxOpenFiles);

  if (fDecreaseOpenFilesCount > closeLimit)
    fDecreaseOpenFilesCount = closeLimit;

  // DBBC/FDCacheTrace: a positive value enables the trace flag and opens the
  // trace file for appending.
  const string fdTraceText = fConfig->getConfig("DBBC", "FDCacheTrace");
  const int fdTraceValue = fdTraceText.empty() ? 0 : static_cast<int>(Config::fromText(fdTraceText));
  fFDCacheTrace = false;

  if (fdTraceValue > 0)
  {
    fFDCacheTrace = true;
    FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
  }

  // All settings are in place before any reader thread is started.
  fThreadCount = thrCount;
  go();
}
|
|
|
|
// Writes into file_name the file name for the given OID / dbroot / partition /
// segment, as produced by fFileOp.getFileNameForPrimProc(). file_name is
// caller-provided; no buffer size is passed along, so the caller must supply
// a buffer large enough for the result.
void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum,
                                 const uint16_t segNum, char* file_name)
{
  // when it's a request for the version buffer, the dbroot comes in as 0 for legacy reasons;
  // in that case (dbroot 0 and OID below 1000) the real dbroot is looked up from the OID
  const bool dbRootMissingForVb = (dbRoot == 0) && (oid < 1000);

  if (dbRootMissingForVb)
    dbRoot = fdbrm.getDBRootOfVBOID(oid);

  fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
}
|
|
|
|
// Forwards an LBID lookup to fdbrm.lookupLocal() and returns its result code
// unchanged. oid, dbRoot, partitionNum, segmentNum and fileBlockOffset are
// passed through by reference to that call.
int ioManager::localLbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid,
                               uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum,
                               uint32_t& fileBlockOffset)
{
  // A positive primitiveprocessor::noVB overrides the caller's vbFlag.
  if (primitiveprocessor::noVB > 0)
    vbFlag = false;

  return fdbrm.lookupLocal(lbid, verid, vbFlag, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
}
|
|
|
|
struct LambdaKludge
|
|
{
|
|
LambdaKludge(ioManager* i) : iom(i)
|
|
{
|
|
}
|
|
~LambdaKludge()
|
|
{
|
|
iom = NULL;
|
|
}
|
|
ioManager* iom;
|
|
void operator()()
|
|
{
|
|
thr_popper(iom);
|
|
}
|
|
};
|
|
|
|
void ioManager::createReaders()
|
|
{
|
|
int idx;
|
|
|
|
for (idx = 0; idx < fThreadCount; idx++)
|
|
{
|
|
try
|
|
{
|
|
fThreadArr.create_thread(LambdaKludge(this));
|
|
}
|
|
catch (exception& e)
|
|
{
|
|
cerr << "IOM::createReaders() caught " << e.what() << endl;
|
|
idx--;
|
|
sleep(1);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Destructor: delegates all shutdown work to stop().
ioManager::~ioManager()
{
  stop();
}
|
|
|
|
void ioManager::go(void)
|
|
{
|
|
createReaders();
|
|
}
|
|
|
|
// FIXME: is this right? what does this method do?
|
|
void ioManager::stop()
|
|
{
|
|
for (int idx = 0; idx < fThreadCount; idx++)
|
|
{
|
|
(void)0; // pthread_detach(fThreadArr[idx]);
|
|
}
|
|
}
|
|
|
|
// Pops the next request from fIOMRequestQueue and returns it. If pop()
// throws a std::exception, an error line is written to cerr and a null
// pointer is returned instead.
fileRequest* ioManager::getNextRequest()
{
  try
  {
    return fIOMRequestQueue.pop();
  }
  catch (exception&)
  {
    cerr << "ioManager::getNextRequest() ERROR " << endl;
  }

  // Only reached when pop() failed.
  return 0;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Prints stderr msg and updates fileRequest object to reflect an error.
|
|
// Lastly, notifies waiting thread that fileRequest has been completed.
|
|
//------------------------------------------------------------------------------
|
|
void ioManager::handleBlockReadError(fileRequest* fr, const string& errMsg, bool* copyLocked, int errorCode)
|
|
{
|
|
try
|
|
{
|
|
dbrm()->releaseLBIDRange(fr->Lbid(), fr->BlocksRequested());
|
|
*copyLocked = false;
|
|
}
|
|
catch (exception& e)
|
|
{
|
|
cout << "releaseRange on read error: " << e.what() << endl;
|
|
}
|
|
|
|
cerr << errMsg << endl;
|
|
fr->RequestStatus(errorCode);
|
|
fr->RequestStatusStr(errMsg);
|
|
|
|
fr->frMutex().lock();
|
|
fr->SetPredicate(fileRequest::COMPLETE);
|
|
fr->frCond().notify_one();
|
|
fr->frMutex().unlock();
|
|
}
|
|
|
|
} // namespace dbbc
|