You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			288 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			288 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| //
 | |
| // C++ Implementation: iomanager
 | |
| //
 | |
| // Description: 
 | |
| //
 | |
| //
 | |
| // Author: Jason Rodriguez <jrodriguez@calpont.com>
 | |
| //
 | |
| // Copyright: Calpont, Inc, (C) 2007
 | |
| //
 | |
| //
 | |
| 
 | |
| #define _FILE_OFFSET_BITS 64
 | |
| #define _LARGEFILE64_SOURCE
 | |
| #include <stdexcept>
 | |
| #include <iostream>
 | |
| #include <unistd.h>
 | |
| #include <stdlib.h>
 | |
| #include <string>
 | |
| #include <sstream>
 | |
| #include <map>
 | |
| #include <sys/types.h>
 | |
| #include <sys/stat.h>
 | |
| #include <fcntl.h>
 | |
| #include <errno.h>
 | |
| #include <boost/scoped_array.hpp>
 | |
| 
 | |
| using namespace std;
 | |
| 
 | |
| #include "configcpp.h"
 | |
| using namespace config;
 | |
| 
 | |
| #include "iomanager.h"
 | |
| namespace {
 | |
| 
 | |
| using namespace dbbc;
 | |
| using namespace std;
 | |
| 
 | |
| unsigned pageSize; // = getpagesize();
 | |
| 
 | |
| // #define FDCACHE
 | |
| 
 | |
| /* structures shared across all iomanagers */
 | |
| #ifdef FDCACHE
 | |
| map<uint32_t, int> fdcache;
 | |
| pthread_mutex_t fdMutex = PTHREAD_MUTEX_INITIALIZER;
 | |
| #endif
 | |
| 
 | |
| void* thr_popper(void* arg) {
 | |
| 	ioManager* iom = (ioManager*)arg;
 | |
| 	FileBufferMgr* fbm;
 | |
| 	int totalRqst=0;
 | |
| 	fileRequest* fr=NULL;
 | |
| 	BRM::LBID_t lbid=0;
 | |
| 	BRM::OID_t oid=0;
 | |
| 	BRM::VER_t ver=0; 
 | |
| 	int blocksLoaded=0;
 | |
| 	const unsigned pageSize = getpagesize();
 | |
| 	fbm = &iom->fileBufferManager();
 | |
| 	char fileName[WriteEngine::FILE_NAME_SIZE];
 | |
| 	const uint64_t fileBlockSize = BLOCK_SIZE;
 | |
| 	uint32_t offset=0;
 | |
| 	bool flg=false;
 | |
| 	char* fileNamePtr=fileName;
 | |
| 	uint64_t longSeekOffset=0;
 | |
| 	int dlen=0, i, err;
 | |
| 	uint acc;
 | |
| 	uint32_t sz=0, readSize;
 | |
| 	char* alignedbuff=NULL;
 | |
| // 	char realbuff[(fileBlockSize)+pageSize] __attribute__((aligned)); //doesn't align enough, but is a start...
 | |
|  	boost::scoped_array<char> realbuff;	
 | |
| 	uint extentSize = iom->getExtentSize();
 | |
| #ifdef FDCACHE
 | |
| 	map<uint32_t, int>::iterator fdit;
 | |
| #endif
 | |
| 	int fd;
 | |
| 
 | |
| 	// the largest read
 | |
| 	realbuff.reset(new char[(extentSize * BLOCK_SIZE) + pageSize]);
 | |
| 
 | |
| 	for ( ; ; ) {
 | |
| 
 | |
| 		fr = iom->getNextRequest();
 | |
| 		lbid = fr->Lbid();
 | |
| 		ver = fr->Ver();
 | |
| 		flg = fr->Flg();
 | |
| 		blocksLoaded=0;
 | |
| 		
 | |
| 		err = iom->lbidLookup(lbid, ver, flg, oid, offset);
 | |
| 		if (err < 0) {
 | |
| 			cerr << "lbid=" << lbid << " ver=" << ver << " flg=" << (flg ? 1 : 0) << endl;
 | |
| 			throw runtime_error("thr_popper: BRM lookup failure");
 | |
| 		}
 | |
| 
 | |
| #ifdef FDCACHE
 | |
| 		pthread_mutex_lock(&fdMutex);
 | |
| 		fdit = fdcache.find(oid);
 | |
| 		if (fdit == fdcache.end()) {
 | |
| #endif
 | |
| 			try {
 | |
| 				iom->buildOidFileName(oid, fileNamePtr);
 | |
| 			} catch (exception& exc)
 | |
| 			{
 | |
| 				cerr << "FileName Err:" << exc.what() << endl;
 | |
| 			}
 | |
| 			fd = open(fileNamePtr, O_RDONLY|O_DIRECT|O_LARGEFILE|O_NOATIME);
 | |
| 			if (fd<0)
 | |
| 				throw runtime_error("Error opening file");
 | |
| #ifdef FDCACHE
 | |
| 			fdcache[oid] = fd;
 | |
| 		}
 | |
| 		else 
 | |
| 			fd = fdit->second;
 | |
| 		pthread_mutex_unlock(&fdMutex);
 | |
| #endif
 | |
| 
 | |
| 		longSeekOffset=(uint64_t)offset * (uint64_t)fileBlockSize;
 | |
| 		lseek64(fd, longSeekOffset, SEEK_SET);
 | |
| 		totalRqst++;
 | |
| 		dlen = (fr->BlocksRequested() > extentSize ? extentSize : fr->BlocksRequested());
 | |
| 		sz=0;
 | |
| 
 | |
| 		readSize = dlen * BLOCK_SIZE;
 | |
| 		if (realbuff == NULL) {
 | |
| 			cerr << "thr_popper: Can't allocate space for a whole extent in memory" << endl;
 | |
| 			return NULL;
 | |
| 		}
 | |
| 
 | |
| #if __WORDSIZE > 32
 | |
| 		alignedbuff=(char*)((((ptrdiff_t)&realbuff[0] + pageSize - 1) / pageSize) * pageSize);
 | |
| #else
 | |
| 		alignedbuff=(char*)(((((ptrdiff_t)&realbuff[0] & 0xffffffff) + pageSize - 1) / pageSize) * pageSize);
 | |
| #endif
 | |
| //			Uncomment these assertions if necessary...
 | |
| //  		assert(((ptrdiff_t)alignedbuff - (ptrdiff_t)realbuff) < (ptrdiff_t)pageSize);
 | |
| 
 | |
| 		//these 2 things need to be true for O_DIRECT read to work
 | |
| //  		assert((fileBlockSize % pageSize) == 0);
 | |
| //  		assert(((ptrdiff_t)alignedbuff % pageSize) == 0);
 | |
| 
 | |
|  		if (dlen > 1)
 | |
|  			cerr << "thrpopper: multiblock read of " << dlen << " readsize = " << 
 | |
| 				readSize << endl;
 | |
| 
 | |
| 		acc = 0;
 | |
| 		while (acc < readSize) {
 | |
| // 			cerr << "reading " << readSize-acc << "/" << readSize << endl;
 | |
| 			i = read(fd, &alignedbuff[acc], readSize - acc);
 | |
| 			/* XXXPAT: Need to decide how to handle errors here */
 | |
| 			if (i < 0 && errno == EINTR)
 | |
| 				continue;
 | |
| 			else if (i < 0) {
 | |
| 				perror("thr_popper::read");
 | |
| 				return NULL;			// shuts down this thread, 
 | |
| 								// probably not the right thing to do
 | |
| 			}
 | |
| 			else if (i == 0) {
 | |
| 				try {
 | |
| 					iom->buildOidFileName(oid, fileNamePtr);
 | |
| 				} catch (exception& exc)
 | |
| 				{
 | |
| 					cerr << "FileName Err:" << exc.what() << endl;
 | |
| 				}
 | |
| 				cerr << "thr_popper: Early EOF in file " << fileNamePtr << endl;
 | |
| 				return NULL;
 | |
| 			}
 | |
| 			acc += i;
 | |
| 		}
 | |
| 
 | |
| 		for (i = 0; i < dlen; ++i) {
 | |
| 			fbm->insert(FileBuffer(fr->Lbid() + i, fr->Ver(), 
 | |
| 				(uint8_t *) &alignedbuff[i*BLOCK_SIZE], BLOCK_SIZE));
 | |
| 			++blocksLoaded;
 | |
| 		}
 | |
| 
 | |
| 		if (fr->data != NULL && dlen == 1)
 | |
| 			memcpy(fr->data, alignedbuff, BLOCK_SIZE);
 | |
| 
 | |
| #ifndef FDCACHE
 | |
|  		close(fd);
 | |
| #endif
 | |
| 
 | |
| 		fr->RequestStatus(fileRequest::SUCCESSFUL);
 | |
| 
 | |
| 		pthread_mutex_lock(&fr->frMutex());
 | |
| 		fr->SetPredicate(fileRequest::COMPLETE);
 | |
| 		pthread_cond_signal(&fr->frCond());
 | |
| 		pthread_mutex_unlock(&fr->frMutex());
 | |
| 
 | |
| 	} // for(;;)
 | |
| 
 | |
| 	return NULL;
 | |
| } // end thr_popper
 | |
| 
 | |
| }
 | |
