1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4566: Add rebuildEM tool support to work with compressed files.

* This patch adds rebuildEM tool support to work with compressed files.
* This patch increases the version of the file header.

Note: The original version of the `rebuildEM` tool relied on a very old API
whose functions no longer exist. As a result, `rebuildEM` will not work with
files created without compression, because we cannot deduce some of the
information needed to create a column extent.
This commit is contained in:
Denis Khalikov
2021-03-10 17:23:13 +03:00
parent 2eec956977
commit 5d497e8821
25 changed files with 1560 additions and 406 deletions

View File

@ -290,6 +290,25 @@ IDBDataFile* ChunkManager::getFilePtr(const FID& fid,
return (fileData ? fileData->fFilePtr : NULL);
}
//------------------------------------------------------------------------------
// Look up (or open) the column/dictionary segment file identified by OID,
// DBRoot, partition, and segment, and return its IDBDataFile*.
// The work is delegated to getFileData(), which is given the open mode (mode),
// the I/O buffer size (size), the column data type and width, useTmpSuffix,
// and isDict. The name of the resulting file is returned in filename.
// Returns NULL when getFileData() yields no CompFileData.
//------------------------------------------------------------------------------
IDBDataFile* ChunkManager::getSegmentFilePtr(
    FID& fid, uint16_t root, uint32_t partition, uint16_t segment,
    execplan::CalpontSystemCatalog::ColDataType colDataType, uint32_t colWidth,
    std::string& filename, const char* mode, int32_t size, bool useTmpSuffix,
    bool isDict) const
{
    CompFileData* const segmentData = getFileData(
        fid, root, partition, segment, filename, mode, size, colDataType,
        colWidth, useTmpSuffix, isDict);

    // No file data available: nothing to hand back to the caller.
    if (segmentData == NULL)
        return NULL;

    return segmentData->fFilePtr;
}
//------------------------------------------------------------------------------
// Get/Return CompFileData* for specified column OID, root, partition, and
// segment. If the IDBDataFile* is not found, then a segment file will be opened
@ -411,7 +430,8 @@ IDBDataFile* ChunkManager::createDctnryFile(const FID& fid,
uint16_t segment,
const char* filename,
const char* mode,
int size)
int size,
BRM::LBID_t lbid)
{
FileID fileID(fid, root, partition, segment);
CompFileData* fileData = new CompFileData(fileID, fid, CalpontSystemCatalog::VARCHAR, width);
@ -440,8 +460,13 @@ IDBDataFile* ChunkManager::createDctnryFile(const FID& fid,
fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
}
fCompressor.initHdr(fileData->fFileHeader.fControlData, fileData->fFileHeader.fPtrSection,
// Dictionary store extent width == 0. See more details in function
// `createDictStoreExtent`.
fCompressor.initHdr(fileData->fFileHeader.fControlData,
fileData->fFileHeader.fPtrSection,
/*colWidth=*/0, fileData->fColDataType,
fFileOp->compressionType(), hdrSize);
fCompressor.setLBID0(fileData->fFileHeader.fControlData, lbid);
if (writeHeader(fileData, __LINE__) != NO_ERROR)
{
@ -1376,7 +1401,8 @@ int ChunkManager::expandAbbrevColumnExtent(IDBDataFile* pFile, const uint8_t* em
// Increment the block count stored in the chunk header used to track how many
// blocks are allocated to the corresponding segment file.
//------------------------------------------------------------------------------
int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount)
// As in updateDctnryExtent, `lbid` is written to the compression header via setLBID1.
int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount, int64_t lbid)
{
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
@ -1397,6 +1423,7 @@ int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount)
int rc = NO_ERROR;
char* hdr = pFileData->fFileHeader.fControlData;
fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
fCompressor.setLBID1(hdr, lbid);
ChunkData* chunkData = (pFileData)->findChunk(0);
if (chunkData != NULL)
@ -1428,7 +1455,8 @@ int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount)
// Increment the block count stored in the chunk header used to track how many
// blocks are allocated to the corresponding segment file.
//------------------------------------------------------------------------------
int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount)
int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount,
BRM::LBID_t lbid)
{
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
@ -1485,6 +1513,8 @@ int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount)
if (rc == NO_ERROR)
fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
if (currentBlockCount)
fCompressor.setLBID1(hdr, lbid);
return rc;
}

View File

