You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
288 lines
6.3 KiB
Plaintext
288 lines
6.3 KiB
Plaintext
//
|
|
// C++ Implementation: iomanager
|
|
//
|
|
// Description:
|
|
//
|
|
//
|
|
// Author: Jason Rodriguez <jrodriguez@calpont.com>
|
|
//
|
|
// Copyright: Calpont, Inc, (C) 2007
|
|
//
|
|
//
|
|
|
|
#define _FILE_OFFSET_BITS 64
|
|
#define _LARGEFILE64_SOURCE
|
|
#include <stdexcept>
|
|
#include <iostream>
|
|
#include <unistd.h>
|
|
#include <stdlib.h>
|
|
#include <string>
|
|
#include <sstream>
|
|
#include <map>
|
|
#include <sys/types.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <boost/scoped_array.hpp>
|
|
|
|
using namespace std;
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "iomanager.h"
|
|
namespace {
|
|
|
|
using namespace dbbc;
|
|
using namespace std;
|
|
|
|
unsigned pageSize; // = getpagesize();
|
|
|
|
// #define FDCACHE
|
|
|
|
/* structures shared across all iomanagers */
|
|
#ifdef FDCACHE
|
|
map<uint32_t, int> fdcache;
|
|
pthread_mutex_t fdMutex = PTHREAD_MUTEX_INITIALIZER;
|
|
#endif
|
|
|
|
void* thr_popper(void* arg) {
|
|
ioManager* iom = (ioManager*)arg;
|
|
FileBufferMgr* fbm;
|
|
int totalRqst=0;
|
|
fileRequest* fr=NULL;
|
|
BRM::LBID_t lbid=0;
|
|
BRM::OID_t oid=0;
|
|
BRM::VER_t ver=0;
|
|
int blocksLoaded=0;
|
|
const unsigned pageSize = getpagesize();
|
|
fbm = &iom->fileBufferManager();
|
|
char fileName[WriteEngine::FILE_NAME_SIZE];
|
|
const uint64_t fileBlockSize = BLOCK_SIZE;
|
|
uint32_t offset=0;
|
|
bool flg=false;
|
|
char* fileNamePtr=fileName;
|
|
uint64_t longSeekOffset=0;
|
|
int dlen=0, i, err;
|
|
uint acc;
|
|
uint32_t sz=0, readSize;
|
|
char* alignedbuff=NULL;
|
|
// char realbuff[(fileBlockSize)+pageSize] __attribute__((aligned)); //doesn't align enough, but is a start...
|
|
boost::scoped_array<char> realbuff;
|
|
uint extentSize = iom->getExtentSize();
|
|
#ifdef FDCACHE
|
|
map<uint32_t, int>::iterator fdit;
|
|
#endif
|
|
int fd;
|
|
|
|
// the largest read
|
|
realbuff.reset(new char[(extentSize * BLOCK_SIZE) + pageSize]);
|
|
|
|
for ( ; ; ) {
|
|
|
|
fr = iom->getNextRequest();
|
|
lbid = fr->Lbid();
|
|
ver = fr->Ver();
|
|
flg = fr->Flg();
|
|
blocksLoaded=0;
|
|
|
|
err = iom->lbidLookup(lbid, ver, flg, oid, offset);
|
|
if (err < 0) {
|
|
cerr << "lbid=" << lbid << " ver=" << ver << " flg=" << (flg ? 1 : 0) << endl;
|
|
throw runtime_error("thr_popper: BRM lookup failure");
|
|
}
|
|
|
|
#ifdef FDCACHE
|
|
pthread_mutex_lock(&fdMutex);
|
|
fdit = fdcache.find(oid);
|
|
if (fdit == fdcache.end()) {
|
|
#endif
|
|
try {
|
|
iom->buildOidFileName(oid, fileNamePtr);
|
|
} catch (exception& exc)
|
|
{
|
|
cerr << "FileName Err:" << exc.what() << endl;
|
|
}
|
|
fd = open(fileNamePtr, O_RDONLY|O_DIRECT|O_LARGEFILE|O_NOATIME);
|
|
if (fd<0)
|
|
throw runtime_error("Error opening file");
|
|
#ifdef FDCACHE
|
|
fdcache[oid] = fd;
|
|
}
|
|
else
|
|
fd = fdit->second;
|
|
pthread_mutex_unlock(&fdMutex);
|
|
#endif
|
|
|
|
longSeekOffset=(uint64_t)offset * (uint64_t)fileBlockSize;
|
|
lseek64(fd, longSeekOffset, SEEK_SET);
|
|
totalRqst++;
|
|
dlen = (fr->BlocksRequested() > extentSize ? extentSize : fr->BlocksRequested());
|
|
sz=0;
|
|
|
|
readSize = dlen * BLOCK_SIZE;
|
|
if (realbuff == NULL) {
|
|
cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
|
|
return NULL;
|
|
}
|
|
|
|
#if __WORDSIZE > 32
|
|
alignedbuff=(char*)((((ptrdiff_t)&realbuff[0] + pageSize - 1) / pageSize) * pageSize);
|
|
#else
|
|
alignedbuff=(char*)(((((ptrdiff_t)&realbuff[0] & 0xffffffff) + pageSize - 1) / pageSize) * pageSize);
|
|
#endif
|
|
// Uncomment these assertions if necessary...
|
|
// assert(((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff) < (ptrdiff_t)pageSize);
|
|
|
|
//these 2 things need to be true for O_DIRECT read to work
|
|
// assert((fileBlockSize % pageSize) == 0);
|
|
// assert(((ptrdiff_t)alignedbuff % pageSize) == 0);
|
|
|
|
if (dlen > 1)
|
|
cerr << "thrpopper: multiblock read of " << dlen << " readsize = " <<
|
|
readSize << endl;
|
|
|
|
acc = 0;
|
|
while (acc < readSize) {
|
|
// cerr << "reading " << readSize-acc << "/" << readSize << endl;
|
|
i = read(fd, &alignedbuff[acc], readSize - acc);
|
|
/* XXXPAT: Need to decide how to handle errors here */
|
|
if (i < 0 && errno == EINTR)
|
|
continue;
|
|
else if (i < 0) {
|
|
perror("thr_popper::read");
|
|
return NULL; // shuts down this thread,
|
|
// probably not the right thing to do
|
|
}
|
|
else if (i == 0) {
|
|
try {
|
|
iom->buildOidFileName(oid, fileNamePtr);
|
|
} catch (exception& exc)
|
|
{
|
|
cerr << "FileName Err:" << exc.what() << endl;
|
|
}
|
|
cerr << "thr_popper: Early EOF in file " << fileNamePtr << endl;
|
|
return NULL;
|
|
}
|
|
acc += i;
|
|
}
|
|
|
|
for (i = 0; i < dlen; ++i) {
|
|
fbm->insert(FileBuffer(fr->Lbid() + i, fr->Ver(),
|
|
(uint8_t *) &alignedbuff[i*BLOCK_SIZE], BLOCK_SIZE));
|
|
++blocksLoaded;
|
|
}
|
|
|
|
if (fr->data != NULL && dlen == 1)
|
|
memcpy(fr->data, alignedbuff, BLOCK_SIZE);
|
|
|
|
#ifndef FDCACHE
|
|
close(fd);
|
|
#endif
|
|
|
|
fr->RequestStatus(fileRequest::SUCCESSFUL);
|
|
|
|
pthread_mutex_lock(&fr->frMutex());
|
|
fr->SetPredicate(fileRequest::COMPLETE);
|
|
pthread_cond_signal(&fr->frCond());
|
|
pthread_mutex_unlock(&fr->frMutex());
|
|
|
|
} // for(;;)
|
|
|
|
return NULL;
|
|
} // end thr_popper
|
|
|
|
}
|
|
|
|
namespace dbbc {
|
|
|
|
ioManager::ioManager(FileBufferMgr& fbm,
|
|
fileBlockRequestQueue& fbrq,
|
|
int thrCount):
|
|
fIOMfbMgr(fbm),
|
|
fIOMRequestQueue(fbrq)
|
|
{
|
|
pageSize = getpagesize();
|
|
|
|
if (thrCount<=0)
|
|
thrCount=1;
|
|
|
|
if (thrCount > 256)
|
|
thrCount=256;
|
|
|
|
fConfig = Config::makeConfig();
|
|
fThreadCount=thrCount;
|
|
go();
|
|
}
|
|
|
|
void ioManager::buildOidFileName(const BRM::OID_t oid, char* file_name) {
|
|
|
|
if (fFileOp.getFileName(oid, file_name) != WriteEngine::NO_ERROR) {
|
|
file_name[0]=0;
|
|
throw std::runtime_error("fileOp.getFileName failed");
|
|
}
|
|
}
|
|
|
|
|
|
BRM::LBID_t ioManager::lbidLookup(BRM::LBID_t lbid,
|
|
BRM::VER_t verid,
|
|
bool vbFlag,
|
|
BRM::OID_t& oid,
|
|
uint32_t& offset)
|
|
{
|
|
int rc;
|
|
|
|
// do the extent map lookup, possibly looking aside into the VBBM
|
|
rc = fdbrm.lookup(lbid, verid, vbFlag, oid, offset);
|
|
|
|
return rc;
|
|
}
|
|
|
|
int ioManager::createReaders() {
|
|
int realCnt=0;
|
|
for (int idx=0; idx<fThreadCount; idx++){
|
|
int ret = pthread_create(&fThreadArr[realCnt], NULL, thr_popper, this);
|
|
if (ret!=0)
|
|
perror("createReaders::pthread_create");
|
|
else
|
|
realCnt++;
|
|
}
|
|
fThreadCount=realCnt;
|
|
return fThreadCount;
|
|
}
|
|
|
|
|
|
ioManager::~ioManager()
|
|
{
|
|
// cout << "~ioManager()"<<endl;
|
|
stop();
|
|
}
|
|
|
|
void ioManager::go(void) {
|
|
createReaders();
|
|
}
|
|
|
|
|
|
void ioManager::stop() {
|
|
for(int idx=0; idx <fThreadCount; idx++) {
|
|
pthread_detach(fThreadArr[idx]);
|
|
}
|
|
}
|
|
|
|
|
|
fileRequest* ioManager::getNextRequest() {
|
|
fileRequest* blk = NULL; /*fIOMRequestQueue.top();*/
|
|
try {
|
|
blk = fIOMRequestQueue.pop();
|
|
return blk;
|
|
} catch (exception& e) {
|
|
cerr << "ioManager::getNextRequest() ERROR " << endl;
|
|
}
|
|
|
|
return blk;
|
|
|
|
}
|
|
|
|
}
|
|
// vim:ts=4 sw=4:
|