// // C++ Implementation: iomanager // // Description: // // // Author: Jason Rodriguez // // Copyright: Calpont, Inc, (C) 2007 // // #define _FILE_OFFSET_BITS 64 #define _LARGEFILE64_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include using namespace std; #include "configcpp.h" using namespace config; #include "iomanager.h" namespace { using namespace dbbc; using namespace std; unsigned pageSize; // = getpagesize(); // #define FDCACHE /* structures shared across all iomanagers */ #ifdef FDCACHE map fdcache; pthread_mutex_t fdMutex = PTHREAD_MUTEX_INITIALIZER; #endif void* thr_popper(void* arg) { ioManager* iom = (ioManager*)arg; FileBufferMgr* fbm; int totalRqst=0; fileRequest* fr=NULL; BRM::LBID_t lbid=0; BRM::OID_t oid=0; BRM::VER_t ver=0; int blocksLoaded=0; const unsigned pageSize = getpagesize(); fbm = &iom->fileBufferManager(); char fileName[WriteEngine::FILE_NAME_SIZE]; const uint64_t fileBlockSize = BLOCK_SIZE; uint32_t offset=0; bool flg=false; char* fileNamePtr=fileName; uint64_t longSeekOffset=0; int dlen=0, i, err; uint acc; uint32_t sz=0, readSize; char* alignedbuff=NULL; // char realbuff[(fileBlockSize)+pageSize] __attribute__((aligned)); //doesn't align enough, but is a start... boost::scoped_array realbuff; uint extentSize = iom->getExtentSize(); #ifdef FDCACHE map::iterator fdit; #endif int fd; // the largest read realbuff.reset(new char[(extentSize * BLOCK_SIZE) + pageSize]); for ( ; ; ) { fr = iom->getNextRequest(); lbid = fr->Lbid(); ver = fr->Ver(); flg = fr->Flg(); blocksLoaded=0; err = iom->lbidLookup(lbid, ver, flg, oid, offset); if (err < 0) { cerr << "lbid=" << lbid << " ver=" << ver << " flg=" << (flg ? 1 : 0) << endl; throw runtime_error("thr_popper: BRM lookup failure"); } #ifdef FDCACHE pthread_mutex_lock(&fdMutex); fdit = fdcache.find(oid); if (fdit == fdcache.end()) { #endif try { iom->buildOidFileName(oid, fileNamePtr); } catch (exception& exc) { cerr << "FileName Err:" << exc.what() << endl; } fd = open(fileNamePtr, O_RDONLY|O_DIRECT|O_LARGEFILE|O_NOATIME); if (fd<0) throw runtime_error("Error opening file"); #ifdef FDCACHE fdcache[oid] = fd; } else fd = fdit->second; pthread_mutex_unlock(&fdMutex); #endif longSeekOffset=(uint64_t)offset * (uint64_t)fileBlockSize; lseek64(fd, longSeekOffset, SEEK_SET); totalRqst++; dlen = (fr->BlocksRequested() > extentSize ? extentSize : fr->BlocksRequested()); sz=0; readSize = dlen * BLOCK_SIZE; if (realbuff == NULL) { cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl; return NULL; } #if __WORDSIZE > 32 alignedbuff=(char*)((((ptrdiff_t)&realbuff[0] + pageSize - 1) / pageSize) * pageSize); #else alignedbuff=(char*)(((((ptrdiff_t)&realbuff[0] & 0xffffffff) + pageSize - 1) / pageSize) * pageSize); #endif // Uncomment these assertions if necessary... // assert(((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff) < (ptrdiff_t)pageSize); //these 2 things need to be true for O_DIRECT read to work // assert((fileBlockSize % pageSize) == 0); // assert(((ptrdiff_t)alignedbuff % pageSize) == 0); if (dlen > 1) cerr << "thrpopper: multiblock read of " << dlen << " readsize = " << readSize << endl; acc = 0; while (acc < readSize) { // cerr << "reading " << readSize-acc << "/" << readSize << endl; i = read(fd, &alignedbuff[acc], readSize - acc); /* XXXPAT: Need to decide how to handle errors here */ if (i < 0 && errno == EINTR) continue; else if (i < 0) { perror("thr_popper::read"); return NULL; // shuts down this thread, // probably not the right thing to do } else if (i == 0) { try { iom->buildOidFileName(oid, fileNamePtr); } catch (exception& exc) { cerr << "FileName Err:" << exc.what() << endl; } cerr << "thr_popper: Early EOF in file " << fileNamePtr << endl; return NULL; } acc += i; } for (i = 0; i < dlen; ++i) { fbm->insert(FileBuffer(fr->Lbid() + i, fr->Ver(), (uint8_t *) &alignedbuff[i*BLOCK_SIZE], BLOCK_SIZE)); ++blocksLoaded; } if (fr->data != NULL && dlen == 1) memcpy(fr->data, alignedbuff, BLOCK_SIZE); #ifndef FDCACHE close(fd); #endif fr->RequestStatus(fileRequest::SUCCESSFUL); pthread_mutex_lock(&fr->frMutex()); fr->SetPredicate(fileRequest::COMPLETE); pthread_cond_signal(&fr->frCond()); pthread_mutex_unlock(&fr->frMutex()); } // for(;;) return NULL; } // end thr_popper } namespace dbbc { ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount): fIOMfbMgr(fbm), fIOMRequestQueue(fbrq) { pageSize = getpagesize(); if (thrCount<=0) thrCount=1; if (thrCount > 256) thrCount=256; fConfig = Config::makeConfig(); fThreadCount=thrCount; go(); } void ioManager::buildOidFileName(const BRM::OID_t oid, char* file_name) { if (fFileOp.getFileName(oid, file_name) != WriteEngine::NO_ERROR) { file_name[0]=0; throw std::runtime_error("fileOp.getFileName failed"); } } BRM::LBID_t ioManager::lbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid, uint32_t& offset) { int rc; // do the extent map lookup, possibly looking aside into the VBBM rc = fdbrm.lookup(lbid, verid, vbFlag, oid, offset); return rc; } int ioManager::createReaders() { int realCnt=0; for (int idx=0; idx