| 
 | |
| namespace dbbc {
 | |
| 
 | |
| ioManager::ioManager(FileBufferMgr& fbm,
 | |
| 					fileBlockRequestQueue& fbrq,
 | |
| 					int thrCount):
 | |
| 		fIOMfbMgr(fbm),
 | |
| 		fIOMRequestQueue(fbrq)
 | |
| {
 | |
| 	pageSize = getpagesize();
 | |
| 
 | |
| 	if (thrCount<=0)
 | |
| 		thrCount=1;
 | |
| 	
 | |
| 	if (thrCount > 256)
 | |
| 		thrCount=256;
 | |
| 	
 | |
| 	fConfig = Config::makeConfig();
 | |
| 	fThreadCount=thrCount;
 | |
| 	go();
 | |
| }
 | |
| 
 | |
| void ioManager::buildOidFileName(const BRM::OID_t oid, char* file_name) {
 | |
| 
 | |
| 	if (fFileOp.getFileName(oid, file_name) != WriteEngine::NO_ERROR) {
 | |
| 		file_name[0]=0;
 | |
| 		throw std::runtime_error("fileOp.getFileName failed");
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| BRM::LBID_t ioManager::lbidLookup(BRM::LBID_t lbid,
 | |
| 						   BRM::VER_t verid,
 | |
| 						   bool vbFlag,
 | |
| 						   BRM::OID_t& oid,
 | |
| 							uint32_t& offset)
 | |
| {
 | |
|  int rc;
 | |
| 
 | |
|  // do the extent map lookup, possibly looking aside into the VBBM
 | |
|  rc =  fdbrm.lookup(lbid, verid, vbFlag, oid, offset);
 | |
| 
 | |
|  return rc;
 | |
| }
 | |
| 
 | |
| int ioManager::createReaders() {
 | |
| 	int realCnt=0;
 | |
| 	for (int idx=0; idx<fThreadCount; idx++){
 | |
| 		int ret = pthread_create(&fThreadArr[realCnt], NULL, thr_popper, this);
 | |
|  		if (ret!=0)
 | |
|  			perror("createReaders::pthread_create");
 | |
| 		else
 | |
| 			realCnt++;
 | |
| 	}
 | |
| 	fThreadCount=realCnt;
 | |
| 	return fThreadCount;
 | |
| }
 | |
| 
 | |
| 
 | |
| ioManager::~ioManager()
 | |
| {
 | |
| // 	cout << "~ioManager()"<<endl;
 | |
| 	stop();
 | |
| }
 | |
| 
 | |
| void ioManager::go(void) {
 | |
| 	createReaders();
 | |
| }
 | |
| 
 | |
| 
 | |
| void ioManager::stop() {
 | |
| 	for(int idx=0; idx <fThreadCount; idx++) {
 | |
| 		pthread_detach(fThreadArr[idx]);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| 
 | |
| fileRequest* ioManager::getNextRequest() {
 | |
| 	fileRequest* blk = NULL; /*fIOMRequestQueue.top();*/
 | |
| 	try {
 | |
| 		blk = fIOMRequestQueue.pop();
 | |
| 		return blk;
 | |
| 	} catch (exception& e) {
 | |
| 		cerr << "ioManager::getNextRequest() ERROR " << endl;
 | |
| 	}
 | |
| 
 | |
| 	return blk;
 | |
| 	
 | |
| }
 | |
| 
 | |
| }
 | |
| // vim:ts=4 sw=4:
 |