You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1419 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1419 lines
		
	
	
		
			38 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
// $Id: iomanager.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
//
// C++ Implementation: iomanager
//
// Description:
//
//
// Author: Jason Rodriguez <jrodriguez@calpont.com>
//
//
//
 | 
						|
 | 
						|
#include "mcsconfig.h"
 | 
						|
 | 
						|
#define _FILE_OFFSET_BITS 64
 | 
						|
#define _LARGEFILE64_SOURCE
 | 
						|
#include <sys/mount.h>
 | 
						|
#include <linux/fs.h>
 | 
						|
#ifdef BLOCK_SIZE
 | 
						|
#undef BLOCK_SIZE
 | 
						|
#endif
 | 
						|
#ifdef READ
 | 
						|
#undef READ
 | 
						|
#endif
 | 
						|
#ifdef WRITE
 | 
						|
#undef WRITE
 | 
						|
#endif
 | 
						|
#include <stdexcept>
 | 
						|
#include <unistd.h>
 | 
						|
#include <stdlib.h>
 | 
						|
#include <string>
 | 
						|
#include <sstream>
 | 
						|
#include <tr1/unordered_map>
 | 
						|
#include <tr1/unordered_set>
 | 
						|
#include <set>
 | 
						|
#include <sys/types.h>
 | 
						|
#include <sys/stat.h>
 | 
						|
#include <sys/time.h>
 | 
						|
#include <fcntl.h>
 | 
						|
#include <errno.h>
 | 
						|
#include <boost/shared_ptr.hpp>
 | 
						|
#include <boost/scoped_array.hpp>
 | 
						|
#include <boost/thread.hpp>
 | 
						|
#include <boost/thread/condition.hpp>
 | 
						|
#include <pthread.h>
 | 
						|
//#define NDEBUG
 | 
						|
#include <cassert>
 | 
						|
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
 | 
						|
#include "messageobj.h"
 | 
						|
#include "messageids.h"
 | 
						|
using namespace logging;
 | 
						|
 | 
						|
#include "brmtypes.h"
 | 
						|
 | 
						|
#include "pp_logger.h"
 | 
						|
 | 
						|
#include "fsutils.h"
 | 
						|
 | 
						|
#include "rwlock_local.h"
 | 
						|
 | 
						|
#include "iomanager.h"
 | 
						|
#include "liboamcpp.h"
 | 
						|
 | 
						|
#include "idbcompress.h"
 | 
						|
using namespace compress;
 | 
						|
 | 
						|
#include "IDBDataFile.h"
 | 
						|
#include "IDBPolicy.h"
 | 
						|
#include "IDBLogger.h"
 | 
						|
using namespace idbdatafile;
 | 
						|
 | 
						|
#include "mcsconfig.h"
 | 
						|
#include "threadnaming.h"
 | 
						|
 | 
						|
typedef tr1::unordered_set<BRM::OID_t> USOID;
 | 
						|
 | 
						|
namespace primitiveprocessor
 | 
						|
{
 | 
						|
extern Logger* mlp;
 | 
						|
extern int directIOFlag;
 | 
						|
extern int noVB;
 | 
						|
}  // namespace primitiveprocessor
 | 
						|
 | 
						|
// Portability fallbacks: open(2) flags that some platforms do not provide are
// defined as 0, which is a no-op when OR-ed into a flag set.
#if !defined(O_BINARY)
#define O_BINARY 0
#endif

#if !defined(O_LARGEFILE)
#define O_LARGEFILE 0
#endif

#if !defined(O_NOATIME)
#define O_NOATIME 0
#endif
 | 
						|
 | 
						|
namespace
 | 
						|
{
 | 
						|
using namespace dbbc;
using namespace std;

// Open-file cache sizing constants.
// NOTE(review): the visible eviction code reads its limits through
// iom->MaxOpenFiles() / iom->DecreaseOpenFilesCount(); presumably these are the
// defaults for those settings — confirm against ioManager.
const uint32_t MAX_OPEN_FILES = 16384;
const uint32_t DECREASE_OPEN_FILES = 4096;

// ANSI escape sequences that bracket highlighted debug output
// (used by the IDB_COMP_POC_DEBUG trace blocks).
const std::string boldStart = "\033[0;1m";
const std::string boldStop = "\033[0;39m";
 | 
						|
 | 
						|
// Stores in `tm` the elapsed time tv2 - tv1, expressed in seconds.
void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm)
{
  const double wholeSeconds = (double)(tv2.tv_sec - tv1.tv_sec);
  const double nanoPart = (tv2.tv_nsec - tv1.tv_nsec) * 1.e-9;
  tm = wholeSeconds + nanoPart;
}
 | 
						|
 | 
						|
struct IOMThreadArg
 | 
						|
{
 | 
						|
  ioManager* iom;
 | 
						|
  int32_t thdId;
 | 
						|
};
 | 
						|
 | 
						|
typedef IOMThreadArg IOMThreadArg_t;
 | 
						|
 | 
						|
/* structures shared across all iomanagers */
 | 
						|
