1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-987 Add LZ4 compression.

* Adds CompressInterfaceLZ4 which uses LZ4 API for compress/uncompress.
* Adds CMake machinery to search LZ4 on running host.
* All methods which use static data and do not modify any internal data - become `static`,
  so we can use them without creation of the specific object. This is possible, because
  the header specification has not been modified. We still use 2 sections in header, first
  one with file meta data, the second one with pointers for compressed chunks.
* Methods `compress`, `uncompress`, `maxCompressedSize`, `getUncompressedSize` - become
  pure virtual, so we can override them for the other compression algos.
* Adds method `getChunkMagicNumber`, so we can verify chunk magic number
  for each compression algo.
* Renames "s/IDBCompressInterface/CompressInterface/g" according to requirement.
This commit is contained in:
Denis Khalikov
2021-04-01 17:26:38 +03:00
parent dd12bd3cd0
commit cc1c3629c5
45 changed files with 1311 additions and 549 deletions

View File

@ -67,8 +67,6 @@ namespace WriteEngine
extern int NUM_BLOCKS_PER_INITIAL_EXTENT; // defined in we_dctnry.cpp
extern WErrorCodes ec; // defined in we_log.cpp
const int COMPRESSED_CHUNK_SIZE = compress::IDBCompressInterface::maxCompressedSize(UNCOMPRESSED_CHUNK_SIZE) + 64 + 3 + 8 * 1024;
//------------------------------------------------------------------------------
// Search for the specified chunk in fChunkList.
//------------------------------------------------------------------------------
@ -91,18 +89,24 @@ ChunkData* CompFileData::findChunk(int64_t id) const
//------------------------------------------------------------------------------
// ChunkManager constructor
//------------------------------------------------------------------------------
ChunkManager::ChunkManager() : fMaxActiveChunkNum(100), fLenCompressed(0), fIsBulkLoad(false),
fDropFdCache(false), fIsInsert(false), fIsHdfs(IDBPolicy::useHdfs()),
fFileOp(0), fSysLogger(NULL), fTransId(-1),
fLocalModuleId(Config::getLocalModuleID()),
fFs(fIsHdfs ?
IDBFileSystem::getFs(IDBDataFile::HDFS) :
IDBPolicy::useCloud() ?
IDBFileSystem::getFs(IDBDataFile::CLOUD) :
IDBFileSystem::getFs(IDBDataFile::BUFFERED))
ChunkManager::ChunkManager()
: fMaxActiveChunkNum(100), fLenCompressed(0), fIsBulkLoad(false),
fDropFdCache(false), fIsInsert(false), fIsHdfs(IDBPolicy::useHdfs()),
fFileOp(0), fSysLogger(NULL), fTransId(-1),
fLocalModuleId(Config::getLocalModuleID()),
fFs(fIsHdfs ? IDBFileSystem::getFs(IDBDataFile::HDFS)
: IDBPolicy::useCloud()
? IDBFileSystem::getFs(IDBDataFile::CLOUD)
: IDBFileSystem::getFs(IDBDataFile::BUFFERED))
{
fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
fCompressor.numUserPaddingBytes(fUserPaddings);
compress::initializeCompressorPool(fCompressorPool, fUserPaddings);
COMPRESSED_CHUNK_SIZE =
compress::CompressInterface::getMaxCompressedSizeGeneric(
UNCOMPRESSED_CHUNK_SIZE) +
64 + 3 + 8 * 1024;
fMaxCompressedBufSize = COMPRESSED_CHUNK_SIZE + fUserPaddings;
fBufCompressed = new char[fMaxCompressedBufSize];
fSysLogger = new logging::Logger(SUBSYSTEM_ID_WE);
@ -383,16 +387,22 @@ CompFileData* ChunkManager::getFileData(const FID& fid,
}
// make sure the header is valid
if (fCompressor.verifyHdr(fileData->fFileHeader.fControlData) != 0)
if (compress::CompressInterface::verifyHdr(fileData->fFileHeader.fControlData) != 0)
{
WE_COMP_DBG(cout << "Invalid header." << endl;)
delete fileData;
return NULL;
}
int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
// Save segment file compression type.
uint32_t compressionType = compress::CompressInterface::getCompressionType(
fileData->fFileHeader.fControlData);
fileData->fCompressionType = compressionType;
if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
{
// >8K header, dictionary width > 128
@ -462,11 +472,12 @@ IDBDataFile* ChunkManager::createDctnryFile(const FID& fid,
// Dictionary store extent width == 0. See more details in function
// `createDictStoreExtent`.
fCompressor.initHdr(fileData->fFileHeader.fControlData,
fileData->fFileHeader.fPtrSection,
/*colWidth=*/0, fileData->fColDataType,
fFileOp->compressionType(), hdrSize);
fCompressor.setLBIDByIndex(fileData->fFileHeader.fControlData, lbid, 0);
compress::CompressInterface::initHdr(
fileData->fFileHeader.fControlData, fileData->fFileHeader.fPtrSection,
/*colWidth=*/0, fileData->fColDataType, fFileOp->compressionType(), hdrSize);
compress::CompressInterface::setLBIDByIndex(fileData->fFileHeader.fControlData, lbid, 0);
// Save compression type.
fileData->fCompressionType = fFileOp->compressionType();
if (writeHeader(fileData, __LINE__) != NO_ERROR)
{
@ -771,9 +782,16 @@ int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*&
}
// uncompress the read in buffer
unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
size_t dataLen = sizeof(chunkData->fBufUnCompressed);
if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
auto fCompressor = compress::getCompressorByType(
fCompressorPool, fileData->fCompressionType);
if (!fCompressor)
{
return ERR_COMP_WRONG_COMP_TYPE;
}
if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
(unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
{
if (fIsFix)
@ -784,7 +802,7 @@ int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*&
{
char* hdr = fileData->fFileHeader.fControlData;
if (fCompressor.getBlockCount(hdr) < 512)
if (compress::CompressInterface::getBlockCount(hdr) < 512)
blocks = 256;
}
@ -820,7 +838,8 @@ int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*&
{
if (id == 0 && ptrs[id] == 0) // if the 1st ptr is not set for new extent
{
ptrs[0] = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
ptrs[0] = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
}
// load the uncompressed buffer with empty values.
@ -907,10 +926,17 @@ int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
// compress the chunk before writing it to file
fLenCompressed = fMaxCompressedBufSize;
if (fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
chunkData->fLenUnCompressed,
(unsigned char*)fBufCompressed,
fLenCompressed) != 0)
auto fCompressor = compress::getCompressorByType(
fCompressorPool, fileData->fCompressionType);
if (!fCompressor)
{
return ERR_COMP_WRONG_COMP_TYPE;
}
if (fCompressor->compressBlock((char*) chunkData->fBufUnCompressed,
chunkData->fLenUnCompressed,
(unsigned char*) fBufCompressed,
fLenCompressed) != 0)
{
logMessage(ERR_COMP_COMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
return ERR_COMP_COMPRESS;
@ -941,7 +967,8 @@ int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
// [chunkId+0] is the start offset of current chunk.
// [chunkId+1] is the start offset of next chunk, the offset diff is current chunk size.
// [chunkId+2] is 0 or not indicates if the next chunk exists.
int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
int64_t usablePtrIds = (ptrSecSize / sizeof(uint64_t)) - 2;
@ -968,7 +995,7 @@ int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
else if (lastChunk)
{
// add padding space if the chunk is written first time
if (fCompressor.padCompressedChunks(
if (fCompressor->padCompressedChunks(
(unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize) != 0)
{
WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padding failed." << endl;)
@ -1272,7 +1299,8 @@ int ChunkManager::closeFile(CompFileData* fileData)
int ChunkManager::writeHeader(CompFileData* fileData, int ln)
{
int rc = NO_ERROR;
int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
if (!fIsHdfs && !fIsBulkLoad)
@ -1422,8 +1450,10 @@ int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount, int6
int rc = NO_ERROR;
char* hdr = pFileData->fFileHeader.fControlData;
fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
fCompressor.setLBIDByIndex(hdr, lbid, 1);
compress::CompressInterface::setBlockCount(
hdr, compress::CompressInterface::getBlockCount(hdr) + addBlockCount);
compress::CompressInterface::setLBIDByIndex(hdr, lbid, 1);
ChunkData* chunkData = (pFileData)->findChunk(0);
if (chunkData != NULL)
@ -1475,7 +1505,7 @@ int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount,
char* hdr = i->second->fFileHeader.fControlData;
char* uncompressedBuf = chunkData->fBufUnCompressed;
int currentBlockCount = fCompressor.getBlockCount(hdr);
int currentBlockCount = compress::CompressInterface::getBlockCount(hdr);
// Bug 3203, write out the compressed initial extent.
if (currentBlockCount == 0)
@ -1511,13 +1541,15 @@ int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount,
}
if (rc == NO_ERROR)
fCompressor.setBlockCount(hdr, fCompressor.getBlockCount(hdr) + addBlockCount);
compress::CompressInterface::setBlockCount(
hdr,
compress::CompressInterface::getBlockCount(hdr) + addBlockCount);
if (currentBlockCount)
{
// Append to the end.
uint64_t lbidCount = fCompressor.getLBIDCount(hdr);
fCompressor.setLBIDByIndex(hdr, lbid, lbidCount);
uint64_t lbidCount = compress::CompressInterface::getLBIDCount(hdr);
compress::CompressInterface::setLBIDByIndex(hdr, lbid, lbidCount);
}
return rc;
}
@ -1684,7 +1716,8 @@ int ChunkManager::getBlockCount(IDBDataFile* pFile)
map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
idbassert(fpIt != fFilePtrMap.end());
return fCompressor.getBlockCount(fpIt->second->fFileHeader.fControlData);
return compress::CompressInterface::getBlockCount(
fpIt->second->fFileHeader.fControlData);
}
//------------------------------------------------------------------------------
@ -1758,11 +1791,13 @@ int ChunkManager::reallocateChunks(CompFileData* fileData)
origFilePtr->flush();
// back out the current pointers
int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
compress::CompChunkPtrList origPtrs;
if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0)
if (compress::CompressInterface::getPtrList(
fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0)
{
ostringstream oss;
oss << "Chunk shifting failed, file:" << origFileName << " -- invalid header.";
@ -1876,7 +1911,14 @@ int ChunkManager::reallocateChunks(CompFileData* fileData)
ChunkData* chunkData = chunksTouched[k];
fLenCompressed = fMaxCompressedBufSize;
if ((rc = fCompressor.compressBlock((char*)chunkData->fBufUnCompressed,
auto fCompressor = compress::getCompressorByType(
fCompressorPool, fileData->fCompressionType);
if (!fCompressor)
{
return ERR_COMP_WRONG_COMP_TYPE;
}
if ((rc = fCompressor->compressBlock((char*)chunkData->fBufUnCompressed,
chunkData->fLenUnCompressed,
(unsigned char*)fBufCompressed,
fLenCompressed)) != 0)
@ -1894,7 +1936,7 @@ int ChunkManager::reallocateChunks(CompFileData* fileData)
<< fLenCompressed;)
// shifting chunk, add padding space
if ((rc = fCompressor.padCompressedChunks(
if ((rc = fCompressor->padCompressedChunks(
(unsigned char*)fBufCompressed, fLenCompressed, fMaxCompressedBufSize)) != 0)
{
WE_COMP_DBG(cout << ", but padding failed." << endl;)
@ -2245,7 +2287,8 @@ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
}
// make sure the header is valid
if ((rc = fCompressor.verifyHdr(fileData->fFileHeader.fControlData)) != 0)
if ((rc = compress::CompressInterface::verifyHdr(
fileData->fFileHeader.fControlData)) != 0)
{
ostringstream oss;
oss << "Invalid header in new " << fileData->fFileName << ", roll back";
@ -2254,7 +2297,8 @@ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
return rc;
}
int headerSize = fCompressor.getHdrSize(fileData->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
fileData->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
// read in the pointer section in header
@ -2270,7 +2314,8 @@ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
// get pointer list
compress::CompChunkPtrList ptrs;
if (fCompressor.getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
if (compress::CompressInterface::getPtrList(
fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
{
ostringstream oss;
oss << "Failed to parse pointer list from new " << fileData->fFileName << "@" << __LINE__;
@ -2282,6 +2327,13 @@ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
ChunkData chunkData;
int numOfChunks = ptrs.size(); // number of chunks in the file
auto fCompressor = compress::getCompressorByType(
fCompressorPool, fileData->fCompressionType);
if (!fCompressor)
{
return ERR_COMP_WRONG_COMP_TYPE;
}
for (int i = 0; i < numOfChunks && rc == NO_ERROR; i++)
{
unsigned int chunkSize = ptrs[i].second;
@ -2304,9 +2356,9 @@ int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
}
// uncompress the read in buffer
unsigned int dataLen = sizeof(chunkData.fBufUnCompressed);
size_t dataLen = sizeof(chunkData.fBufUnCompressed);
if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
(unsigned char*)chunkData.fBufUnCompressed, dataLen) != 0)
{
ostringstream oss;
@ -2624,13 +2676,15 @@ int ChunkManager::checkFixLastDictChunk(const FID& fid,
if (mit != fFileMap.end())
{
int headerSize = fCompressor.getHdrSize(mit->second->fFileHeader.fControlData);
int headerSize = compress::CompressInterface::getHdrSize(
mit->second->fFileHeader.fControlData);
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
// get pointer list
compress::CompChunkPtrList ptrs;
if (fCompressor.getPtrList(mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
if (compress::CompressInterface::getPtrList(
mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
{
ostringstream oss;
oss << "Failed to parse pointer list from new " << mit->second->fFileName << "@" << __LINE__;
@ -2662,9 +2716,16 @@ int ChunkManager::checkFixLastDictChunk(const FID& fid,
// uncompress the read in buffer
chunkData = new ChunkData(numOfChunks - 1);
unsigned int dataLen = sizeof(chunkData->fBufUnCompressed);
size_t dataLen = sizeof(chunkData->fBufUnCompressed);
if (fCompressor.uncompressBlock((char*)fBufCompressed, chunkSize,
auto fCompressor = compress::getCompressorByType(
fCompressorPool, mit->second->fCompressionType);
if (!fCompressor)
{
return ERR_COMP_WRONG_COMP_TYPE;
}
if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
(unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
{
mit->second->fChunkList.push_back(chunkData);
@ -2676,7 +2737,7 @@ int ChunkManager::checkFixLastDictChunk(const FID& fid,
{
char* hdr = mit->second->fFileHeader.fControlData;
if (fCompressor.getBlockCount(hdr) < 512)
if (compress::CompressInterface::getBlockCount(hdr) < 512)
blocks = 256;
}
@ -2693,7 +2754,6 @@ int ChunkManager::checkFixLastDictChunk(const FID& fid,
return rc;
}
}
// vim:ts=4 sw=4: