You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
[MCOL-5131] Add support to calculate HWM for system catalog files.
This commit is contained in:
@ -35,6 +35,50 @@ using namespace idbdatafile;
|
||||
|
||||
namespace RebuildExtentMap
|
||||
{
|
||||
std::unordered_map<uint32_t, FileId> systemCatalogMap = {
|
||||
{2073, FileId(2073, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2070, FileId(2070, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2067, FileId(2067, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2064, FileId(2064, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2076, FileId(2076, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2061, FileId(2061, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{1004, FileId(1004, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)},
|
||||
{1022, FileId(1022, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1001, FileId(1001, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1023, FileId(1023, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1021, FileId(1021, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1010, FileId(1010, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1006, FileId(1006, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1002, FileId(1002, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1009, FileId(1009, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1005, FileId(1005, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)},
|
||||
{1011, FileId(1011, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1008, FileId(1008, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1007, FileId(1007, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1003, FileId(1003, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1032, FileId(1032, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1038, FileId(1038, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1033, FileId(1033, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1027, FileId(1027, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1024, FileId(1024, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1042, FileId(1042, 0, 0, 0, 8, execplan::CalpontSystemCatalog::UBIGINT, 0, 0, false)},
|
||||
{1040, FileId(1040, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1025, FileId(1025, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1035, FileId(1035, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1028, FileId(1028, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1036, FileId(1036, 0, 0, 0, 1, execplan::CalpontSystemCatalog::CHAR, 0, 0, false)},
|
||||
{1031, FileId(1031, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)},
|
||||
{1037, FileId(1037, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1039, FileId(1039, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)},
|
||||
{1030, FileId(1030, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1034, FileId(1034, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1026, FileId(1026, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1041, FileId(1041, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{1029, FileId(1029, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)},
|
||||
{2001, FileId(2001, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)},
|
||||
{2004, FileId(2004, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, true)},
|
||||
};
|
||||
|
||||
void EMReBuilder::collectFileNames(const std::string& partialPath, std::string currentPath,
|
||||
std::vector<std::string>& fileNames)
|
||||
{
|
||||
@ -76,6 +120,7 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
uint32_t oid;
|
||||
uint32_t partition;
|
||||
uint32_t segment;
|
||||
|
||||
// Initialize oid, partition and segment from the given `fullFileName`.
|
||||
auto rc = WriteEngine::Convertor::fileName2Oid(fullFileName, oid, partition, segment);
|
||||
if (rc != 0)
|
||||
@ -98,107 +143,144 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Read and verify header.
|
||||
char fileHeader[compress::CompressInterface::HDR_BUF_LEN * 2];
|
||||
rc = fileOp.readHeaders(dbFile.get(), fileHeader);
|
||||
if (rc != 0)
|
||||
if (oid > 3000)
|
||||
{
|
||||
// FIXME: If the file was created without a compression, it does not
|
||||
// have a header block, so header verification fails in this case,
|
||||
// currently we skip it, because we cannot deduce needed data to create
|
||||
// a column extent from the blob file.
|
||||
// Skip fileID from system catalog.
|
||||
if (doVerbose() && oid > 3000)
|
||||
// Read and verify header.
|
||||
char fileHeader[compress::CompressInterface::HDR_BUF_LEN * 2];
|
||||
rc = fileOp.readHeaders(dbFile.get(), fileHeader);
|
||||
if (rc != 0)
|
||||
{
|
||||
std::cerr << "Cannot read file header from the file " << fullFileName
|
||||
<< ", probably this file was created without compression. " << std::endl;
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cerr << "Cannot read file header from the file " << fullFileName
|
||||
<< ", probably this file was created without compression. " << std::endl;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Processing file: " << fullFileName << std::endl;
|
||||
std::cout << "fileName2Oid: [OID: " << oid << ", partition: " << partition << ", segment: " << segment
|
||||
<< "] " << std::endl;
|
||||
}
|
||||
|
||||
// Read the `colDataType` and `colWidth` from the given header.
|
||||
const auto versionNumber = compress::CompressInterface::getVersionNumber(fileHeader);
|
||||
// Verify header number.
|
||||
if (versionNumber < 3)
|
||||
{
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cerr << "File version " << versionNumber << " is not supported. " << std::endl;
|
||||
std::cout << "Processing file: " << fullFileName << std::endl;
|
||||
std::cout << "fileName2Oid: [OID: " << oid << ", partition: " << partition << ", segment: " << segment
|
||||
<< "] " << std::endl;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto colDataType = compress::CompressInterface::getColDataType(fileHeader);
|
||||
auto colWidth = compress::CompressInterface::getColumnWidth(fileHeader);
|
||||
auto blockCount = compress::CompressInterface::getBlockCount(fileHeader);
|
||||
auto lbidCount = compress::CompressInterface::getLBIDCount(fileHeader);
|
||||
auto compressionType = compress::CompressInterface::getCompressionType(fileHeader);
|
||||
|
||||
if (colDataType == execplan::CalpontSystemCatalog::UNDEFINED)
|
||||
{
|
||||
if (doVerbose())
|
||||
std::cout << "File header has invalid data. " << std::endl;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto isDict = isDictFile(colDataType, colWidth);
|
||||
if (isDict)
|
||||
colWidth = 8;
|
||||
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Searching for HWM... " << std::endl;
|
||||
std::cout << "Block count: " << blockCount << std::endl;
|
||||
}
|
||||
|
||||
uint64_t hwm = 0;
|
||||
rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), partition, segment, colDataType, colWidth,
|
||||
blockCount, isDict, compressionType, hwm);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
if (doVerbose())
|
||||
std::cout << "HWM is: " << hwm << std::endl;
|
||||
|
||||
const uint32_t extentMaxBlockCount = getEM().getExtentRows() * colWidth / BLOCK_SIZE;
|
||||
// We found multiple extents per one segment file.
|
||||
if (hwm >= extentMaxBlockCount)
|
||||
{
|
||||
for (uint32_t lbidIndex = 0; lbidIndex < lbidCount - 1; ++lbidIndex)
|
||||
// Read the `colDataType` and `colWidth` from the given header.
|
||||
const auto versionNumber = compress::CompressInterface::getVersionNumber(fileHeader);
|
||||
// Verify header number.
|
||||
if (versionNumber < 3)
|
||||
{
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidIndex);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, /*hwm*/ 0, isDict);
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cerr << "File version " << versionNumber << " is not supported. " << std::endl;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto colDataType = compress::CompressInterface::getColDataType(fileHeader);
|
||||
auto colWidth = compress::CompressInterface::getColumnWidth(fileHeader);
|
||||
auto blockCount = compress::CompressInterface::getBlockCount(fileHeader);
|
||||
auto lbidCount = compress::CompressInterface::getLBIDCount(fileHeader);
|
||||
auto compressionType = compress::CompressInterface::getCompressionType(fileHeader);
|
||||
|
||||
if (colDataType == execplan::CalpontSystemCatalog::UNDEFINED)
|
||||
{
|
||||
if (doVerbose())
|
||||
std::cout << "File header has invalid data. " << std::endl;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto isDict = isDictFile(colDataType, colWidth);
|
||||
if (isDict)
|
||||
colWidth = 8;
|
||||
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Searching for HWM... " << std::endl;
|
||||
std::cout << "Block count: " << blockCount << std::endl;
|
||||
}
|
||||
|
||||
uint64_t hwm = 0;
|
||||
rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), partition, segment, colDataType, colWidth,
|
||||
blockCount, isDict, compressionType, hwm);
|
||||
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
if (doVerbose())
|
||||
std::cout << "HWM is: " << hwm << std::endl;
|
||||
|
||||
const uint32_t extentMaxBlockCount = getEM().getExtentRows() * colWidth / BLOCK_SIZE;
|
||||
// We found multiple extents per one segment file.
|
||||
if (hwm >= extentMaxBlockCount)
|
||||
{
|
||||
for (uint32_t lbidIndex = 0; lbidIndex < lbidCount - 1; ++lbidIndex)
|
||||
{
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidIndex);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, /*hwm*/ 0, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
}
|
||||
|
||||
// Last one has an actual HWM.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidCount - 1);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Found multiple extents per segment file " << std::endl;
|
||||
std::cout << "FileId is collected " << fileId << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
// Last one has an actual HWM.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidCount - 1);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
|
||||
if (doVerbose())
|
||||
else
|
||||
{
|
||||
std::cout << "Found multiple extents per segment file " << std::endl;
|
||||
std::cout << "FileId is collected " << fileId << std::endl;
|
||||
// One extent per segment file.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, 0);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
|
||||
if (doVerbose())
|
||||
std::cout << "FileId is collected " << fileId << std::endl;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// One extent per segment file.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, 0);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
const auto fileSize = dbFile->size();
|
||||
if (fileSize == -1)
|
||||
{
|
||||
std::cerr << "IDBDataFile::size failed for the file " << fullFileName << std::endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
const uint64_t blockCount = fileSize / BLOCK_SIZE;
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Searching for HWM for system catalog file..." << std::endl;
|
||||
std::cout << "Block count: " << blockCount << std::endl;
|
||||
}
|
||||
|
||||
auto it = systemCatalogMap.find(oid);
|
||||
if (it == systemCatalogMap.end())
|
||||
{
|
||||
std::cout << "Cannot find a system extent for OID: " << oid << std::endl;
|
||||
std::cout << "Continuing..." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
FileId systemFileId = it->second;
|
||||
uint64_t hwm = 0;
|
||||
rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), systemFileId.partition, systemFileId.segment,
|
||||
systemFileId.colDataType, systemFileId.colWidth, blockCount,
|
||||
systemFileId.isDict, 0 /*=compressionType*/, hwm);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
if (doVerbose())
|
||||
std::cout << "FileId is collected " << fileId << std::endl;
|
||||
std::cout << "HWM is: " << hwm << std::endl;
|
||||
|
||||
systemFileId.hwm = hwm;
|
||||
systemExtentMap.push_back(systemFileId);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@ -207,8 +289,28 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
int32_t EMReBuilder::rebuildExtentMap()
|
||||
{
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Build extent map with size " << extentMap.size() << std::endl;
|
||||
|
||||
for (const auto& fileId : systemExtentMap)
|
||||
{
|
||||
if (fileId.hwm)
|
||||
{
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Setting a HWM for system file " << fileId << std::endl;
|
||||
}
|
||||
try
|
||||
{
|
||||
getEM().setLocalHWM(fileId.oid, fileId.partition, fileId.segment, fileId.hwm, false, true);
|
||||
}
|
||||
catch (std::exception& e)
|
||||
{
|
||||
getEM().undoChanges();
|
||||
std::cerr << "Cannot set local HWM: " << e.what() << std::endl;
|
||||
return -1;
|
||||
}
|
||||
getEM().confirmChanges();
|
||||
}
|
||||
}
|
||||
|
||||
// We have to restore extent map by restoring individual extent in order
|
||||
@ -351,9 +453,8 @@ int32_t EMReBuilder::initializeSystemExtents()
|
||||
if (!doDisplay())
|
||||
{
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Initialize system extents from the initial state" << std::endl;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
getEM().loadFromBinaryBlob(reinterpret_cast<const char*>(BRM_saves_em_system_tables_blob));
|
||||
@ -364,19 +465,21 @@ int32_t EMReBuilder::initializeSystemExtents()
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
ChunkManagerWrapper::ChunkManagerWrapper(const std::string& filename, uint32_t oid, uint32_t dbRoot,
|
||||
uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth)
|
||||
uint32_t colWidth, uint32_t compressionType)
|
||||
: oid(oid)
|
||||
, dbRoot(dbRoot)
|
||||
, partition(partition)
|
||||
, segment(segment)
|
||||
, colDataType(colDataType)
|
||||
, colWidth(colWidth)
|
||||
, compressionType(compressionType)
|
||||
, size(colWidth)
|
||||
, pFileOp(nullptr)
|
||||
, fileName(filename)
|
||||
@ -385,29 +488,41 @@ ChunkManagerWrapper::ChunkManagerWrapper(const std::string& filename, uint32_t o
|
||||
|
||||
int32_t ChunkManagerWrapper::readBlock(uint32_t blockNumber)
|
||||
{
|
||||
auto rc = chunkManager.readBlock(pFile, blockData, blockNumber);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
return 0;
|
||||
int32_t rc = 0;
|
||||
|
||||
if (compressionType == 0)
|
||||
rc = pFileOp->readDbBlocks(pFile, blockData, blockNumber, 1);
|
||||
else
|
||||
rc = chunkManager.readBlock(pFile, blockData, blockNumber);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
ChunkManagerWrapperColumn::ChunkManagerWrapperColumn(const std::string& filename, uint32_t oid,
|
||||
uint32_t dbRoot, uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint32_t compressionType)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType)
|
||||
{
|
||||
pFileOp =
|
||||
std::unique_ptr<WriteEngine::ColumnOpCompress1>(new WriteEngine::ColumnOpCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed column segment file. We will read block by block
|
||||
// from the compressed chunks.
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, false);
|
||||
if (!pFile)
|
||||
if (compressionType == 0)
|
||||
{
|
||||
throw std::bad_alloc();
|
||||
pFileOp = std::unique_ptr<WriteEngine::DbFileOp>(new WriteEngine::DbFileOp());
|
||||
pFile = IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename.c_str(), "rb",
|
||||
colWidth);
|
||||
}
|
||||
else
|
||||
{
|
||||
pFileOp =
|
||||
std::unique_ptr<WriteEngine::ColumnOpCompress1>(new WriteEngine::ColumnOpCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed column segment file. We will read block by block
|
||||
// from the compressed chunks.
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, false);
|
||||
}
|
||||
|
||||
if (!pFile)
|
||||
throw std::bad_alloc();
|
||||
|
||||
emptyValue = pFileOp->getEmptyRowValue(colDataType, colWidth);
|
||||
midOffset = (WriteEngine::BYTE_PER_BLOCK / 2);
|
||||
@ -450,17 +565,27 @@ ChunkManagerWrapperDict::ChunkManagerWrapperDict(const std::string& filename, ui
|
||||
uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint32_t compressionType)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType)
|
||||
{
|
||||
pFileOp = std::unique_ptr<WriteEngine::DctnryCompress1>(new WriteEngine::DctnryCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed dict segment file.
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, true);
|
||||
if (!pFile)
|
||||
|
||||
if (compressionType == 0)
|
||||
{
|
||||
throw std::bad_alloc();
|
||||
pFileOp = std::unique_ptr<WriteEngine::DbFileOp>(new WriteEngine::DbFileOp());
|
||||
pFile = IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename.c_str(), "rb",
|
||||
colWidth);
|
||||
}
|
||||
else
|
||||
{
|
||||
pFileOp =
|
||||
std::unique_ptr<WriteEngine::DctnryCompress1>(new WriteEngine::DctnryCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed dict segment file.
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, true);
|
||||
}
|
||||
|
||||
if (!pFile)
|
||||
throw std::bad_alloc();
|
||||
|
||||
auto dictBlockHeaderSize = WriteEngine::HDR_UNIT_SIZE + WriteEngine::NEXT_PTR_BYTES +
|
||||
WriteEngine::HDR_UNIT_SIZE + WriteEngine::HDR_UNIT_SIZE;
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "we_fileop.h"
|
||||
#include "IDBPolicy.h"
|
||||
#include "we_chunkmanager.h"
|
||||
#include "we_dbfileop.h"
|
||||
|
||||
using namespace idbdatafile;
|
||||
|
||||
@ -157,6 +158,7 @@ class EMReBuilder
|
||||
bool display;
|
||||
uint32_t dbRoot;
|
||||
BRM::ExtentMap em;
|
||||
std::vector<FileId> systemExtentMap;
|
||||
std::vector<FileId> extentMap;
|
||||
};
|
||||
|
||||
@ -167,7 +169,7 @@ class ChunkManagerWrapper
|
||||
public:
|
||||
ChunkManagerWrapper(const std::string& filename, uint32_t oid, uint32_t dbRoot, uint32_t partition,
|
||||
uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth);
|
||||
uint32_t colWidth, uint32_t compressionType);
|
||||
|
||||
virtual ~ChunkManagerWrapper() = default;
|
||||
ChunkManagerWrapper(const ChunkManagerWrapper& other) = delete;
|
||||
@ -189,8 +191,9 @@ class ChunkManagerWrapper
|
||||
uint32_t segment;
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType;
|
||||
uint32_t colWidth;
|
||||
uint32_t compressionType;
|
||||
int32_t size;
|
||||
std::unique_ptr<WriteEngine::FileOp> pFileOp;
|
||||
std::unique_ptr<WriteEngine::DbFileOp> pFileOp;
|
||||
std::string fileName;
|
||||
// Note: We cannot clear this pointer directly, because
|
||||
// `ChunkManager` closes this file for us, otherwise we will get double
|
||||
@ -208,7 +211,13 @@ class ChunkManagerWrapperColumn : public ChunkManagerWrapper
|
||||
uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint32_t compressionType);
|
||||
|
||||
~ChunkManagerWrapperColumn() = default;
|
||||
~ChunkManagerWrapperColumn()
|
||||
{
|
||||
// In case we open file without `ChunkManager` machinery.
|
||||
if (!compressionType && pFile)
|
||||
delete pFile;
|
||||
};
|
||||
|
||||
ChunkManagerWrapperColumn(const ChunkManagerWrapperColumn& other) = delete;
|
||||
ChunkManagerWrapperColumn& operator=(const ChunkManagerWrapperColumn& other) = delete;
|
||||
ChunkManagerWrapperColumn(ChunkManagerWrapperColumn&& other) = delete;
|
||||
|
Reference in New Issue
Block a user