1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-26 11:48:52 +03:00
2020-11-17 15:03:10 +03:00

527 lines
16 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: we_dbfileop.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
/** @file */
#include <unistd.h>
#include <stdio.h>
#include <cstring>
using namespace std;
#include "we_chunkmanager.h"
#include "we_dbfileop.h"
#include "we_stats.h"
#include "IDBDataFile.h"
using namespace idbdatafile;
using namespace BRM;
namespace WriteEngine
{
/**
* Constructor
*/
DbFileOp::DbFileOp() : m_chunkManager(NULL)
{}
/**
* Default Destructor
*/
DbFileOp::~DbFileOp()
{}
/***********************************************************
* DESCRIPTION:
* flush the cache
* PARAMETERS:
* none
* RETURN:
* NO_ERROR if success, otherwise if fail
***********************************************************/
int DbFileOp::flushCache()
{
BlockBuffer* curBuf;
if ( !Cache::getUseCache() )
return NO_ERROR;
for ( CacheMapIt it = Cache::m_writeList->begin();
it != Cache::m_writeList->end(); it++ )
{
curBuf = it->second;
RETURN_ON_ERROR( writeDBFile( (*curBuf).cb.file.pFile,
(*curBuf).block.data,
(*curBuf).block.lbid ) );
}
RETURN_ON_ERROR( Cache::flushCache() );
return NO_ERROR;
}
/***********************************************************
* DESCRIPTION:
* get an entry within a sub block
* NOTE: the difference with readSubBlockEntry is that
* getSubBlockEntry only works for buffer while
* readSubBlockEntry works for file and block
* PARAMETERS:
* blockBuf - the block buffer
* sbid - sub block id
* entryNo - entry no within sub block
* width - width in bytes
* pStruct - sturcture pointer
* RETURN:
* none
***********************************************************/
void DbFileOp::getSubBlockEntry( unsigned char* blockBuf,
const int sbid, const int entryNo,
const int width, void* pStruct )
{
unsigned char* pBlock;
pBlock = blockBuf + BYTE_PER_SUBBLOCK * sbid + entryNo * MAX_COLUMN_BOUNDARY;
memcpy( pStruct, pBlock, width );
}
/***********************************************************
* DESCRIPTION:
* Read a block from a file at specified location
* PARAMETERS:
* pFile - file handle
* readBuf - read buffer
* fbo - file block offset
* RETURN:
* NO_ERROR if success
* other number if something wrong
***********************************************************/
int DbFileOp::readDBFile( IDBDataFile* pFile,
unsigned char* readBuf,
const uint64_t lbid,
const bool isFbo )
{
long long fboOffset = 0;
if ( !isFbo )
{
RETURN_ON_ERROR( setFileOffsetBlock( pFile, lbid ) );
}
else
{
fboOffset = (lbid) * (long)BYTE_PER_BLOCK;
RETURN_ON_ERROR( setFileOffset( pFile, fboOffset ) );
}
return readFile( pFile, readBuf, BYTE_PER_BLOCK );
}
int DbFileOp::readDBFile( IDBDataFile* pFile,
DataBlock* block,
const uint64_t lbid,
const bool isFbo )
{
block->dirty = false;
block->no = lbid;
Stats::incIoBlockRead();
return readDBFile( pFile, block->data, lbid, isFbo );
}
int DbFileOp::readDBFile( CommBlock& cb,
unsigned char* readBuf,
const uint64_t lbid )
{
CacheKey key;
if ( Cache::getUseCache() )
{
if ( Cache::cacheKeyExist( cb.file.oid, lbid ) )
{
key = Cache::getCacheKey( cb.file.oid, lbid );
RETURN_ON_ERROR( Cache::loadCacheBlock( key, readBuf ) );
return NO_ERROR;
}
}
RETURN_ON_ERROR( readDBFile( cb.file.pFile, readBuf, lbid ) );
if ( Cache::getUseCache() )
{
int fbo = lbid;
uint16_t dbRoot;
uint32_t partition;
uint16_t segment;
RETURN_ON_ERROR( BRMWrapper::getInstance()->getFboOffset(
lbid, dbRoot, partition, segment, fbo ) );
if ( Cache::getListSize( FREE_LIST ) == 0 )
{
if ( isDebug( DEBUG_1 ) )
{
printf( "\nBefore flushing cache " );
Cache::printCacheList();
}
// flush cache to give up more space
RETURN_ON_ERROR( flushCache() );
if ( isDebug( DEBUG_1 ) )
{
printf( "\nAfter flushing cache " );
Cache::printCacheList();
}
}
RETURN_ON_ERROR( Cache::insertLRUList( cb, lbid, fbo, readBuf ) );
}
return NO_ERROR;
}
/***********************************************************
* DESCRIPTION: No change, old signature 10/17/06
* Read an entry within a sub block from a file
* NOTE: the difference with getSubBlockEntry is that
* getSubBlockEntry only works for buffer while
* readSubBlockEntry works for file and block
* PARAMETERS:
* pFile - file handler
* block - the block structure
* fbo - file block offset
* sbid - sub block id
* entryNo - entry no within sub block
* width - width in bytes
* pStruct - sturcture pointer
* RETURN:
* NO_ERROR if success
* other number if something wrong
***********************************************************/
int DbFileOp::readSubBlockEntry( IDBDataFile* pFile, DataBlock* block,
const uint64_t lbid, const int sbid,
const int entryNo, const int width,
void* pStruct )
{
RETURN_ON_ERROR( readDBFile( pFile, block->data, lbid ) );
getSubBlockEntry( block->data, sbid, entryNo, width, pStruct );
return NO_ERROR;
}
int DbFileOp::readSubBlockEntry( CommBlock& cb, DataBlock* block,
const uint64_t lbid, const int sbid,
const int entryNo, const int width,
void* pStruct )
{
RETURN_ON_ERROR( readDBFile( cb, block->data, lbid ) );
getSubBlockEntry( block->data, sbid, entryNo, width, pStruct );
return NO_ERROR;
}
/***********************************************************
* DESCRIPTION:
* Set an entry within a sub block
* NOTE: the difference with writeSubBlockEntry is that
* setSubBlockEntry only works for buffer while
* writeSubBlockEntry works for file and block
* PARAMETERS:
* blockBuf - the block buffer
* sbid - sub block id
* entryNo - entry no within sub block
* width - width in bytes
* pStruct - sturcture pointer
* RETURN:
* none
***********************************************************/
void DbFileOp::setSubBlockEntry( unsigned char* blockBuf, const int sbid,
const int entryNo, const int width,
const void* pStruct )
{
unsigned char* pBlock;
pBlock = blockBuf + BYTE_PER_SUBBLOCK * sbid + entryNo * MAX_COLUMN_BOUNDARY;
memcpy( pBlock, pStruct, width );
}
/***********************************************************
* DESCRIPTION:
* Write a number of blocks to the file at specified location
* PARAMETERS:
* pFile - file handle
* writeBuf - write buffer
* fbo - file block offset
* numOfBlock - total number of file block offset
* RETURN:
* NO_ERROR if success
* other number if something wrong
***********************************************************/
int DbFileOp::writeDBFile( CommBlock& cb, const unsigned char* writeBuf,
const uint64_t lbid, const int numOfBlock )
{
CacheKey key;
int ret;
if ( Cache::getUseCache() )
{
if ( Cache::cacheKeyExist( cb.file.oid, lbid ) )
{
key = Cache::getCacheKey( cb.file.oid, lbid );
RETURN_ON_ERROR( Cache::modifyCacheBlock( key, writeBuf ) );
return NO_ERROR;
}
}
if (BRMWrapper::getUseVb())
{
RETURN_ON_ERROR( writeVB( cb.file.pFile, cb.file.oid, lbid ) );
}
ret = writeDBFile( cb.file.pFile, writeBuf, lbid, numOfBlock );
if (BRMWrapper::getUseVb())
{
LBIDRange_v ranges;
LBIDRange range;
range.start = lbid;
range.size = 1;
ranges.push_back(range);
BRMWrapper::getInstance()->writeVBEnd(getTransId(), ranges);
}
return ret;
}
int DbFileOp::writeDBFileNoVBCache(CommBlock& cb,
const unsigned char* writeBuf,
const int fbo,
const int numOfBlock)
{
return writeDBFileNoVBCache( cb.file.pFile, writeBuf, fbo, numOfBlock );
}
/***********************************************************
* DESCRIPTION:
* Core function for writing data w/o using VB cache
* (bulk load dictionary store inserts)
***********************************************************/
int DbFileOp::writeDBFileNoVBCache( IDBDataFile* pFile,
const unsigned char* writeBuf,
const int fbo,
const int numOfBlock )
{
#ifdef PROFILE
// This function is only used by bulk load for dictionary store files,
// so we log as such.
Stats::startParseEvent(WE_STATS_WRITE_DCT);
#endif
for ( int i = 0; i < numOfBlock; i++ )
{
Stats::incIoBlockWrite();
RETURN_ON_ERROR( writeFile( pFile, writeBuf, BYTE_PER_BLOCK ) );
}
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WRITE_DCT);
#endif
return NO_ERROR;
}
/***********************************************************
* DESCRIPTION:
* Core function for writing data using VB cache
***********************************************************/
int DbFileOp::writeDBFile( IDBDataFile* pFile, const unsigned char* writeBuf,
const uint64_t lbid, const int numOfBlock )
{
RETURN_ON_ERROR( setFileOffsetBlock( pFile, lbid ) );
for ( int i = 0; i < numOfBlock; i++ )
{
Stats::incIoBlockWrite();
RETURN_ON_ERROR( writeFile( pFile, writeBuf, BYTE_PER_BLOCK ) );
}
return NO_ERROR;
}
// just don't have a good solution to consolidate with above functions
// Note: This is used with absolute FBO, no lbid involved
int DbFileOp::writeDBFileFbo(IDBDataFile* pFile, const unsigned char* writeBuf,
const uint64_t fbo, const int numOfBlock )
{
long long fboOffset = 0;
fboOffset = (fbo) * (long)BYTE_PER_BLOCK;
RETURN_ON_ERROR( setFileOffset( pFile, fboOffset ) );
for ( int i = 0; i < numOfBlock; i++ )
{
Stats::incIoBlockWrite();
RETURN_ON_ERROR( writeFile( pFile, writeBuf, BYTE_PER_BLOCK ) );
}
return NO_ERROR;
}
/***********************************************************
* DESCRIPTION:
* Write an entry within a sub block to a file
* NOTE: the difference with getSubBlockEntry is that
* setSubBlockEntry only works for buffer while
* writeSubBlockEntry works for file and block
* PARAMETERS:
* pFile - file handler
* block - the block structure
* fbo - file block offset
* sbid - sub block id
* entryNo - entry no within sub block
* width - width in bytes
* pStruct - sturcture pointer
* RETURN:
* NO_ERROR if success
* other number if something wrong
***********************************************************/
int DbFileOp::writeSubBlockEntry( IDBDataFile* pFile, DataBlock* block,
const uint64_t lbid, const int sbid,
const int entryNo, const int width,
void* pStruct )
{
setSubBlockEntry( block->data, sbid, entryNo, width, pStruct );
block->dirty = false;
return writeDBFile( pFile, block->data, lbid );
}
int DbFileOp::writeSubBlockEntry( CommBlock& cb, DataBlock* block,
const uint64_t lbid, const int sbid,
const int entryNo, const int width,
void* pStruct )
{
setSubBlockEntry( block->data, sbid, entryNo, width, pStruct );
block->dirty = false;
return writeDBFile( cb, block->data, lbid );
}
/***********************************************************
* DESCRIPTION:
* Write to version buffer
* PARAMETERS:
* oid - file oid
* lbid - lbid
* RETURN:
* NO_ERROR if success
* other number if something wrong
***********************************************************/
int DbFileOp::writeVB( IDBDataFile* pFile, const OID oid, const uint64_t lbid )
{
if ( !BRMWrapper::getUseVb() )
return NO_ERROR;
int rc;
const TxnID transId = getTransId();
if (transId != ((TxnID)INVALID_NUM))
{
rc = BRMWrapper::getInstance()->writeVB( pFile,
(VER_t)transId,
oid, lbid, this );
//@Bug 4671. The error is already logged by worker node.
/* if (rc != NO_ERROR)
{
char msg[2048];
snprintf(msg, 2048,
"we_dbfileop->BRMWrapper::getInstance()->writeVB "
"transId %i oid %i lbid "
#if __LP64__
"%lu"
#else
"%llu"
#endif
" Error Code %i", transId, oid, lbid, rc);
puts(msg);
{
logging::MessageLog ml(logging::LoggingID(19));
logging::Message m;
logging::Message::Args args;
args.add(msg);
m.format(args);
ml.logCriticalMessage(m);
}
return rc;
} */
return rc;
}
return NO_ERROR;
}
int DbFileOp::readDbBlocks(IDBDataFile* pFile,
unsigned char* readBuf,
uint64_t fbo,
size_t n)
{
if (m_chunkManager)
{
return m_chunkManager->readBlocks(pFile, readBuf, fbo, n);
}
if (setFileOffset(pFile, fbo * BYTE_PER_BLOCK, SEEK_SET) != NO_ERROR)
return -1;
return pFile->read(readBuf, BYTE_PER_BLOCK * n) / BYTE_PER_BLOCK;
}
int DbFileOp::restoreBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
{
if (m_chunkManager)
return m_chunkManager->restoreBlock(pFile, writeBuf, fbo);
if (setFileOffset(pFile, fbo * BYTE_PER_BLOCK, SEEK_SET) != NO_ERROR)
return -1;
return pFile->write(writeBuf, BYTE_PER_BLOCK);
}
// @bug 5572 - HDFS usage: add *.tmp file backup flag
IDBDataFile* DbFileOp::getFilePtr(const Column& column, bool useTmpSuffix)
{
string filename;
return m_chunkManager->getFilePtr(column,
column.dataFile.fDbRoot,
column.dataFile.fPartition,
column.dataFile.fSegment,
filename,
"r+b",
column.colWidth,
useTmpSuffix);
}
} //end of namespace