@ -189,6 +189,14 @@ public:
int size,
bool useTmpSuffix) const;
// @brief Retrieve a file pointer in the chunk manager.
// for column/dict segment file
IDBDataFile* getSegmentFilePtr(
FID& fid, uint16_t root, uint32_t partition, uint16_t segment,
execplan::CalpontSystemCatalog::ColDataType colDataType,
uint32_t colWidth, std::string& filename, const char* mode,
int32_t size, bool useTmpSuffix, bool isDict) const;
// @brief Create a compressed dictionary file with an appropriate header.
IDBDataFile* createDctnryFile(const FID& fid,
int64_t width,
@ -197,7 +205,8 @@ public:
uint16_t segment,
const char* filename,
const char* mode,
int size);
int size,
int64_t lbid);
// @brief Read a block from pFile at offset fbo.
// The data may copied from memory if the chunk it belongs to is already available.
@ -217,10 +226,12 @@ public:
int expandAbbrevColumnExtent(IDBDataFile* pFile, const uint8_t* emptyVal, int width);
// @brief Update column extent
int updateColumnExtent(IDBDataFile* pFile, int addBlockCount);
int updateColumnExtent(IDBDataFile* pFile, int addBlockCount,
int64_t lbid);
// @brief Update dictionary extent
int updateDctnryExtent(IDBDataFile* pFile, int addBlockCount);
int updateDctnryExtent(IDBDataFile* pFile, int addBlockCount,
int64_t lbid);
// @brief Read in n continuous blocks to read buffer.
// for backing up blocks to version buffer

View File

