/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // $Id: iomanager.cpp 724 2008-09-26 16:16:05Z jrodriguez $ // // C++ Implementation: iomanager // // Description: // // // Author: Jason Rodriguez // // // #define _FILE_OFFSET_BITS 64 #define _LARGEFILE64_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include #define NDEBUG #include using namespace std; #include "configcpp.h" using namespace config; #include "iomanager.h" namespace { using namespace dbbc; using namespace std; void timespec_sub(const struct timespec& tv1, const struct timespec& tv2, double& tm) { tm = (double)(tv2.tv_sec - tv1.tv_sec) + 1.e-9 * (tv2.tv_nsec - tv1.tv_nsec); } struct IOMThreadArg { ioManager* iom; int32_t thdId; }; typedef IOMThreadArg IOMThreadArg_t; void* thr_popper(void* arg) { ioManager* iom = ((IOMThreadArg*)arg)->iom; int32_t iomThdId = ((IOMThreadArg*)arg)->thdId; FileBufferMgr* fbm; int totalRqst = 0; fileRequest* fr = 0; BRM::LBID_t lbid = 0; BRM::OID_t oid = 0; BRM::VER_t ver = 0; int blocksLoaded = 0; int blocksRead = 0; const unsigned pageSize = 4096; fbm = &iom->fileBufferManager(); char fileName[WriteEngine::FILE_NAME_SIZE]; const uint64_t fileBlockSize = BLOCK_SIZE; uint32_t offset = 0; bool flg = false; char* fileNamePtr = fileName; uint64_t longSeekOffset = 0; int err; uint32_t dlen = 0, acc, readSize, blocksThisRead, j; uint32_t blocksRequested = 0; ssize_t i; uint32_t sz = 0; char* alignedbuff = 0; boost::scoped_array realbuff; pthread_t threadId = 0; ostringstream iomLogFileName; ofstream lFile; threadId = pthread_self(); uint32_t readBufferSz = iom->blocksPerRead * BLOCK_SIZE + pageSize; realbuff.reset(new char[readBufferSz]); if (realbuff.get() == 0) { cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl; return 0; } #if __WORDSIZE > 32 // alignedbuff=(char*)((((ptrdiff_t)&realbuff[0] + pageSize - 1) / pageSize) * pageSize); // pagesize == (1 << 12) alignedbuff = (char*)((((ptrdiff_t)realbuff.get() >> 12) << 12) + pageSize); #else // alignedbuff=(char*)(((((ptrdiff_t)&realbuff[0] & 0xffffffff) + pageSize - 1) / pageSize) * pageSize); alignedbuff = (char*)(((((ptrdiff_t)realbuff.get() >> 12) << 12) & 0xffffffff) + pageSize); #endif idbassert(((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff.get()) < (ptrdiff_t)pageSize); idbassert(((ptrdiff_t)alignedbuff % pageSize) == 0); for (;;) { fr = iom->getNextRequest(); lbid = fr->Lbid(); ver = fr->Ver(); flg = fr->Flg(); blocksLoaded = 0; blocksRead = 0; dlen = fr->BlocksRequested(); blocksRequested = fr->BlocksRequested(); err = iom->lbidLookup(lbid, ver, flg, oid, offset); if (err < 0) { cerr << "lbid=" << lbid << " ver=" << ver << " flg=" << (flg ? 1 : 0) << endl; throw runtime_error("thr_popper: BRM lookup failure"); } longSeekOffset = (uint64_t)offset * (uint64_t)fileBlockSize; totalRqst++; sz = 0; uint32_t readCount = 0; uint32_t bytesRead = 0; uint32_t jend = blocksRequested / iom->blocksPerRead; for (j = 0; j <= jend; j++) { blocksThisRead = std::min(dlen, iom->blocksPerRead); readSize = blocksThisRead * BLOCK_SIZE; acc = 0; while (acc < readSize) { i = readSize; // pread(fd, &alignedbuff[acc], readSize - acc, longSeekOffset); /* XXXPAT: Need to decide how to handle errors here */ if (i < 0 && errno == EINTR) { continue; } else if (i < 0) { perror("thr_popper::read"); return 0; // shuts down this thread, // probably not the right thing to do } else if (i == 0) { try { } catch (exception& exc) { cerr << "FileName Err:" << exc.what() << endl; } cerr << "thr_popper: Early EOF in file " << fileNamePtr << endl; return 0; } acc += i; longSeekOffset += (uint64_t)i; readCount++; bytesRead += i; } // while(acc... blocksRead += blocksThisRead; for (i = 0; (unsigned)i < blocksThisRead; ++i) { if (fbm->insert((lbid + i) + (j * iom->blocksPerRead), ver, (uint8_t*)&alignedbuff[i * BLOCK_SIZE])) ++blocksLoaded; } dlen -= blocksThisRead; } // for (j... fr->BlocksRead(blocksRead); fr->BlocksLoaded(blocksLoaded); if (fr->data != 0 && blocksRequested == 1) memcpy(fr->data, alignedbuff, BLOCK_SIZE); pthread_mutex_lock(&fr->frMutex()); fr->SetPredicate(fileRequest::COMPLETE); pthread_cond_signal(&fr->frCond()); pthread_mutex_unlock(&fr->frMutex()); } // for(;;) lFile.close(); return 0; } // end thr_popper } // anonymous namespace namespace dbbc { ioManager::ioManager(FileBufferMgr& fbm, fileBlockRequestQueue& fbrq, int thrCount, int bsPerRead) : blocksPerRead(bsPerRead), fIOMfbMgr(fbm), fIOMRequestQueue(fbrq) { if (thrCount <= 0) thrCount = 1; if (thrCount > 256) thrCount = 256; fConfig = Config::makeConfig(); string val = fConfig->getConfig("DBBC", "IOMTracing"); int temp = 0; if (val.length() > 0) temp = static_cast(Config::fromText(val)); if (temp > 0) fIOTrace = true; else fIOTrace = false; fThreadCount = thrCount; go(); } // ioManager void ioManager::buildOidFileName(const BRM::OID_t oid, char* file_name) { if (fFileOp.getFileName(oid, file_name) != WriteEngine::NO_ERROR) { file_name[0] = 0; throw std::runtime_error("fileOp.getFileName failed"); } } BRM::LBID_t ioManager::lbidLookup(BRM::LBID_t lbid, BRM::VER_t verid, bool vbFlag, BRM::OID_t& oid, uint32_t& offset) { int rc = fdbrm.lookup(lbid, verid, vbFlag, oid, offset); return rc; } int ioManager::createReaders() { int realCnt = 0; IOMThreadArg_t fThdArgArr[256]; for (int idx = 0; idx < fThreadCount; idx++) { fThdArgArr[realCnt].iom = this; fThdArgArr[realCnt].thdId = realCnt; int ret = pthread_create(&fThreadArr[realCnt], 0, thr_popper, &fThdArgArr[realCnt]); if (ret != 0) perror("createReaders::pthread_create"); else realCnt++; } fThreadCount = realCnt; return fThreadCount; } ioManager::~ioManager() { stop(); } void ioManager::go(void) { createReaders(); } void ioManager::stop() { for (int idx = 0; idx < fThreadCount; idx++) { pthread_detach(fThreadArr[idx]); } } fileRequest* ioManager::getNextRequest() { fileRequest* blk = 0; try { blk = fIOMRequestQueue.pop(); return blk; } catch (exception& e) { cerr << "ioManager::getNextRequest() ERROR " << endl; } return blk; } } // namespace dbbc