class FdEntry
 | 
						|
{
 | 
						|
 public:
 | 
						|
  FdEntry() : oid(0), dbroot(0), partNum(0), segNum(0), fp(0), c(0), inUse(0), compType(0)
 | 
						|
  {
 | 
						|
    cmpMTime = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  FdEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const int ct,
 | 
						|
          IDBDataFile* f)
 | 
						|
   : oid(o), dbroot(d), partNum(p), segNum(s), fp(f), c(0), inUse(0), compType(0)
 | 
						|
  {
 | 
						|
    cmpMTime = 0;
 | 
						|
 | 
						|
    if (oid >= 1000)
 | 
						|
      compType = ct;
 | 
						|
  }
 | 
						|
 | 
						|
  ~FdEntry()
 | 
						|
  {
 | 
						|
    delete fp;
 | 
						|
    fp = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  BRM::OID_t oid;
 | 
						|
  uint16_t dbroot;
 | 
						|
  uint32_t partNum;
 | 
						|
  uint16_t segNum;
 | 
						|
  IDBDataFile* fp;
 | 
						|
  uint32_t c;
 | 
						|
  int inUse;
 | 
						|
 | 
						|
  CompChunkPtrList ptrList;
 | 
						|
 | 
						|
  int compType;
 | 
						|
  bool isCompressed() const
 | 
						|
  {
 | 
						|
    return (oid >= 1000 && compType != 0);
 | 
						|
  }
 | 
						|
  time_t cmpMTime;
 | 
						|
  friend ostream& operator<<(ostream& out, const FdEntry& o)
 | 
						|
  {
 | 
						|
    out << " o: " << o.oid << " f: " << o.fp << " d: " << o.dbroot << " p: " << o.partNum
 | 
						|
        << " s: " << o.segNum << " c: " << o.c << " t: " << o.compType << " m: " << o.cmpMTime;
 | 
						|
    return out;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct fdCacheMapLessThan
 | 
						|
{
 | 
						|
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
 | 
						|
  {
 | 
						|
    if (lhs.oid < rhs.oid)
 | 
						|
      return true;
 | 
						|
 | 
						|
    if (lhs.oid == rhs.oid && lhs.dbroot < rhs.dbroot)
 | 
						|
      return true;
 | 
						|
 | 
						|
    if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum < rhs.partNum)
 | 
						|
      return true;
 | 
						|
 | 
						|
    if (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum &&
 | 
						|
        lhs.segNum < rhs.segNum)
 | 
						|
      return true;
 | 
						|
 | 
						|
    return false;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
struct fdMapEqual
 | 
						|
{
 | 
						|
  bool operator()(const FdEntry& lhs, const FdEntry& rhs) const
 | 
						|
  {
 | 
						|
    return (lhs.oid == rhs.oid && lhs.dbroot == rhs.dbroot && lhs.partNum == rhs.partNum &&
 | 
						|
            lhs.segNum == rhs.segNum);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
typedef boost::shared_ptr<FdEntry> SPFdEntry_t;
 | 
						|
typedef std::map<FdEntry, SPFdEntry_t, fdCacheMapLessThan> FdCacheType_t;
 | 
						|
 | 
						|
struct FdCountEntry
 | 
						|
{
 | 
						|
  FdCountEntry()
 | 
						|
  {
 | 
						|
  }
 | 
						|
  FdCountEntry(const BRM::OID_t o, const uint16_t d, const uint32_t p, const uint16_t s, const uint32_t c,
 | 
						|
               const FdCacheType_t::iterator it)
 | 
						|
   : oid(o), dbroot(d), partNum(p), segNum(s), cnt(c), fdit(it)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  ~FdCountEntry()
 | 
						|
  {
 | 
						|
  }
 | 
						|
 | 
						|
  BRM::OID_t oid;
 | 
						|
  uint16_t dbroot;
 | 
						|
  uint32_t partNum;
 | 
						|
  uint16_t segNum;
 | 
						|
  uint32_t cnt;
 | 
						|
  FdCacheType_t::iterator fdit;
 | 
						|
};  // FdCountEntry
 | 
						|
 | 
						|
typedef FdCountEntry FdCountEntry_t;
 | 
						|
 | 
						|
struct fdCountCompare
 | 
						|
{
 | 
						|
  bool operator()(const FdCountEntry_t& lhs, const FdCountEntry_t& rhs) const
 | 
						|
  {
 | 
						|
    return lhs.cnt > rhs.cnt;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
typedef multiset<FdCountEntry_t, fdCountCompare> FdCacheCountType_t;
 | 
						|
 | 
						|
FdCacheType_t fdcache;
 | 
						|
boost::mutex fdMapMutex;
 | 
						|
rwlock::RWLock_local localLock;
 | 
						|
 | 
						|
// Returns `in` rounded up to the next multiple of `av`, or `in` itself when it is
// already a multiple. The round-up uses a bit mask, so `av` is expected to be a
// power of two (every caller in this file passes 4096). The caller's buffer must
// have at least av - 1 bytes of slack past `in`.
char* alignTo(const char* in, int av)
{
  const ptrdiff_t boundary = static_cast<ptrdiff_t>(av);
  ptrdiff_t address = reinterpret_cast<ptrdiff_t>(in);

  // Already on a boundary: hand the pointer back unchanged.
  if (address % boundary == 0)
    return reinterpret_cast<char*>(address);

  // Drop to the previous boundary, then step forward one whole boundary.
  const ptrdiff_t floorAddress = address & ~(boundary - 1);
  return reinterpret_cast<char*>(floorAddress + boundary);
}
 | 
						|
 | 
						|
// Back-off for the header/chunk re-read loops in thr_popper: sleeps 5 ms per
// retry attempted so far, so the delay grows linearly with `count`.
void waitForRetry(long count)
{
  const long delayMicroseconds = 5000L * count;
  usleep(delayMicroseconds);
}
 | 
						|
 | 
						|
// Must hold the FD cache lock!
 | 
						|
static int updateptrs(char* ptr, FdCacheType_t::iterator fdit)
 | 
						|
{
 | 
						|
  ssize_t i;
 | 
						|
  uint32_t progress;
 | 
						|
 | 
						|
  // ptr is taken from buffer, already been checked: realbuff.get() == 0
 | 
						|
  if (ptr == 0)
 | 
						|
    return -1;
 | 
						|
 | 
						|
  // already checked before: fdit->second->isCompressed()
 | 
						|
  if (fdit->second.get() == 0)
 | 
						|
    return -2;
 | 
						|
 | 
						|
  IDBDataFile* fp = fdit->second->fp;
 | 
						|
 | 
						|
  if (fp == INVALID_HANDLE_VALUE)
 | 
						|
  {
 | 
						|
    Message::Args args;
 | 
						|
    args.add("updateptrs got invalid fp.");
 | 
						|
    primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
 | 
						|
    return -3;
 | 
						|
  }
 | 
						|
 | 
						|
  // We need to read one extra block because we need the first ptr in the 3rd block
 | 
						|
  // to know if we're done.
 | 
						|
  // FIXME: re-work all of this so we don't have to re-read the 3rd block.
 | 
						|
  progress = 0;
 | 
						|
 | 
						|
  while (progress < 4096 * 3)
 | 
						|
  {
 | 
						|
    i = fp->pread(&ptr[progress], progress, (4096 * 3) - progress);
 | 
						|
 | 
						|
    if (i <= 0)
 | 
						|
      break;
 | 
						|
 | 
						|
    progress += i;
 | 
						|
  }
 | 
						|
 | 
						|
  if (progress != 4096 * 3)
 | 
						|
    return -4;  // let it retry. Not likely, but ...
 | 
						|
 | 
						|
  fdit->second->cmpMTime = 0;
 | 
						|
  time_t mtime = fp->mtime();
 | 
						|
 | 
						|
  if (mtime != (time_t)-1)
 | 
						|
    fdit->second->cmpMTime = mtime;
 | 
						|
 | 
						|
  int gplRc = 0;
 | 
						|
  gplRc = compress::CompressInterface::getPtrList(&ptr[4096], 4096, fdit->second->ptrList);
 | 
						|
 | 
						|
  if (gplRc != 0)
 | 
						|
    return -5;  // go for a retry.
 | 
						|
 | 
						|
  if (fdit->second->ptrList.size() == 0)
 | 
						|
    return -6;  // go for a retry.
 | 
						|
 | 
						|
  uint64_t numHdrs = fdit->second->ptrList[0].first / 4096ULL - 2ULL;
 | 
						|
 | 
						|
  if (numHdrs > 0)
 | 
						|
  {
 | 
						|
    boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + 4095]);
 | 
						|
    char* nextHdrBufPtr = 0;
 | 
						|
 | 
						|
    nextHdrBufPtr = alignTo(nextHdrBufsa.get(), 4096);
 | 
						|
 | 
						|
    progress = 0;
 | 
						|
 | 
						|
    while (progress < numHdrs * 4096)
 | 
						|
    {
 | 
						|
      i = fp->pread(&nextHdrBufPtr[progress], (4096 * 2) + progress, (numHdrs * 4096) - progress);
 | 
						|
 | 
						|
      if (i <= 0)
 | 
						|
        break;
 | 
						|
 | 
						|
      progress += i;
 | 
						|
    }
 | 
						|
 | 
						|
    if (progress != numHdrs * 4096)
 | 
						|
      return -8;
 | 
						|
 | 
						|
    CompChunkPtrList nextPtrList;
 | 
						|
    gplRc = compress::CompressInterface::getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);
 | 
						|
 | 
						|
    if (gplRc != 0)
 | 
						|
      return -7;  // go for a retry.
 | 
						|
 | 
						|
    fdit->second->ptrList.insert(fdit->second->ptrList.end(), nextPtrList.begin(), nextPtrList.end());
 | 
						|
  }
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
void* thr_popper(ioManager* arg)
 | 
						|
{
 | 
						|
  utils::setThreadName("thr_popper");
 | 
						|
  ioManager* iom = arg;
 | 
						|
  FileBufferMgr* fbm;
 | 
						|
  fileRequest* fr = 0;
 | 
						|
  BRM::LBID_t lbid = 0;
 | 
						|
  BRM::OID_t oid = 0;
 | 
						|
  BRM::VER_t ver = 0;
 | 
						|
  BRM::QueryContext qc;
 | 
						|
  BRM::VER_t txn = 0;
 | 
						|
  int compType = 0;
 | 
						|
  int blocksLoaded = 0;
 | 
						|
  int blocksRead = 0;
 | 
						|
  const unsigned pageSize = 4096;
 | 
						|
  fbm = &iom->fileBufferManager();
 | 
						|
  char fileName[WriteEngine::FILE_NAME_SIZE];
 | 
						|
  const uint64_t fileBlockSize = BLOCK_SIZE;
 | 
						|
  bool flg = false;
 | 
						|
  bool useCache;
 | 
						|
  uint16_t dbroot = 0;
 | 
						|
  uint32_t partNum = 0;
 | 
						|
  uint16_t segNum = 0;
 | 
						|
  uint32_t offset = 0;
 | 
						|
  char* fileNamePtr = fileName;
 | 
						|
  uint64_t longSeekOffset = 0;
 | 
						|
  int err;
 | 
						|
  uint32_t dlen = 0, acc, readSize, blocksThisRead, j;
 | 
						|
  uint32_t blocksRequested = 0;
 | 
						|
  ssize_t i;
 | 
						|
  char* alignedbuff = 0;
 | 
						|
  boost::scoped_array<char> realbuff;
 | 
						|
  pthread_t threadId = 0;
 | 
						|
  ostringstream iomLogFileName;
 | 
						|
  ofstream lFile;
 | 
						|
  struct timespec rqst1;
 | 
						|
  struct timespec rqst2;
 | 
						|
  struct timespec tm;
 | 
						|
  struct timespec tm2;
 | 
						|
  double tm3;
 | 
						|
  double rqst3;
 | 
						|
  bool locked = false;
 | 
						|
  SPFdEntry_t fe;
 | 
						|
  vector<CacheInsert_t> cacheInsertOps;
 | 
						|
  bool copyLocked = false;
 | 
						|
 | 
						|
  if (iom->IOTrace())
 | 
						|
  {
 | 
						|
    threadId = pthread_self();
 | 
						|
    iomLogFileName << MCSLOGDIR << "/trace/iom." << threadId;
 | 
						|
    lFile.open(iomLogFileName.str().c_str(), ios_base::app | ios_base::ate);
 | 
						|
  }
 | 
						|
 | 
						|
  FdCacheType_t::iterator fdit;
 | 
						|
  IDBDataFile* fp = 0;
 | 
						|
  size_t maxCompSz =
 | 
						|
      compress::CompressInterface::getMaxCompressedSizeGeneric(iom->blocksPerRead * BLOCK_SIZE);
 | 
						|
  size_t readBufferSz = maxCompSz + pageSize;
 | 
						|
 | 
						|
  realbuff.reset(new char[readBufferSz]);
 | 
						|
 | 
						|
  if (realbuff.get() == 0)
 | 
						|
  {
 | 
						|
    cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
 | 
						|
    return 0;
 | 
						|
  }
 | 
						|
 | 
						|
  alignedbuff = alignTo(realbuff.get(), 4096);
 | 
						|
 | 
						|
  if ((((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) >= (ptrdiff_t)pageSize) ||
 | 
						|
      (((ptrdiff_t)alignedbuff % pageSize) != 0))
 | 
						|
    throw runtime_error("aligned buffer size is not matching the page size.");
 | 
						|
 | 
						|
  uint8_t* uCmpBuf = 0;
 | 
						|
  uCmpBuf = new uint8_t[4 * 1024 * 1024 + 4];
 | 
						|
 | 
						|
  for (;;)
 | 
						|
  {
 | 
						|
    if (copyLocked)
 | 
						|
    {
 | 
						|
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
 | 
						|
      copyLocked = false;
 | 
						|
    }
 | 
						|
 | 
						|
    if (locked)
 | 
						|
    {
 | 
						|
      localLock.read_unlock();
 | 
						|
      locked = false;
 | 
						|
    }
 | 
						|
 | 
						|
    fr = iom->getNextRequest();
 | 
						|
 | 
						|
    localLock.read_lock();
 | 
						|
    locked = true;
 | 
						|
 | 
						|
    if (iom->IOTrace())
 | 
						|
      clock_gettime(CLOCK_REALTIME, &rqst1);
 | 
						|
 | 
						|
    lbid = fr->Lbid();
 | 
						|
    qc = fr->Ver();
 | 
						|
    txn = fr->Txn();
 | 
						|
    flg = fr->Flg();
 | 
						|
    compType = fr->CompType();
 | 
						|
    useCache = fr->useCache();
 | 
						|
    blocksLoaded = 0;
 | 
						|
    blocksRead = 0;
 | 
						|
    dlen = fr->BlocksRequested();
 | 
						|
    blocksRequested = fr->BlocksRequested();
 | 
						|
    oid = 0;
 | 
						|
    dbroot = 0;
 | 
						|
    partNum = 0;
 | 
						|
    segNum = 0;
 | 
						|
    offset = 0;
 | 
						|
 | 
						|
    // special case for getBlock.
 | 
						|
    iom->dbrm()->lockLBIDRange(lbid, blocksRequested);
 | 
						|
    copyLocked = true;
 | 
						|
 | 
						|
    // special case for getBlock.
 | 
						|
    if (blocksRequested == 1)
 | 
						|
    {
 | 
						|
      BRM::VER_t outVer;
 | 
						|
      iom->dbrm()->vssLookup((BRM::LBID_t)lbid, qc, txn, &outVer, &flg);
 | 
						|
      ver = outVer;
 | 
						|
      fr->versioned(flg);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      fr->versioned(false);
 | 
						|
      ver = qc.currentScn;
 | 
						|
    }
 | 
						|
 | 
						|
    err = iom->localLbidLookup(lbid, ver, flg, oid, dbroot, partNum, segNum, offset);
 | 
						|
 | 
						|
    if (err == BRM::ERR_SNAPSHOT_TOO_OLD)
 | 
						|
    {
 | 
						|
      ostringstream errMsg;
 | 
						|
      errMsg << "thr_popper: version " << ver << " of LBID " << lbid << "is too old";
 | 
						|
      iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
    else if (err < 0)
 | 
						|
    {
 | 
						|
      ostringstream errMsg;
 | 
						|
      errMsg << "thr_popper: BRM lookup failure; lbid=" << lbid << "; ver=" << ver
 | 
						|
             << "; flg=" << (flg ? 1 : 0);
 | 
						|
      iom->handleBlockReadError(fr, errMsg.str(), ©Locked, fileRequest::BRM_LOOKUP_ERROR);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
    {
 | 
						|
      boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
 | 
						|
      if (compType != 0)
 | 
						|
        cout << boldStart;
 | 
						|
 | 
						|
      cout << "fileRequest: " << *fr << endl;
 | 
						|
 | 
						|
      if (compType != 0)
 | 
						|
        cout << boldStop;
 | 
						|
    }
 | 
						|
#endif
 | 
						|
    const uint32_t extentSize = iom->getExtentRows();
 | 
						|
    FdEntry fdKey(oid, dbroot, partNum, segNum, compType, NULL);
 | 
						|
    // cout << "Looking for " << fdKey << endl
 | 
						|
    //   << "O: " << oid << " D: " << dbroot << " P: " << partNum << " S: " << segNum << endl;
 | 
						|
 | 
						|
    fdMapMutex.lock();
 | 
						|
    fdit = fdcache.find(fdKey);
 | 
						|
 | 
						|
    if (fdit == fdcache.end())
 | 
						|
    {
 | 
						|
      try
 | 
						|
      {
 | 
						|
        iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
      }
 | 
						|
      catch (exception& exc)
 | 
						|
      {
 | 
						|
        fdMapMutex.unlock();
 | 
						|
        Message::Args args;
 | 
						|
        args.add(oid);
 | 
						|
        args.add(exc.what());
 | 
						|
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
 | 
						|
        ostringstream errMsg;
 | 
						|
        errMsg << "thr_popper: Error building filename for OID " << oid << "; " << exc.what();
 | 
						|
        iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
#ifdef IDB_COMP_USE_CMP_SUFFIX
 | 
						|
 | 
						|
      if (compType != 0)
 | 
						|
      {
 | 
						|
        char* ptr = strrchr(fileNamePtr, '.');
 | 
						|
        idbassert(ptr);
 | 
						|
        strcpy(ptr, ".cmp");
 | 
						|
      }
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
      if (oid > 3000)
 | 
						|
      {
 | 
						|
        // TODO: should syscat columns be considered when reducing open file count
 | 
						|
        //  They are always needed why should they be closed?
 | 
						|
        if (fdcache.size() >= iom->MaxOpenFiles())
 | 
						|
        {
 | 
						|
          FdCacheCountType_t fdCountSort;
 | 
						|
 | 
						|
          for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
 | 
						|
          {
 | 
						|
            struct FdCountEntry fdc(it->second->oid, it->second->dbroot, it->second->partNum,
 | 
						|
                                    it->second->segNum, it->second->c, it);
 | 
						|
 | 
						|
            fdCountSort.insert(fdc);
 | 
						|
          }
 | 
						|
 | 
						|
          if (iom->FDCacheTrace())
 | 
						|
          {
 | 
						|
            iom->FDTraceFile() << "Before flushing sz: " << fdcache.size()
 | 
						|
                               << " delCount: " << iom->DecreaseOpenFilesCount() << endl;
 | 
						|
 | 
						|
            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
 | 
						|
              iom->FDTraceFile() << *(*it).second << endl;
 | 
						|
 | 
						|
            iom->FDTraceFile() << "==================" << endl << endl;
 | 
						|
          }
 | 
						|
 | 
						|
          // TODO: should we consider a minimum number of open files
 | 
						|
          //      currently, there is nothing to prevent all open files
 | 
						|
          //      from being closed by the IOManager.
 | 
						|
 | 
						|
          uint32_t delCount = 0;
 | 
						|
 | 
						|
          for (FdCacheCountType_t::reverse_iterator rit = fdCountSort.rbegin();
 | 
						|
               rit != fdCountSort.rend() && fdcache.size() > 0 && delCount < iom->DecreaseOpenFilesCount();
 | 
						|
               rit++)
 | 
						|
          {
 | 
						|
            FdEntry oldfdKey(rit->oid, rit->dbroot, rit->partNum, rit->segNum, 0, NULL);
 | 
						|
            FdCacheType_t::iterator it = fdcache.find(oldfdKey);
 | 
						|
 | 
						|
            if (it != fdcache.end())
 | 
						|
            {
 | 
						|
              if (iom->FDCacheTrace())
 | 
						|
              {
 | 
						|
                if (!rit->fdit->second->inUse)
 | 
						|
                  iom->FDTraceFile() << "Removing dc: " << delCount << " sz: " << fdcache.size()
 | 
						|
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
 | 
						|
                else
 | 
						|
                  iom->FDTraceFile() << "Skip Remove in use dc: " << delCount << " sz: " << fdcache.size()
 | 
						|
                                     << *(*it).second << " u: " << rit->fdit->second->inUse << endl;
 | 
						|
              }
 | 
						|
 | 
						|
              if (rit->fdit->second->inUse <= 0)
 | 
						|
              {
 | 
						|
                fdcache.erase(it);
 | 
						|
                delCount++;
 | 
						|
              }
 | 
						|
            }
 | 
						|
          }  // for (FdCacheCountType_t...
 | 
						|
 | 
						|
          if (iom->FDCacheTrace())
 | 
						|
          {
 | 
						|
            iom->FDTraceFile() << "After flushing sz: " << fdcache.size() << endl;
 | 
						|
 | 
						|
            for (FdCacheType_t::iterator it = fdcache.begin(); it != fdcache.end(); it++)
 | 
						|
            {
 | 
						|
              iom->FDTraceFile() << *(*it).second << endl;
 | 
						|
            }
 | 
						|
 | 
						|
            iom->FDTraceFile() << "==================" << endl << endl;
 | 
						|
          }
 | 
						|
 | 
						|
          fdCountSort.clear();
 | 
						|
 | 
						|
        }  // if (fdcache.size()...
 | 
						|
      }    // if (oid > 3000)
 | 
						|
 | 
						|
      int opts = primitiveprocessor::directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
 | 
						|
      fp = NULL;
 | 
						|
      uint32_t openRetries = 0;
 | 
						|
      int saveErrno = 0;
 | 
						|
 | 
						|
      while (fp == NULL && openRetries++ < 5)
 | 
						|
      {
 | 
						|
        fp = IDBDataFile::open(IDBPolicy::getType(fileNamePtr, IDBPolicy::PRIMPROC), fileNamePtr, "r", opts);
 | 
						|
        saveErrno = errno;
 | 
						|
 | 
						|
        if (fp == NULL)
 | 
						|
          sleep(1);
 | 
						|
      }
 | 
						|
 | 
						|
      if (fp == NULL)
 | 
						|
      {
 | 
						|
        Message::Args args;
 | 
						|
        fdit = fdcache.end();
 | 
						|
        fdMapMutex.unlock();
 | 
						|
        args.add(oid);
 | 
						|
        args.add(string(fileNamePtr) + ":" + strerror(saveErrno));
 | 
						|
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
 | 
						|
        ostringstream errMsg;
 | 
						|
        errMsg << "thr_popper: Error opening file for OID " << oid << "; " << fileNamePtr << "; "
 | 
						|
               << strerror(saveErrno);
 | 
						|
        int errorCode = fileRequest::FAILED;
 | 
						|
 | 
						|
        if (saveErrno == EINVAL)
 | 
						|
          errorCode = fileRequest::FS_EINVAL;
 | 
						|
        else if (saveErrno == ENOENT)
 | 
						|
          errorCode = fileRequest::FS_ENOENT;
 | 
						|
 | 
						|
        iom->handleBlockReadError(fr, errMsg.str(), ©Locked, errorCode);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      fe.reset(new FdEntry(oid, dbroot, partNum, segNum, compType, fp));
 | 
						|
      fe->inUse++;
 | 
						|
      fdcache[fdKey] = fe;
 | 
						|
      fdit = fdcache.find(fdKey);
 | 
						|
      fe.reset();
 | 
						|
    }
 | 
						|
 | 
						|
    else
 | 
						|
    {
 | 
						|
      if (fdit->second.get())
 | 
						|
      {
 | 
						|
        fdit->second->c++;
 | 
						|
        fdit->second->inUse++;
 | 
						|
        fp = fdit->second->fp;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        Message::Args args;
 | 
						|
        fdit = fdcache.end();
 | 
						|
        fdMapMutex.unlock();
 | 
						|
        args.add(oid);
 | 
						|
        ostringstream errMsg;
 | 
						|
        errMsg << "Null FD cache entry. (dbroot, partNum, segNum, compType) = (" << dbroot << ", " << partNum
 | 
						|
               << ", " << segNum << ", " << compType << ")";
 | 
						|
        args.add(errMsg.str());
 | 
						|
        primitiveprocessor::mlp->logMessage(logging::M0053, args, true);
 | 
						|
        iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    fdMapMutex.unlock();
 | 
						|
 | 
						|
#ifdef SHARED_NOTHING_DEMO_2
 | 
						|
 | 
						|
    // change offset if it's shared nothing
 | 
						|
    /* Get the extent #, divide by # of PMs, calculate base offset for the new extent #,
 | 
						|
            add extent offset */
 | 
						|
    if (oid >= 10000)
 | 
						|
      offset = (((offset / extentSize) / iom->pmCount) * extentSize) + (offset % extentSize);
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
    longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize;
 | 
						|
    lldiv_t cmpOffFact = lldiv(longSeekOffset, (4LL * 1024LL * 1024LL));
 | 
						|
 | 
						|
    uint32_t readCount = 0;
 | 
						|
    uint32_t bytesRead = 0;
 | 
						|
    uint32_t compressedBytesRead =
 | 
						|
        0;  // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
 | 
						|
    uint32_t jend = blocksRequested / iom->blocksPerRead;
 | 
						|
 | 
						|
    if (iom->IOTrace())
 | 
						|
      clock_gettime(CLOCK_REALTIME, &tm);
 | 
						|
 | 
						|
    ostringstream errMsg;
 | 
						|
    bool errorOccurred = false;
 | 
						|
    string errorString;
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
    bool debugWrite = false;
 | 
						|
#endif
 | 
						|
 | 
						|
#ifdef EM_AS_A_TABLE_POC__
 | 
						|
    dlen = 1;
 | 
						|
#endif
 | 
						|
 | 
						|
    if (blocksRequested % iom->blocksPerRead)
 | 
						|
      jend++;
 | 
						|
 | 
						|
    for (j = 0; j < jend; j++)
 | 
						|
    {
 | 
						|
      int decompRetryCount = 0;
 | 
						|
      int retryReadHeadersCount = 0;
 | 
						|
 | 
						|
    decompRetry:
 | 
						|
      blocksThisRead = std::min(dlen, iom->blocksPerRead);
 | 
						|
      readSize = blocksThisRead * BLOCK_SIZE;
 | 
						|
 | 
						|
      acc = 0;
 | 
						|
 | 
						|
      while (acc < readSize)
 | 
						|
      {
 | 
						|
#if defined(EM_AS_A_TABLE_POC__)
 | 
						|
 | 
						|
        if (oid == 1084)
 | 
						|
        {
 | 
						|
          uint32_t h;
 | 
						|
          int32_t o = 0;
 | 
						|
          int32_t* ip;
 | 
						|
          ip = (int32_t*)(&alignedbuff[acc]);
 | 
						|
 | 
						|
          for (o = 0; o < 2048; o++)
 | 
						|
          {
 | 
						|
            if (iom->dbrm()->getHWM(o + 3000, h) == 0)
 | 
						|
              *ip++ = h;
 | 
						|
            else
 | 
						|
              *ip++ = numeric_limits<int32_t>::min() + 1;
 | 
						|
          }
 | 
						|
 | 
						|
          i = BLOCK_SIZE;
 | 
						|
        }
 | 
						|
        else
 | 
						|
          i = pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset);
 | 
						|
 | 
						|
#else
 | 
						|
 | 
						|
        if (fdit->second->isCompressed())
 | 
						|
        {
 | 
						|
        retryReadHeaders:
 | 
						|
          // hdrs may have been modified since we cached them in fdit->second...
 | 
						|
          time_t cur_mtime = numeric_limits<time_t>::max();
 | 
						|
          int updatePtrsRc = 0;
 | 
						|
          fdMapMutex.lock();
 | 
						|
          time_t fp_mtime = fp->mtime();
 | 
						|
 | 
						|
          if (fp_mtime != (time_t)-1)
 | 
						|
            cur_mtime = fp_mtime;
 | 
						|
 | 
						|
          if (decompRetryCount > 0 || retryReadHeadersCount > 0 || cur_mtime > fdit->second->cmpMTime)
 | 
						|
            updatePtrsRc = updateptrs(&alignedbuff[0], fdit);
 | 
						|
 | 
						|
          fdMapMutex.unlock();
 | 
						|
 | 
						|
          int idx = cmpOffFact.quot;
 | 
						|
 | 
						|
          if (updatePtrsRc != 0 || idx >= (signed)fdit->second->ptrList.size())
 | 
						|
          {
 | 
						|
            // Due to race condition, the header on disk may not upated yet.
 | 
						|
            // Log an info message and retry.
 | 
						|
            if (retryReadHeadersCount == 0)
 | 
						|
            {
 | 
						|
              Message::Args args;
 | 
						|
              args.add(oid);
 | 
						|
              ostringstream infoMsg;
 | 
						|
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
              infoMsg << "retry updateptrs for " << fileNamePtr << ". rc=" << updatePtrsRc << ", idx=" << idx
 | 
						|
                      << ", ptr.size=" << fdit->second->ptrList.size();
 | 
						|
              args.add(infoMsg.str());
 | 
						|
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
 | 
						|
            }
 | 
						|
 | 
						|
            if (++retryReadHeadersCount < 30)
 | 
						|
            {
 | 
						|
              waitForRetry(retryReadHeadersCount);
 | 
						|
              fdit->second->cmpMTime = 0;
 | 
						|
              goto retryReadHeaders;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
              // still fail after all the retries.
 | 
						|
              errorOccurred = true;
 | 
						|
              errMsg << "Error reading compression header. rc=" << updatePtrsRc << ", idx=" << idx
 | 
						|
                     << ", ptr.size=" << fdit->second->ptrList.size();
 | 
						|
              errorString = errMsg.str();
 | 
						|
              break;
 | 
						|
            }
 | 
						|
          }
 | 
						|
 | 
						|
          // FIXME: make sure alignedbuff can hold fdit->second->ptrList[idx].second bytes
 | 
						|
          if (fdit->second->ptrList[idx].second > maxCompSz)
 | 
						|
          {
 | 
						|
            errorOccurred = true;
 | 
						|
            errMsg << "aligned buff too small. dataSize=" << fdit->second->ptrList[idx].second
 | 
						|
                   << ", buffSize=" << maxCompSz;
 | 
						|
            errorString = errMsg.str();
 | 
						|
            break;
 | 
						|
          }
 | 
						|
 | 
						|
          i = fp->pread(&alignedbuff[0], fdit->second->ptrList[idx].first, fdit->second->ptrList[idx].second);
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
          {
 | 
						|
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
            cout << boldStart << "pread1.1(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec
 | 
						|
                 << ", " << fdit->second->ptrList[idx].second << ", " << fdit->second->ptrList[idx].first
 | 
						|
                 << ") = " << i << ' ' << cmpOffFact.quot << ' ' << cmpOffFact.rem << boldStop << endl;
 | 
						|
          }
 | 
						|
#endif
 | 
						|
 | 
						|
          // @bug3407, give it some retries if pread failed.
 | 
						|
          if (i != (ssize_t)fdit->second->ptrList[idx].second)
 | 
						|
          {
 | 
						|
            // log an info message
 | 
						|
            if (retryReadHeadersCount == 0)
 | 
						|
            {
 | 
						|
              Message::Args args;
 | 
						|
              args.add(oid);
 | 
						|
              ostringstream infoMsg;
 | 
						|
              iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
              infoMsg << " Read from " << fileNamePtr << " failed at chunk " << idx
 | 
						|
                      << ". (offset, size, actuall read) = (" << fdit->second->ptrList[idx].first << ", "
 | 
						|
                      << fdit->second->ptrList[idx].second << ", " << i << ")";
 | 
						|
              args.add(infoMsg.str());
 | 
						|
              primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
 | 
						|
            }
 | 
						|
 | 
						|
            if (++retryReadHeadersCount < 30)
 | 
						|
            {
 | 
						|
              waitForRetry(retryReadHeadersCount);
 | 
						|
              fdit->second->cmpMTime = 0;
 | 
						|
              goto retryReadHeaders;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
              errorOccurred = true;
 | 
						|
              errMsg << "Error reading chunk " << idx;
 | 
						|
              errorString = errMsg.str();
 | 
						|
              break;
 | 
						|
            }
 | 
						|
          }
 | 
						|
 | 
						|
          compressedBytesRead += i;  // @Bug 3149.
 | 
						|
          i = readSize;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          i = fp->pread(&alignedbuff[acc], longSeekOffset, readSize - acc);
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
          {
 | 
						|
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
            cout << "pread1.2(" << fp << ", 0x" << hex << (ptrdiff_t)&alignedbuff[acc] << dec << ", "
 | 
						|
                 << (readSize - acc) << ", " << longSeekOffset << ") = " << i << ' ' << cmpOffFact.quot << ' '
 | 
						|
                 << cmpOffFact.rem << endl;
 | 
						|
          }
 | 
						|
#endif
 | 
						|
        }
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
        if (i < 0 && errno == EINTR)
 | 
						|
        {
 | 
						|
          continue;
 | 
						|
        }
 | 
						|
        else if (i < 0)
 | 
						|
        {
 | 
						|
          try
 | 
						|
          {
 | 
						|
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
          }
 | 
						|
          catch (exception& exc)
 | 
						|
          {
 | 
						|
            cerr << "FileName Err:" << exc.what() << endl;
 | 
						|
            strcpy(fileNamePtr, "unknown filename");
 | 
						|
          }
 | 
						|
 | 
						|
          errorString = strerror(errno);
 | 
						|
          errorOccurred = true;
 | 
						|
          errMsg << "thr_popper: Error reading file for OID " << oid << "; "
 | 
						|
                 << " fp " << fp << "; offset " << longSeekOffset << "; fileName " << fileNamePtr << "; "
 | 
						|
                 << errorString;
 | 
						|
          break;  // break from "while(acc..." loop
 | 
						|
        }
 | 
						|
        else if (i == 0)
 | 
						|
        {
 | 
						|
          iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
          errorString = "early EOF";
 | 
						|
          errorOccurred = true;
 | 
						|
          errMsg << "thr_popper: Early EOF reading file for OID " << oid << "; " << fileNamePtr;
 | 
						|
          break;  // break from "while(acc..." loop
 | 
						|
        }
 | 
						|
 | 
						|
        acc += i;
 | 
						|
        longSeekOffset += (uint64_t)i;
 | 
						|
        readCount++;
 | 
						|
        bytesRead += i;
 | 
						|
      }  // while(acc...
 | 
						|
 | 
						|
      //..Break out of "for (j..." read loop if error occurred
 | 
						|
      if (errorOccurred)
 | 
						|
      {
 | 
						|
        Message::Args args;
 | 
						|
        args.add(oid);
 | 
						|
        args.add(errorString);
 | 
						|
        primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
 | 
						|
        iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      blocksRead += blocksThisRead;
 | 
						|
 | 
						|
      if (iom->IOTrace())
 | 
						|
        clock_gettime(CLOCK_REALTIME, &tm2);
 | 
						|
 | 
						|
      /* New bulk VSS lookup code */
 | 
						|
      {
 | 
						|
        vector<BRM::LBID_t> lbids;
 | 
						|
        vector<BRM::VER_t> versions;
 | 
						|
        vector<bool> isLocked;
 | 
						|
 | 
						|
        for (i = 0; (uint32_t)i < blocksThisRead; i++)
 | 
						|
          lbids.push_back((BRM::LBID_t)(lbid + i) + (j * iom->blocksPerRead));
 | 
						|
 | 
						|
        if (blocksRequested > 1 || !flg)  // prefetch, or an unversioned single-block read
 | 
						|
          iom->dbrm()->bulkGetCurrentVersion(lbids, &versions, &isLocked);
 | 
						|
        else  // a single-block read that was versioned
 | 
						|
        {
 | 
						|
          versions.push_back(ver);
 | 
						|
          isLocked.push_back(false);
 | 
						|
        }
 | 
						|
 | 
						|
        uint8_t* ptr = (uint8_t*)&alignedbuff[0];
 | 
						|
 | 
						|
        if (blocksThisRead > 0 && fdit->second->isCompressed())
 | 
						|
        {
 | 
						|
          size_t blen = 4 * 1024 * 1024 + 4;
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
          {
 | 
						|
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
            cout << "decompress(0x" << hex << (ptrdiff_t)&alignedbuff[0] << dec << ", "
 | 
						|
                 << fdit->second->ptrList[cmpOffFact.quot].second << ", 0x" << hex << (ptrdiff_t)uCmpBuf
 | 
						|
                 << dec << ", " << blen << ")" << endl;
 | 
						|
          }
 | 
						|
#endif
 | 
						|
 | 
						|
          std::unique_ptr<compress::CompressInterface> decompressor(
 | 
						|
              compress::getCompressInterfaceByType(static_cast<uint32_t>(fdit->second->compType)));
 | 
						|
          if (!decompressor)
 | 
						|
          {
 | 
						|
            // Use default?
 | 
						|
            decompressor.reset(new compress::CompressInterfaceSnappy());
 | 
						|
          }
 | 
						|
 | 
						|
          int dcrc = decompressor->uncompressBlock(
 | 
						|
              &alignedbuff[0], fdit->second->ptrList[cmpOffFact.quot].second, uCmpBuf, blen);
 | 
						|
 | 
						|
          if (dcrc != 0)
 | 
						|
          {
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
            boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
#endif
 | 
						|
 | 
						|
            if (++decompRetryCount < 30)
 | 
						|
            {
 | 
						|
              blocksRead -= blocksThisRead;
 | 
						|
              waitForRetry(decompRetryCount);
 | 
						|
 | 
						|
              // log an info message every 10 retries
 | 
						|
              if (decompRetryCount == 1)
 | 
						|
              {
 | 
						|
                Message::Args args;
 | 
						|
                args.add(oid);
 | 
						|
                ostringstream infoMsg;
 | 
						|
                iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
                infoMsg << "decompress retry for " << fileNamePtr << " decompRetry chunk " << cmpOffFact.quot
 | 
						|
                        << " code=" << dcrc;
 | 
						|
                args.add(infoMsg.str());
 | 
						|
                primitiveprocessor::mlp->logInfoMessage(logging::M0061, args);
 | 
						|
              }
 | 
						|
 | 
						|
              goto decompRetry;
 | 
						|
            }
 | 
						|
 | 
						|
            cout << boldStart << "decomp returned " << dcrc << boldStop << endl;
 | 
						|
 | 
						|
            errorOccurred = true;
 | 
						|
            Message::Args args;
 | 
						|
            args.add(oid);
 | 
						|
            errMsg << "Error decompressing block " << cmpOffFact.quot << " code=" << dcrc
 | 
						|
                   << " part=" << partNum << " seg=" << segNum;
 | 
						|
            args.add(errMsg.str());
 | 
						|
            primitiveprocessor::mlp->logMessage(logging::M0061, args, true);
 | 
						|
            iom->handleBlockReadError(fr, errMsg.str(), ©Locked);
 | 
						|
            break;
 | 
						|
          }
 | 
						|
 | 
						|
          // FIXME: why doesn't this work??? (See later for why)
 | 
						|
          // ptr = &uCmpBuf[cmpOffFact.rem];
 | 
						|
          memcpy(ptr, &uCmpBuf[cmpOffFact.rem], blocksThisRead * BLOCK_SIZE);
 | 
						|
 | 
						|
          // log the retries, if any
 | 
						|
          if (retryReadHeadersCount > 0 || decompRetryCount > 0)
 | 
						|
          {
 | 
						|
            Message::Args args;
 | 
						|
            args.add(oid);
 | 
						|
            ostringstream infoMsg;
 | 
						|
            iom->buildOidFileName(oid, dbroot, partNum, segNum, fileNamePtr);
 | 
						|
            infoMsg << "Successfully uncompress " << fileNamePtr << " chunk " << cmpOffFact.quot << " @";
 | 
						|
 | 
						|
            if (retryReadHeadersCount > 0)
 | 
						|
              infoMsg << " HeaderRetry:" << retryReadHeadersCount;
 | 
						|
 | 
						|
            if (decompRetryCount > 0)
 | 
						|
              infoMsg << " UncompressRetry:" << decompRetryCount;
 | 
						|
 | 
						|
            args.add(infoMsg.str());
 | 
						|
            primitiveprocessor::mlp->logInfoMessage(logging::M0006, args);
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        for (i = 0; useCache && (uint32_t)i < lbids.size(); i++)
 | 
						|
        {
 | 
						|
          if (!isLocked[i])
 | 
						|
          {
 | 
						|
#ifdef IDB_COMP_POC_DEBUG
 | 
						|
            {
 | 
						|
              if (debugWrite)
 | 
						|
              {
 | 
						|
                boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
 | 
						|
                cout << boldStart << "i = " << i << ", ptr = 0x" << hex << (ptrdiff_t)&ptr[i * BLOCK_SIZE]
 | 
						|
                     << dec << boldStop << endl;
 | 
						|
                cout << boldStart;
 | 
						|
#if 0
 | 
						|
                                int32_t* i32p;
 | 
						|
                                i32p = (int32_t*)&ptr[i * BLOCK_SIZE];
 | 
						|
 | 
						|
                                for (int iy = 0; iy < 2; iy++)
 | 
						|
                                {
 | 
						|
                                    for (int ix = 0; ix < 8; ix++, i32p++)
 | 
						|
                                        cout << *i32p << ' ';
 | 
						|
 | 
						|
                                    cout << endl;
 | 
						|
                                }
 | 
						|
 | 
						|
#else
 | 
						|
                int64_t* i64p;
 | 
						|
                i64p = (int64_t*)&ptr[i * BLOCK_SIZE];
 | 
						|
 | 
						|
                for (int iy = 0; iy < 2; iy++)
 | 
						|
                {
 | 
						|
                  for (int ix = 0; ix < 8; ix++, i64p++)
 | 
						|
                    cout << *i64p << ' ';
 | 
						|
 | 
						|
                  cout << endl;
 | 
						|
                }
 | 
						|
 | 
						|
#endif
 | 
						|
                cout << boldStop << endl;
 | 
						|
              }
 | 
						|
            }
 | 
						|
#endif
 | 
						|
            cacheInsertOps.push_back(
 | 
						|
                CacheInsert_t(lbids[i], versions[i], (uint8_t*)&alignedbuff[i * BLOCK_SIZE]));
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        if (useCache)
 | 
						|
        {
 | 
						|
          blocksLoaded += fbm->bulkInsert(cacheInsertOps);
 | 
						|
          cacheInsertOps.clear();
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      dlen -= blocksThisRead;
 | 
						|
 | 
						|
    }  // for (j...
 | 
						|
 | 
						|
    fdMapMutex.lock();
 | 
						|
 | 
						|
    if (fdit->second.get())
 | 
						|
      fdit->second->inUse--;
 | 
						|
 | 
						|
    fdit = fdcache.end();
 | 
						|
    fdMapMutex.unlock();
 | 
						|
 | 
						|
    if (errorOccurred)
 | 
						|
      continue;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
      iom->dbrm()->releaseLBIDRange(lbid, blocksRequested);
 | 
						|
      copyLocked = false;
 | 
						|
    }
 | 
						|
    catch (exception& e)
 | 
						|
    {
 | 
						|
      cout << "releaseRange: " << e.what() << endl;
 | 
						|
    }
 | 
						|
 | 
						|
    fr->BlocksRead(blocksRead);
 | 
						|
    fr->BlocksLoaded(blocksLoaded);
 | 
						|
 | 
						|
    // FIXME: This is why some code above doesn't work...
 | 
						|
    if (fr->data != 0 && blocksRequested == 1)
 | 
						|
      memcpy(fr->data, alignedbuff, BLOCK_SIZE);
 | 
						|
 | 
						|
    fr->frMutex().lock();
 | 
						|
    fr->SetPredicate(fileRequest::COMPLETE);
 | 
						|
    fr->frCond().notify_one();
 | 
						|
    fr->frMutex().unlock();
 | 
						|
 | 
						|
    if (iom->IOTrace())
 | 
						|
    {
 | 
						|
      clock_gettime(CLOCK_REALTIME, &rqst2);
 | 
						|
      timespec_sub(tm, tm2, tm3);
 | 
						|
      timespec_sub(rqst1, rqst2, rqst3);
 | 
						|
 | 
						|
      // @Bug 3149.  IOMTrace was not reporting bytesRead correctly for compressed columns.
 | 
						|
      uint32_t reportBytesRead = (compressedBytesRead > 0) ? compressedBytesRead : bytesRead;
 | 
						|
 | 
						|
      lFile << left << setw(5) << setfill(' ') << oid << right << setw(5) << setfill(' ')
 | 
						|
            << offset / extentSize << " " << right << setw(11) << setfill(' ') << lbid << " " << right
 | 
						|
            << setw(9) << bytesRead / (readCount << 13) << " " << right << setw(9) << blocksRequested << " "
 | 
						|
            << right << setw(10) << fixed << tm3 << " " << right << fixed
 | 
						|
            << ((double)(rqst1.tv_sec + (1.e-9 * rqst1.tv_nsec))) << " " << right << setw(10) << fixed
 | 
						|
            << rqst3 << " " << right << setw(10) << fixed << longSeekOffset << " " << right << setw(10)
 | 
						|
            << fixed << reportBytesRead << " " << right << setw(3) << fixed << dbroot << " " << right
 | 
						|
            << setw(3) << fixed << partNum << " " << right << setw(3) << fixed << segNum << " " << endl;
 | 
						|
    }
 | 
						|
 | 
						|
  }  // for(;;)
 | 
						|
 | 
						|
  delete[] uCmpBuf;
 | 
						|
 | 
						|
  lFile.close();
 | 
						|
 | 
						|
  // reaching here is an error...
 | 
						|
 | 
						|
  return 0;
 | 
						|
}  // end thr_popper
 | 
						|
 | 
						|
}  // anonymous namespace
 | 
						|
 | 
						|
namespace dbbc
 | 
						|
{
 | 
						|
void setReadLock()
 | 
						|
{
 | 
						|
  localLock.read_lock();
 | 
						|
}
 | 
						|
 | 
						|
void releaseReadLock()
 | 
						|
{
 | 
						|
  localLock.read_unlock();
 | 
						|
}
 | 
						|
 | 
						|
void dropFDCache()
 | 
						|
{
 | 
						|
  localLock.write_lock();
 | 
						|
  fdcache.clear();
 | 
						|
  localLock.write_unlock();
 | 
						|
}
 | 
						|
void purgeFDCache(std::vector<BRM::FileInfo>& files)
 | 
						|
{
 | 
						|
  localLock.write_lock();
 | 
						|
 | 
						|
  FdCacheType_t::iterator fdit;
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < files.size(); i++)
 | 
						|
  {
 | 
						|
    FdEntry fdKey(files[i].oid, files[i].dbRoot, files[i].partitionNum, files[i].segmentNum,
 | 
						|
                  files[i].compType, NULL);
 | 
						|
    fdit = fdcache.find(fdKey);
 | 
						|
 | 
						|
    if (fdit != fdcache.end())
 | 
						|
      fdcache.erase(fdit);
 | 
						|
  }
 | 
						|
 | 
						|
  localLock.write_unlock();
 | 
						|
}
 | 
						|
 | 
						|
ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount, int bsPerRead)
 | 
						|
 : blocksPerRead(bsPerRead), fIOMfbMgr(fbm), fIOMRequestQueue(fbrq), fFileOp(false)
 | 
						|
{
 | 
						|
  if (thrCount <= 0)
 | 
						|
    thrCount = 1;
 | 
						|
 | 
						|
  if (thrCount > 256)
 | 
						|
    thrCount = 256;
 | 
						|
 | 
						|
  fConfig = Config::makeConfig();
 | 
						|
  string val = fConfig->getConfig("DBBC", "IOMTracing");
 | 
						|
  int temp = 0;
 | 
						|
 | 
						|
  if (val.length() > 0)
 | 
						|
    temp = static_cast<int>(Config::fromText(val));
 | 
						|
 | 
						|
  if (temp > 0)
 | 
						|
    fIOTrace = true;
 | 
						|
  else
 | 
						|
    fIOTrace = false;
 | 
						|
 | 
						|
  val = fConfig->getConfig("DBBC", "MaxOpenFiles");
 | 
						|
  temp = 0;
 | 
						|
  fMaxOpenFiles = MAX_OPEN_FILES;
 | 
						|
 | 
						|
  if (val.length() > 0)
 | 
						|
    temp = static_cast<int>(Config::fromText(val));
 | 
						|
 | 
						|
  if (temp > 0)
 | 
						|
    fMaxOpenFiles = temp;
 | 
						|
 | 
						|
  val = fConfig->getConfig("DBBC", "DecreaseOpenFilesCount");
 | 
						|
  temp = 0;
 | 
						|
  fDecreaseOpenFilesCount = DECREASE_OPEN_FILES;
 | 
						|
 | 
						|
  if (val.length() > 0)
 | 
						|
    temp = static_cast<int>(Config::fromText(val));
 | 
						|
 | 
						|
  if (temp > 0)
 | 
						|
    fDecreaseOpenFilesCount = temp;
 | 
						|
 | 
						|
  // limit the number of files closed
 | 
						|
  if (fDecreaseOpenFilesCount > (uint32_t)(0.75 * fMaxOpenFiles))
 | 
						|
    fDecreaseOpenFilesCount = (uint32_t)(0.75 * fMaxOpenFiles);
 | 
						|
 | 
						|
  val = fConfig->getConfig("DBBC", "FDCacheTrace");
 | 
						|
  temp = 0;
 | 
						|
  fFDCacheTrace = false;
 | 
						|
 | 
						|
  if (val.length() > 0)
 | 
						|
    temp = static_cast<int>(Config::fromText(val));
 | 
						|
 | 
						|
  if (temp > 0)
 | 
						|
  {
 | 
						|
    fFDCacheTrace = true;
 | 
						|
    FDTraceFile().open(string(MCSLOGDIR) + "/trace/fdcache", ios_base::ate | ios_base::app);
 | 
						|
  }
 | 
						|
 | 
						|
  fThreadCount = thrCount;
 | 
						|
  go();
 | 
						|
}
 | 
						|
 | 
						|
void ioManager::buildOidFileName(const BRM::OID_t oid, uint16_t dbRoot, const uint32_t partNum,
 | 
						|
                                 const uint16_t segNum, char* file_name)
 | 
						|
{
 | 
						|
  // when it's a request for the version buffer, the dbroot comes in as 0 for legacy reasons
 | 
						|
  if (dbRoot == 0 && oid < 1000)
 | 
						|
    dbRoot = fdbrm.getDBRootOfVBOID(oid);
 | 
						|
 | 
						|
  fFileOp.getFileNameForPrimProc(oid, file_name, dbRoot, partNum, segNum);
 | 
						|
}
 | 
						|
 | 
						|
int ioManager::localLbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid,
 | 
						|
                               uint16_t& dbRoot, uint32_t& partitionNum, uint16_t& segmentNum,
 | 
						|
                               uint32_t& fileBlockOffset)
 | 
						|
{
 | 
						|
  if (primitiveprocessor::noVB > 0)
 | 
						|
    vbFlag = false;
 | 
						|
 | 
						|
  int rc = fdbrm.lookupLocal(lbid, verid, vbFlag, oid, dbRoot, partitionNum, segmentNum, fileBlockOffset);
 | 
						|
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
struct LambdaKludge
 | 
						|
{
 | 
						|
  LambdaKludge(ioManager* i) : iom(i)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  ~LambdaKludge()
 | 
						|
  {
 | 
						|
    iom = NULL;
 | 
						|
  }
 | 
						|
  ioManager* iom;
 | 
						|
  void operator()()
 | 
						|
  {
 | 
						|
    thr_popper(iom);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
void ioManager::createReaders()
 | 
						|
{
 | 
						|
  int idx;
 | 
						|
 | 
						|
  for (idx = 0; idx < fThreadCount; idx++)
 | 
						|
  {
 | 
						|
    try
 | 
						|
    {
 | 
						|
      fThreadArr.create_thread(LambdaKludge(this));
 | 
						|
    }
 | 
						|
    catch (exception& e)
 | 
						|
    {
 | 
						|
      cerr << "IOM::createReaders() caught " << e.what() << endl;
 | 
						|
      idx--;
 | 
						|
      sleep(1);
 | 
						|
      continue;
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
ioManager::~ioManager()
 | 
						|
{
 | 
						|
  stop();
 | 
						|
}
 | 
						|
 | 
						|
void ioManager::go(void)
 | 
						|
{
 | 
						|
  createReaders();
 | 
						|
}
 | 
						|
 | 
						|
// FIXME: is this right? what does this method do?
 | 
						|
void ioManager::stop()
 | 
						|
{
 | 
						|
  for (int idx = 0; idx < fThreadCount; idx++)
 | 
						|
  {
 | 
						|
    (void)0;  // pthread_detach(fThreadArr[idx]);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
fileRequest* ioManager::getNextRequest()
 | 
						|
{
 | 
						|
  fileRequest* blk = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    blk = fIOMRequestQueue.pop();
 | 
						|
    return blk;
 | 
						|
  }
 | 
						|
  catch (exception&)
 | 
						|
  {
 | 
						|
    cerr << "ioManager::getNextRequest() ERROR " << endl;
 | 
						|
  }
 | 
						|
 | 
						|
  return blk;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Prints stderr msg and updates fileRequest object to reflect an error.
 | 
						|
// Lastly, notifies waiting thread that fileRequest has been completed.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void ioManager::handleBlockReadError(fileRequest* fr, const string& errMsg, bool* copyLocked, int errorCode)
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    dbrm()->releaseLBIDRange(fr->Lbid(), fr->BlocksRequested());
 | 
						|
    *copyLocked = false;
 | 
						|
  }
 | 
						|
  catch (exception& e)
 | 
						|
  {
 | 
						|
    cout << "releaseRange on read error: " << e.what() << endl;
 | 
						|
  }
 | 
						|
 | 
						|
  cerr << errMsg << endl;
 | 
						|
  fr->RequestStatus(errorCode);
 | 
						|
  fr->RequestStatusStr(errMsg);
 | 
						|
 | 
						|
  fr->frMutex().lock();
 | 
						|
  fr->SetPredicate(fileRequest::COMPLETE);
 | 
						|
  fr->frCond().notify_one();
 | 
						|
  fr->frMutex().unlock();
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace dbbc
 |