@ -163,7 +163,8 @@ int FileOp::createDir( const char* dirName, mode_t mode ) const
int FileOp::createFile( const char* fileName, int numOfBlock,
const uint8_t* emptyVal, int width,
execplan::CalpontSystemCatalog::ColDataType colDataType,
uint16_t dbRoot )
uint16_t dbRoot,
BRM::LBID_t startLbid )
{
IDBDataFile* pFile =
IDBDataFile::open(
@ -185,6 +186,7 @@ int FileOp::createFile( const char* fileName, int numOfBlock,
numOfBlock,
emptyVal,
width,
startLbid,
colDataType );
}
else
@ -284,7 +286,8 @@ int FileOp::createFile(FID fid,
//timer.stop( "allocateColExtent" );
return createFile( fileName, totalSize, emptyVal, width, colDataType, dbRoot );
return createFile(fileName, totalSize, emptyVal, width, colDataType,
dbRoot, startLbid);
}
/***********************************************************
@ -815,6 +818,7 @@ int FileOp::extendFile(
{
IDBCompressInterface compressor;
compressor.initHdr(hdrs, width, colDataType, m_compressionType);
compressor.setLBID0(hdrs, startLbid);
}
}
@ -849,7 +853,8 @@ int FileOp::extendFile(
newFile, // new or existing file
false, // don't expand; new extent
false, // add full (not abbreviated) extent
true); // try to optimize extent creation
true, // try to optimize extent creation
startLbid );
return rc;
}
@ -973,6 +978,7 @@ int FileOp::addExtentExactFile(
{
IDBCompressInterface compressor;
compressor.initHdr(hdrs, width, colDataType, m_compressionType);
compressor.setLBID0(hdrs, startLbid);
}
}
@ -1007,7 +1013,8 @@ int FileOp::addExtentExactFile(
colDataType,
newFile, // new or existing file
false, // don't expand; new extent
false ); // add full (not abbreviated) extent
false, // add full (not abbreviated) extent
startLbid );
closeFile( pFile );
return rc;
@ -1052,13 +1059,15 @@ int FileOp::initColumnExtent(
bool bNewFile,
bool bExpandExtent,
bool bAbbrevExtent,
bool bOptExtension)
bool bOptExtension,
int64_t lbid)
{
if ((bNewFile) && (m_compressionType))
{
char hdrs[IDBCompressInterface::HDR_BUF_LEN * 2];
IDBCompressInterface compressor;
compressor.initHdr(hdrs, width, colDataType, m_compressionType);
compressor.setLBID0(hdrs, lbid);
if (bAbbrevExtent)
compressor.setBlockCount(hdrs, nBlocks);
@ -1072,7 +1081,7 @@ int FileOp::initColumnExtent(
//@Bug 3219. update the compression header after the extent is expanded.
if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
{
updateColumnExtent(pFile, nBlocks);
updateColumnExtent(pFile, nBlocks, lbid);
}
// @bug 2378. Synchronize here to avoid write buffer pile up too much,
@ -1188,7 +1197,7 @@ int FileOp::initColumnExtent(
//@Bug 3219. update the compression header after the extent is expanded.
if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
{
updateColumnExtent(pFile, nBlocks);
updateColumnExtent(pFile, nBlocks, lbid);
}
// @bug 2378. Synchronize here to avoid write buffer pile up too much,
@ -1229,6 +1238,7 @@ int FileOp::initAbbrevCompColumnExtent(
int nBlocks,
const uint8_t* emptyVal,
int width,
BRM::LBID_t startLBID,
execplan::CalpontSystemCatalog::ColDataType colDataType)
{
// Reserve disk space for optimized abbreviated extent
@ -1241,7 +1251,8 @@ int FileOp::initAbbrevCompColumnExtent(
true, // new file
false, // don't expand; add new extent
true, // add abbreviated extent
true); // optimize the initial extent
true, // optimize the initial extent
startLBID);
if (rc != NO_ERROR)
{
return rc;
@ -1257,6 +1268,7 @@ int FileOp::initAbbrevCompColumnExtent(
INITIAL_EXTENT_ROWS_TO_DISK,
emptyVal,
width,
startLBID,
colDataType,
hdrs );
@ -1292,6 +1304,7 @@ int FileOp::writeInitialCompColumnChunk(
int nRows,
const uint8_t* emptyVal,
int width,
BRM::LBID_t startLBID,
execplan::CalpontSystemCatalog::ColDataType colDataType,
char* hdrs)
{
@ -1336,6 +1349,7 @@ int FileOp::writeInitialCompColumnChunk(
compressor.initHdr(hdrs, width, colDataType, m_compressionType);
compressor.setBlockCount(hdrs, nBlocksAllocated);
compressor.setLBID0(hdrs, startLBID);
// Store compression pointers in the header
std::vector<uint64_t> ptrs;
@ -1841,13 +1855,14 @@ int FileOp::initDctnryExtent(
unsigned char* blockHdrInit,
int blockHdrInitSize,
bool bExpandExtent,
bool bOptExtension )
bool bOptExtension,
int64_t lbid)
{
// @bug5769 Don't initialize extents or truncate db files on HDFS
if (idbdatafile::IDBPolicy::useHdfs())
{
if (m_compressionType)
updateDctnryExtent(pFile, nBlocks);
updateDctnryExtent(pFile, nBlocks, lbid);
// Synchronize to avoid write buffer pile up too much, which could cause
// controllernode to timeout later when it needs to save a snapshot.
@ -1972,7 +1987,7 @@ int FileOp::initDctnryExtent(
// MCOL-498 CS has to set the number of blocks in the chunk header
if ( m_compressionType )
{
updateDctnryExtent(pFile, nBlocks);
updateDctnryExtent(pFile, nBlocks, lbid);
}
pFile->flush();
}
@ -2897,12 +2912,12 @@ int FileOp::flushFile(int rc, std::map<FID, FID>& oids)
return NO_ERROR;
}
int FileOp::updateColumnExtent(IDBDataFile* pFile, int nBlocks)
int FileOp::updateColumnExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid)
{
return NO_ERROR;
}
int FileOp::updateDctnryExtent(IDBDataFile* pFile, int nBlocks)
int FileOp::updateDctnryExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid)
{
return NO_ERROR;
}

View File

@ -103,7 +103,8 @@ public:
int createFile( const char* fileName, int fileSize,
const uint8_t* emptyVal, int width,
execplan::CalpontSystemCatalog::ColDataType colDataType,
uint16_t dbRoot );
uint16_t dbRoot ,
BRM::LBID_t lbid = -1 );
/**
* @brief Delete a file
@ -358,7 +359,8 @@ public:
unsigned char* blockHdrInit,
int blockHdrInitSize,
bool bExpandExtent,
bool bOptExtension = false );
bool bOptExtension = false,
int64_t lbid = 0);
/**
* @brief Check whether it is an directory
@ -510,14 +512,15 @@ public:
bool bNewFile,
bool bExpandExtent,
bool bAbbrevExtent,
bool bOptExtension=false );
bool bOptExtension=false,
int64_t lbid = 0 );
// Calls a chown and logs an error message
bool chownDataPath(const std::string& fileName) const;
protected:
EXPORT virtual int updateColumnExtent(IDBDataFile* pFile, int nBlocks);
EXPORT virtual int updateDctnryExtent(IDBDataFile* pFile, int nBlocks);
EXPORT virtual int updateColumnExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid);
EXPORT virtual int updateDctnryExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid);
int m_compressionType; // compresssion type
@ -534,7 +537,7 @@ private:
int initAbbrevCompColumnExtent(
IDBDataFile* pFile, uint16_t dbRoot, int nBlocks,
const uint8_t* emptyVal, int width,
const uint8_t* emptyVal, int width, BRM::LBID_t lbid,
execplan::CalpontSystemCatalog::ColDataType colDataType);
static void initDbRootExtentMutexes();
@ -542,7 +545,7 @@ private:
int writeInitialCompColumnChunk(
IDBDataFile* pFile, int nBlocksAllocated, int nRows,
const uint8_t* emptyVal, int width,
const uint8_t* emptyVal, int width, BRM::LBID_t lbid,
execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs);
TxnID m_transId;