You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
[MCOL-5106] Add support to work with StorageManager.
This patch eliminates boost::filesystem from `mcsRebuildEM` tool. After this change we should be able to work with any filesystem even S3.
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <iostream>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "rebuildEM.h"
|
||||
@@ -36,16 +35,36 @@ using namespace idbdatafile;
|
||||
|
||||
namespace RebuildExtentMap
|
||||
{
|
||||
int32_t EMReBuilder::collectExtents(const string& dbRootPath)
|
||||
void EMReBuilder::collectFileNames(const std::string& partialPath, std::string currentPath,
|
||||
std::vector<std::string>& fileNames)
|
||||
{
|
||||
currentPath.append(partialPath);
|
||||
|
||||
std::list<std::string> partialPathes;
|
||||
IDBPolicy::listDirectory(currentPath.c_str(), partialPathes);
|
||||
if (partialPathes.size() == 0)
|
||||
{
|
||||
fileNames.push_back(currentPath);
|
||||
return;
|
||||
}
|
||||
|
||||
currentPath.push_back('/');
|
||||
for (const auto& partialPath : partialPathes)
|
||||
collectFileNames(partialPath, currentPath, fileNames);
|
||||
}
|
||||
|
||||
int32_t EMReBuilder::collectExtents(const std::string& dbRootPath)
|
||||
{
|
||||
if (doVerbose())
|
||||
{
|
||||
std::cout << "Collect extents for the DBRoot " << dbRootPath << std::endl;
|
||||
}
|
||||
|
||||
for (boost::filesystem::recursive_directory_iterator dirIt(dbRootPath), dirEnd; dirIt != dirEnd; ++dirIt)
|
||||
std::vector<std::string> fileNames;
|
||||
collectFileNames(dbRootPath, "", fileNames);
|
||||
for (const auto& fileName : fileNames)
|
||||
{
|
||||
(void)collectExtent(dirIt->path().string());
|
||||
(void)collectExtent(fileName);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -141,8 +160,8 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
}
|
||||
|
||||
uint64_t hwm = 0;
|
||||
rc = searchHWMInSegmentFile(oid, getDBRoot(), partition, segment, colDataType, colWidth, blockCount, isDict,
|
||||
compressionType, hwm);
|
||||
rc = searchHWMInSegmentFile(fullFileName, oid, getDBRoot(), partition, segment, colDataType, colWidth,
|
||||
blockCount, isDict, compressionType, hwm);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
@@ -156,13 +175,13 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
for (uint32_t lbidIndex = 0; lbidIndex < lbidCount - 1; ++lbidIndex)
|
||||
{
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidIndex);
|
||||
FileId fileId(oid, partition, segment, colWidth, colDataType, lbid, /*hwm*/ 0, isDict);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, /*hwm*/ 0, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
}
|
||||
|
||||
// Last one has an actual HWM.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, lbidCount - 1);
|
||||
FileId fileId(oid, partition, segment, colWidth, colDataType, lbid, hwm, isDict);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
|
||||
if (doVerbose())
|
||||
@@ -175,7 +194,7 @@ int32_t EMReBuilder::collectExtent(const std::string& fullFileName)
|
||||
{
|
||||
// One extent per segment file.
|
||||
auto lbid = compress::CompressInterface::getLBIDByIndex(fileHeader, 0);
|
||||
FileId fileId(oid, partition, segment, colWidth, colDataType, lbid, hwm, isDict);
|
||||
FileId fileId(oid, partition, segment, getDBRoot(), colWidth, colDataType, lbid, hwm, isDict);
|
||||
extentMap.push_back(fileId);
|
||||
|
||||
if (doVerbose())
|
||||
@@ -213,14 +232,14 @@ int32_t EMReBuilder::rebuildExtentMap()
|
||||
{
|
||||
// Create a dictionary extent for the given oid, partition,
|
||||
// segment, dbroot.
|
||||
getEM().createDictStoreExtent(fileId.oid, getDBRoot(), fileId.partition, fileId.segment, lbid,
|
||||
getEM().createDictStoreExtent(fileId.oid, fileId.dbroot, fileId.partition, fileId.segment, lbid,
|
||||
allocdSize);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create a column extent for the given oid, partition,
|
||||
// segment, dbroot and column width.
|
||||
getEM().createColumnExtentExactFile(fileId.oid, fileId.colWidth, getDBRoot(), fileId.partition,
|
||||
getEM().createColumnExtentExactFile(fileId.oid, fileId.colWidth, fileId.dbroot, fileId.partition,
|
||||
fileId.segment, fileId.colDataType, lbid, allocdSize,
|
||||
startBlockOffset);
|
||||
}
|
||||
@@ -263,8 +282,8 @@ int32_t EMReBuilder::rebuildExtentMap()
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t EMReBuilder::searchHWMInSegmentFile(uint32_t oid, uint32_t dbRoot, uint32_t partition,
|
||||
uint32_t segment,
|
||||
int32_t EMReBuilder::searchHWMInSegmentFile(const std::string& fullFileName, uint32_t oid, uint32_t dbRoot,
|
||||
uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint64_t blockCount, bool isDict,
|
||||
uint32_t compressionType, uint64_t& hwm)
|
||||
@@ -275,12 +294,12 @@ int32_t EMReBuilder::searchHWMInSegmentFile(uint32_t oid, uint32_t dbRoot, uint3
|
||||
if (isDict)
|
||||
{
|
||||
chunkManagerWrapper = std::unique_ptr<ChunkManagerWrapperDict>(new ChunkManagerWrapperDict(
|
||||
oid, dbRoot, partition, segment, colDataType, colWidth, compressionType));
|
||||
fullFileName, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType));
|
||||
}
|
||||
else
|
||||
{
|
||||
chunkManagerWrapper = std::unique_ptr<ChunkManagerWrapperColumn>(new ChunkManagerWrapperColumn(
|
||||
oid, dbRoot, partition, segment, colDataType, colWidth, compressionType));
|
||||
fullFileName, oid, dbRoot, partition, segment, colDataType, colWidth, compressionType));
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
@@ -348,7 +367,8 @@ int32_t EMReBuilder::initializeSystemExtents()
|
||||
return 0;
|
||||
}
|
||||
|
||||
ChunkManagerWrapper::ChunkManagerWrapper(uint32_t oid, uint32_t dbRoot, uint32_t partition, uint32_t segment,
|
||||
ChunkManagerWrapper::ChunkManagerWrapper(const std::string& filename, uint32_t oid, uint32_t dbRoot,
|
||||
uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth)
|
||||
: oid(oid)
|
||||
@@ -359,6 +379,7 @@ ChunkManagerWrapper::ChunkManagerWrapper(uint32_t oid, uint32_t dbRoot, uint32_t
|
||||
, colWidth(colWidth)
|
||||
, size(colWidth)
|
||||
, pFileOp(nullptr)
|
||||
, fileName(filename)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -370,19 +391,19 @@ int32_t ChunkManagerWrapper::readBlock(uint32_t blockNumber)
|
||||
return 0;
|
||||
}
|
||||
|
||||
ChunkManagerWrapperColumn::ChunkManagerWrapperColumn(uint32_t oid, uint32_t dbRoot, uint32_t partition,
|
||||
uint32_t segment,
|
||||
ChunkManagerWrapperColumn::ChunkManagerWrapperColumn(const std::string& filename, uint32_t oid,
|
||||
uint32_t dbRoot, uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint32_t compressionType)
|
||||
: ChunkManagerWrapper(oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
{
|
||||
pFileOp =
|
||||
std::unique_ptr<WriteEngine::ColumnOpCompress1>(new WriteEngine::ColumnOpCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed column segment file. We will read block by block
|
||||
// from the compressed chunks.
|
||||
pFile = chunkManager.getSegmentFilePtr(oid, dbRoot, partition, segment, colDataType, colWidth, fileName,
|
||||
"rb", size, false, false);
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, false);
|
||||
if (!pFile)
|
||||
{
|
||||
throw std::bad_alloc();
|
||||
@@ -425,17 +446,17 @@ bool ChunkManagerWrapperColumn::isEmptyValue(const uint8_t* value) const
|
||||
return false;
|
||||
}
|
||||
|
||||
ChunkManagerWrapperDict::ChunkManagerWrapperDict(uint32_t oid, uint32_t dbRoot, uint32_t partition,
|
||||
uint32_t segment,
|
||||
ChunkManagerWrapperDict::ChunkManagerWrapperDict(const std::string& filename, uint32_t oid, uint32_t dbRoot,
|
||||
uint32_t partition, uint32_t segment,
|
||||
execplan::CalpontSystemCatalog::ColDataType colDataType,
|
||||
uint32_t colWidth, uint32_t compressionType)
|
||||
: ChunkManagerWrapper(oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
: ChunkManagerWrapper(filename, oid, dbRoot, partition, segment, colDataType, colWidth)
|
||||
{
|
||||
pFileOp = std::unique_ptr<WriteEngine::DctnryCompress1>(new WriteEngine::DctnryCompress1(compressionType));
|
||||
chunkManager.fileOp(pFileOp.get());
|
||||
// Open compressed dict segment file.
|
||||
pFile = chunkManager.getSegmentFilePtr(oid, dbRoot, partition, segment, colDataType, colWidth, fileName,
|
||||
"rb", size, false, true);
|
||||
pFile = chunkManager.getFilePtrByName(fileName, oid, dbRoot, partition, segment, colDataType, colWidth,
|
||||
"rb", size, false, true);
|
||||
if (!pFile)
|
||||
{
|
||||
throw std::bad_alloc();
|
||||
|
Reference in New Issue
Block a user