From fb5761fbff51ced8fa92d04c8b1921123d2cbd7b Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Fri, 10 Jun 2022 14:59:51 +0300 Subject: [PATCH] [MCOL-5131] Add support to calculate HWM for system catalog files. --- tools/rebuildEM/rebuildEM.cpp | 343 +++++++++++++++++++++++----------- tools/rebuildEM/rebuildEM.h | 15 +- 2 files changed, 246 insertions(+), 112 deletions(-) diff --git a/tools/rebuildEM/rebuildEM.cpp b/tools/rebuildEM/rebuildEM.cpp index 30861c458..7805f5ac8 100644 --- a/tools/rebuildEM/rebuildEM.cpp +++ b/tools/rebuildEM/rebuildEM.cpp @@ -35,6 +35,50 @@ using namespace idbdatafile; namespace RebuildExtentMap { +std::unordered_map systemCatalogMap = { + {2073, FileId(2073, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2070, FileId(2070, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2067, FileId(2067, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2064, FileId(2064, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2076, FileId(2076, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2061, FileId(2061, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {1004, FileId(1004, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)}, + {1022, FileId(1022, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1001, FileId(1001, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1023, FileId(1023, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1021, FileId(1021, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1010, FileId(1010, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1006, FileId(1006, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1002, FileId(1002, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1009, FileId(1009, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1005, FileId(1005, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)}, + {1011, FileId(1011, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1008, FileId(1008, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1007, FileId(1007, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1003, FileId(1003, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1032, FileId(1032, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1038, FileId(1038, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1033, FileId(1033, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1027, FileId(1027, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1024, FileId(1024, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1042, FileId(1042, 0, 0, 0, 8, execplan::CalpontSystemCatalog::UBIGINT, 0, 0, false)}, + {1040, FileId(1040, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1025, FileId(1025, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1035, FileId(1035, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1028, FileId(1028, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1036, FileId(1036, 0, 0, 0, 1, execplan::CalpontSystemCatalog::CHAR, 0, 0, false)}, + {1031, FileId(1031, 0, 0, 0, 4, execplan::CalpontSystemCatalog::DATE, 0, 0, false)}, + {1037, FileId(1037, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1039, FileId(1039, 0, 0, 0, 8, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, false)}, + {1030, FileId(1030, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1034, FileId(1034, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1026, FileId(1026, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1041, FileId(1041, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {1029, FileId(1029, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, false)}, + {2001, FileId(2001, 0, 0, 0, 0, execplan::CalpontSystemCatalog::VARCHAR, 0, 0, true)}, + {2004, FileId(2004, 0, 0, 0, 4, execplan::CalpontSystemCatalog::INT, 0, 0, true)}, +}; + void EMReBuilder::collectFileNames(const std::string& partialPath, std::string currentPath, std::vector& fileNames) { @@ -76,6 +120,7 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName) uint32_t oid; uint32_t partition; uint32_t segment; + // Initialize oid, partition and segment from the given `fullFileName`. auto rc = WriteEngine::Convertor::fileName2Oid(fullFileName, oid, partition, segment); if (rc != 0) @@ -98,107 +143,144 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName) return rc; } - // Read and verify header. - char fileHeader[compress::CompressInterface::HDR_BUF_LEN * 2]; - rc = fileOp.readHeaders(dbFile.get(), fileHeader); - if (rc != 0) + if (oid > 3000) { - // FIXME: If the file was created without a compression, it does not - // have a header block, so header verification fails in this case, - // currently we skip it, because we cannot deduce needed data to create - // a column extent from the blob file. - // Skip fileID from system catalog. - if (doVerbose() && oid > 3000) + // Read and verify header. + char fileHeader[compress::CompressInterface::HDR_BUF_LEN * 2]; + rc = fileOp.readHeaders(dbFile.get(), fileHeader); + if (rc != 0) { - std::cerr << "Cannot read file header from the file " << fullFileName - << ", probably this file was created without compression. " << std::endl; + if (doVerbose()) + { + std::cerr << "Cannot read file header from the file " << fullFileName + << ", probably this file was created without compression. " << std::endl; + } + return rc; } - return rc; - } - if (doVerbose()) - { - std::cout << "Processing file: " << fullFileName << std::endl; - std::cout << "fileName2Oid: [OID: " << oid << ", partition: " << partition << ", segment: " << segment - << "] " << std::endl; - } - - // Read the `colDataType` and `colWidth` from the given header. - const auto versionNumber = compress::CompressInterface::getVersionNumber(fileHeader); - // Verify header number. - if (versionNumber < 3) - { if (doVerbose()) { - std::cerr << "File version " << versionNumber << " is not supported. " << std::endl; + std::cout << "Processing file: " << fullFileName << std::endl; + std::cout << "fileName2Oid: [OID: " << oid << ", partition: " << partition << ", segment: " << segment + << "] " << std::endl; } - return -1; - } - auto colDataType = compress::CompressInterface::getColDataType(fileHeader); - auto colWidth = compress::CompressInterface::getColumnWidth(fileHeader); - auto blockCount = compress::CompressInterface::getBlockCount(fileHeader); - auto lbidCount = compress::CompressInterface::getLBIDCount(fileHeader); - auto compressionType = compress::CompressInterface::getCompressionType(fileHeader); - - if (colDataType == execplan::CalpontSystemCatalog::UNDEFINED) - { - if (doVerbose()) - std::cout << "File header has invalid data. " << std::endl; - - return -1; - } - - auto isDict = isDictFile(colDataType, colWidth); - if (isDict) - colWidth = 8; - - if (doVerbose()) - { - std::cout << "Searching for HWM... " << std::endl; - std::cout << "Block count: " << blockCount << std::endl; - } - - uint64_t hwm = 0; - rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), partition, segment, colDataType, colWidth, - blockCount, isDict, compressionType, hwm); - if (rc != 0) - return rc; - - if (doVerbose()) - std::cout << "HWM is: " << hwm << std::endl; - - const uint32_t extentMaxBlockCount = getEM().getExtentRows() * colWidth / BLOCK_SIZE; - // We found multiple extents per one segment file. - if (hwm >= extentMaxBlockCount) - { - for (uint32_t lbidIndex = 0; lbidIndex < lbidCount - 1; ++lbidIndex) + // Read the `colDataType` and `colWidth` from the given header. + const auto versionNumber = compress::CompressInterface::getVersionNumber(fileHeader); + // Verify header number. + if (versionNumber < 3) { - auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidIndex); - FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, /*hwm*/ 0, isDict); + if (doVerbose()) + { + std::cerr << "File version " << versionNumber << " is not supported. " << std::endl; + } + return -1; + } + + auto colDataType = compress::CompressInterface::getColDataType(fileHeader); + auto colWidth = compress::CompressInterface::getColumnWidth(fileHeader); + auto blockCount = compress::CompressInterface::getBlockCount(fileHeader); + auto lbidCount = compress::CompressInterface::getLBIDCount(fileHeader); + auto compressionType = compress::CompressInterface::getCompressionType(fileHeader); + + if (colDataType == execplan::CalpontSystemCatalog::UNDEFINED) + { + if (doVerbose()) + std::cout << "File header has invalid data. " << std::endl; + + return -1; + } + + auto isDict = isDictFile(colDataType, colWidth); + if (isDict) + colWidth = 8; + + if (doVerbose()) + { + std::cout << "Searching for HWM... " << std::endl; + std::cout << "Block count: " << blockCount << std::endl; + } + + uint64_t hwm = 0; + rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), partition, segment, colDataType, colWidth, + blockCount, isDict, compressionType, hwm); + + if (rc != 0) + return rc; + + if (doVerbose()) + std::cout << "HWM is: " << hwm << std::endl; + + const uint32_t extentMaxBlockCount = getEM().getExtentRows() * colWidth / BLOCK_SIZE; + // We found multiple extents per one segment file. + if (hwm >= extentMaxBlockCount) + { + for (uint32_t lbidIndex = 0; lbidIndex < lbidCount - 1; ++lbidIndex) + { + auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidIndex); + FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, /*hwm*/ 0, isDict); + extentMap.push_back(fileId); + } + + // Last one has an actual HWM. + auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidCount - 1); + FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict); extentMap.push_back(fileId); + + if (doVerbose()) + { + std::cout << "Found multiple extents per segment file " << std::endl; + std::cout << "FileId is collected " << fileId << std::endl; + } } - - // Last one has an actual HWM. - auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidCount - 1); - FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict); - extentMap.push_back(fileId); - - if (doVerbose()) + else { - std::cout << "Found multiple extents per segment file " << std::endl; - std::cout << "FileId is collected " << fileId << std::endl; + // One extent per segment file. + auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, 0); + FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict); + extentMap.push_back(fileId); + + if (doVerbose()) + std::cout << "FileId is collected " << fileId << std::endl; } } else { - // One extent per segment file. - auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, 0); - FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict); - extentMap.push_back(fileId); + const auto fileSize = dbFile->size(); + if (fileSize == -1) + { + std::cerr << "IDBDataFile::size failed for the file " << fullFileName << std::endl; + return -1; + } + + const uint64_t blockCount = fileSize / BLOCK_SIZE; + if (doVerbose()) + { + std::cout << "Searching for HWM for system catalog file..." << std::endl; + std::cout << "Block count: " << blockCount << std::endl; + } + + auto it = systemCatalogMap.find(oid); + if (it == systemCatalogMap.end()) + { + std::cout << "Cannot find a system extent for OID: " << oid << std::endl; + std::cout << "Continuing..." << std::endl; + return 0; + } + + FileId systemFileId = it->second; + uint64_t hwm = 0; + rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), systemFileId.partition, systemFileId.segment, + systemFileId.colDataType, systemFileId.colWidth, blockCount, + systemFileId.isDict, 0 /*=compressionType*/, hwm); + if (rc != 0) + return rc; if (doVerbose()) - std::cout << "FileId is collected " << fileId << std::endl; + std::cout << "HWM is: " << hwm << std::endl; + + systemFileId.hwm = hwm; + systemExtentMap.push_back(systemFileId); } return 0; @@ -207,8 +289,28 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName) int32_t EMReBuilder::rebuildExtentMap() { if (doVerbose()) - { std::cout << "Build extent map with size " << extentMap.size() << std::endl; + + for (const auto& fileId : systemExtentMap) + { + if (fileId.hwm) + { + if (doVerbose()) + { + std::cout << "Setting a HWM for system file " << fileId << std::endl; + } + try + { + getEM().setLocalHWM(fileId.oid, fileId.partition, fileId.segment, fileId.hwm, false, true); + } + catch (std::exception& e) + { + getEM().undoChanges(); + std::cerr << "Cannot set local HWM: " << e.what() << std::endl; + return -1; + } + getEM().confirmChanges(); + } } // We have to restore extent map by restoring individual extent in order @@ -351,9 +453,8 @@ int32_t EMReBuilder::initializeSystemExtents() if (!doDisplay()) { if (doVerbose()) - { std::cout << "Initialize system extents from the initial state" << std::endl; - } + try { getEM().loadFromBinaryBlob(reinterpret_cast(BRM_saves_em_system_tables_blob)); @@ -364,19 +465,21 @@ int32_t EMReBuilder::initializeSystemExtents() return -1; } } + return 0; } ChunkManagerWrapper::ChunkManagerWrapper(const std::string& filename, uint32_t oid, uint32_t dbRoot, uint32_t partition, uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, - uint32_t colWidth) + uint32_t colWidth, uint32_t compressionType) : oid(oid) , dbRoot(dbRoot) , partition(partition) , segment(segment) , colDataType(colDataType) , colWidth(colWidth) + , compressionType(compressionType) , size(colWidth) , pFileOp(nullptr) , fileName(filename) @@ -385,29 +488,41 @@ ChunkManagerWrapper::ChunkManagerWrapper(const std::string& filename, uint32_t o int32_t ChunkManagerWrapper::readBlock(uint32_t blockNumber) { - auto rc = chunkManager.readBlock(pFile, blockData, blockNumber); - if (rc != 0) - return rc; - return 0; + int32_t rc = 0; + + if (compressionType == 0) + rc = pFileOp->readDbBlocks(pFile, blockData, blockNumber, 1); + else + rc = chunkManager.readBlock(pFile, blockData, blockNumber); + + return rc; } ChunkManagerWrapperColumn::ChunkManagerWrapperColumn(const std::string& filename, uint32_t oid, uint32_t dbRoot, uint32_t partition, uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, uint32_t colWidth, uint32_t compressionType) - : ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth) + : ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType) { - pFileOp = - std::unique_ptr(new WriteEngine::ColumnOpCompress1(compressionType)); - chunkManager.fileOp(pFileOp.get()); - // Open compressed column segment file. We will read block by block - // from the compressed chunks. - pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth, - "rb", size, false, false); - if (!pFile) + if (compressionType == 0) { - throw std::bad_alloc(); + pFileOp = std::unique_ptr(new WriteEngine::DbFileOp()); + pFile = IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename.c_str(), "rb", + colWidth); } + else + { + pFileOp = + std::unique_ptr(new WriteEngine::ColumnOpCompress1(compressionType)); + chunkManager.fileOp(pFileOp.get()); + // Open compressed column segment file. We will read block by block + // from the compressed chunks. + pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth, + "rb", size, false, false); + } + + if (!pFile) + throw std::bad_alloc(); emptyValue = pFileOp->getEmptyRowValue(colDataType, colWidth); midOffset = (WriteEngine::BYTE_PER_BLOCK / 2); @@ -450,17 +565,27 @@ ChunkManagerWrapperDict::ChunkManagerWrapperDict(const std::string& filename, ui uint32_t partition, uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, uint32_t colWidth, uint32_t compressionType) - : ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth) + : ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType) { - pFileOp = std::unique_ptr(new WriteEngine::DctnryCompress1(compressionType)); - chunkManager.fileOp(pFileOp.get()); - // Open compressed dict segment file. - pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth, - "rb", size, false, true); - if (!pFile) + + if (compressionType == 0) { - throw std::bad_alloc(); + pFileOp = std::unique_ptr(new WriteEngine::DbFileOp()); + pFile = IDBDataFile::open(IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename.c_str(), "rb", + colWidth); } + else + { + pFileOp = + std::unique_ptr(new WriteEngine::DctnryCompress1(compressionType)); + chunkManager.fileOp(pFileOp.get()); + // Open compressed dict segment file. + pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth, + "rb", size, false, true); + } + + if (!pFile) + throw std::bad_alloc(); auto dictBlockHeaderSize = WriteEngine::HDR_UNIT_SIZE + WriteEngine::NEXT_PTR_BYTES + WriteEngine::HDR_UNIT_SIZE + WriteEngine::HDR_UNIT_SIZE; diff --git a/tools/rebuildEM/rebuildEM.h b/tools/rebuildEM/rebuildEM.h index 6cbe2fcfd..3cf129133 100644 --- a/tools/rebuildEM/rebuildEM.h +++ b/tools/rebuildEM/rebuildEM.h @@ -31,6 +31,7 @@ #include "we_fileop.h" #include "IDBPolicy.h" #include "we_chunkmanager.h" +#include "we_dbfileop.h" using namespace idbdatafile; @@ -157,6 +158,7 @@ class EMReBuilder bool display; uint32_t dbRoot; BRM::ExtentMap em; + std::vector systemExtentMap; std::vector extentMap; }; @@ -167,7 +169,7 @@ class ChunkManagerWrapper public: ChunkManagerWrapper(const std::string& filename, uint32_t oid, uint32_t dbRoot, uint32_t partition, uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, - uint32_t colWidth); + uint32_t colWidth, uint32_t compressionType); virtual ~ChunkManagerWrapper() = default; ChunkManagerWrapper(const ChunkManagerWrapper& other) = delete; @@ -189,8 +191,9 @@ class ChunkManagerWrapper uint32_t segment; execplan::CalpontSystemCatalog::ColDataType colDataType; uint32_t colWidth; + uint32_t compressionType; int32_t size; - std::unique_ptr pFileOp; + std::unique_ptr pFileOp; std::string fileName; // Note: We cannot clear this pointer directly, because // `ChunkManager` closes this file for us, otherwise we will get double @@ -208,7 +211,13 @@ class ChunkManagerWrapperColumn : public ChunkManagerWrapper uint32_t segment, execplan::CalpontSystemCatalog::ColDataType colDataType, uint32_t colWidth, uint32_t compressionType); - ~ChunkManagerWrapperColumn() = default; + ~ChunkManagerWrapperColumn() + { + // In case we open file without `ChunkManager` machinery. + if (!compressionType && pFile) + delete pFile; + }; + ChunkManagerWrapperColumn(const ChunkManagerWrapperColumn& other) = delete; ChunkManagerWrapperColumn& operator=(const ChunkManagerWrapperColumn& other) = delete; ChunkManagerWrapperColumn(ChunkManagerWrapperColumn&& other) = delete;