1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-4566: Add file2Oid function.

* This patch adds file2Oid function. This function is needed
  to map ColumnStore file name to an oid, partition and segment.
* Tests added to check that this function works correctly.
* This patch is related to MCOL-4566, so it adds a new file with GTests.

Note: The description for the functions follows the description style
in the current file.
This commit is contained in:
Denis Khalikov
2021-03-04 22:13:03 +03:00
parent 2d6d8b901e
commit 797716ef13
4 changed files with 439 additions and 2 deletions

View File

@ -22,8 +22,10 @@
/** @file */
#include <unistd.h>
#include <fnmatch.h>
#include <limits>
#include <cstring>
#include <vector>
#ifdef _MSC_VER
#include <cstdio>
#endif
@ -36,6 +38,14 @@ using namespace execplan;
namespace
{
const char DATE_TIME_FORMAT[] = "%04d-%02d-%02d %02d:%02d:%02d";
// ColumnStore file `full file name` format.
const char CS_FULL_FILENAME_FORMAT[] =
"*[0-9][0-9][0-9].dir/[0-9][0-9][0-9].dir/[[0-9][0-9][0-9].dir/"
"[0-9][0-9][0-9].dir/[0-9][0-9][0-9].dir/FILE[0-9][0-9][0-9].cdf";
// ColumnStore file `directory name` format.
const char CS_DIR_FORMAT[] = "[0-9][0-9][0-9].dir";
// ColumnStore file `file name` format.
const char CS_FILE_FORMAT[] = "FILE[0-9][0-9][0-9].cdf";
/*******************************************************************************
* DESCRIPTION:
@ -92,6 +102,75 @@ int _doFile(char* pBuffer, int blen, unsigned char val)
return rc;
}
/*******************************************************************************
* DESCRIPTION:
* Takes a buffer in ColumnStore `directory` format and converts it to an
* integer.
* PARAMETERS:
* buffer(input) - a pointer to the input buffer.
val (output) - converted integer.
* RETURN:
* 0 is returned on success, -1 is returned on error.
******************************************************************************/
int32_t _fromDir(const char* buffer, uint32_t& val)
{
int32_t rc = -1;
// Number length in characters.
const uint32_t numberLen = 3;
// Check that buffer is in the correct `directory` format.
if (buffer && (fnmatch(CS_DIR_FORMAT, buffer, 0) == 0))
{
char num[numberLen + 1];
strncpy(num, buffer, numberLen);
num[numberLen] = '\0';
val = atoi(num);
// The number cannot exceed 0xff.
if (val < 256)
{
rc = 0;
}
}
return rc;
}
/*******************************************************************************
* DESCRIPTION:
* Takes a buffer in ColumnStore `file` format and converts it to an
* integer.
* PARAMETERS:
* buffer(input) - a pointer to the input buffer.
val (output) - converted integer.
* RETURN:
* 0 is returned on success, -1 is returned on error.
******************************************************************************/
int32_t _fromFile(const char* buffer, uint32_t& val)
{
int32_t rc = -1;
// Offset from the beggining e.g. `FILE000.cdf`.
const uint32_t offset = 4;
// Number length in characters.
const uint32_t numberLen = 3;
// Check that buffer is in the correct `file` format.
if (buffer && (fnmatch(CS_FILE_FORMAT, buffer, 0) == 0))
{
char num[numberLen + 1];
strncpy(num, buffer + offset, numberLen);
num[numberLen] = '\0';
val = atoi(num);
// The number cannot exceed 0xff.
if (val < 256)
{
rc = 0;
}
}
return rc;
}
}
namespace WriteEngine
@ -285,7 +364,133 @@ int Convertor::oid2FileName(FID fid,
return NO_ERROR;
}
/*******************************************************************************
* DESCRIPTION:
* Convert the given filename to an oid, segment and partition.
* PARAMETERS:
* fullFileName INPUT -- filename.
* oid OUTPUT -- oid number from the given filename.
* partition OUTPUT -- partition number from the given filename.
* segment OUTPUT -- segment number from the given fielname.
* RETURN:
* NO_ERROR if success, other if fail.
******************************************************************************/
int Convertor::fileName2Oid(const std::string& fullFileName, uint32_t& oid,
uint32_t& partition, uint32_t& segment)
{
// ColumnStore file directory separator.
const char dirSep = '/';
// The number of the directories in the ColumnStore file name.
// Note: without `DBRoot` directory.
const uint32_t dirNamesMaxSize = 6;
const uint32_t fullFileNameLen = fullFileName.size();
// Verify the given `fullFileName`.
if (!fullFileNameLen ||
// If not match `fnmatch` returns a result code which is not equal to
// zero.
// TODO: Probably we should use `std::regex_match`, but currently
// there are still parts of code which use legacy `char *`, so
// `std::regex_match` is not applicable there without creating
// additional `std::string` from `char *`.
fnmatch(CS_FULL_FILENAME_FORMAT, fullFileName.c_str(), 0))
{
return -1;
}
std::vector<std::string> dirNames;
// We need exact 6 instances.
dirNames.reserve(6);
uint32_t end = fullFileNameLen;
// Signed integer for `index` since it could be less than zero.
int32_t index = fullFileNameLen - 1;
// Iterate over `fullFileName` starting from the end and split it by
// directory separator. Since we starting from the end we need just 6
// instances to match ColumnStore file name format specification.
while (index >= 0 && dirNames.size() < dirNamesMaxSize)
{
while (index >= 0 && fullFileName[index] != dirSep)
{
--index;
}
// Begin is a `dirSep` index + 1.
uint32_t begin = index + 1;
const uint32_t dirNameLen = end - begin;
// We already checked the `fullFileName` format,
// but this check is only intended to make sure that this algo works
// correctly on any input, if something changes.
if (dirNameLen > 0 && dirNameLen < MAX_DB_DIR_NAME_SIZE)
{
dirNames.push_back(fullFileName.substr(begin, dirNameLen));
}
else
{
// Something wrong with filename, just return an error.
return -1;
}
// Set `end` to the last directory separator index.
end = index;
// Skip current directory separator.
--index;
}
// Make sure we parsed 6 instances.
if (dirNames.size() != 6)
{
return -1;
}
// Initialize `dmFilePathArgs_t` struct.
dmFilePathArgs_t args;
char aBuff[MAX_DB_DIR_NAME_SIZE];
char bBuff[MAX_DB_DIR_NAME_SIZE];
char cBuff[MAX_DB_DIR_NAME_SIZE];
char dBuff[MAX_DB_DIR_NAME_SIZE];
char eBuff[MAX_DB_DIR_NAME_SIZE];
char fnBuff[MAX_DB_DIR_NAME_SIZE];
args.pDirA = aBuff;
args.pDirB = bBuff;
args.pDirC = cBuff;
args.pDirD = dBuff;
args.pDirE = eBuff;
args.pFName = fnBuff;
args.ALen = sizeof(aBuff);
args.BLen = sizeof(bBuff);
args.CLen = sizeof(cBuff);
args.DLen = sizeof(dBuff);
args.ELen = sizeof(eBuff);
args.FNLen = sizeof(fnBuff);
args.Arc = 0;
args.Brc = 0;
args.Crc = 0;
args.Drc = 0;
args.Erc = 0;
args.FNrc = 0;
// Populate `dmFilePathArgs_t` struct with the given names.
strcpy(args.pFName, dirNames[0].c_str());
strcpy(args.pDirE, dirNames[1].c_str());
strcpy(args.pDirD, dirNames[2].c_str());
strcpy(args.pDirC, dirNames[3].c_str());
strcpy(args.pDirB, dirNames[4].c_str());
strcpy(args.pDirA, dirNames[5].c_str());
// FIXME: Currently used ERR_DM_CONVERT_OID, should we introduce new error
// code?
RETURN_ON_WE_ERROR(dmFPath2Oid(&args, oid, partition, segment),
ERR_DM_CONVERT_OID);
return NO_ERROR;
}
/*******************************************************************************
* DESCRIPTION:
* Map specified errno to the associated error message string.
@ -923,5 +1128,69 @@ int Convertor::dmOid2FPath(uint32_t oid, uint32_t partition, uint32_t segment,
return 0;
}
/*******************************************************************************
* DESCRIPTION:
* Converts populated `dmFilePathArgs_t` struct to an oid, partition,
* and segment.
*
* PARAMETERS:
* pArgs INPUT -- a pointer to `dmFilePathArgs_t` struct.
* oid OUTPUT -- oid for the given file name.
* partition OUTPUT -- partition for the given file name.
* segment OUTPUT -- segment for the given filename.
*
* RETURN:
* return 0 if everything went OK. -1 if an error occured.
******************************************************************************/
int32_t Convertor::dmFPath2Oid(dmFilePathArgs_t* pArgs, uint32_t& oid,
uint32_t& partition, uint32_t& segment)
{
uint32_t val = 0;
// OID.
// Directory A.
oid = 0;
if ((pArgs->Arc = _fromDir(pArgs->pDirA, val)) == -1)
{
return -1;
}
oid = val << 24;
// Directory B.
if ((pArgs->Brc = _fromDir(pArgs->pDirB, val)) == -1)
{
return -1;
}
oid |= val << 16;
// Directory C.
if ((pArgs->Crc = _fromDir(pArgs->pDirC, val)) == -1)
{
return -1;
}
oid |= val << 8;
// Directory D.
if ((pArgs->Drc = _fromDir(pArgs->pDirD, val)) == -1)
{
return -1;
}
oid |= val;
// Partition.
if ((pArgs->Erc = _fromDir(pArgs->pDirE, partition)) == -1)
{
return -1;
}
// Segment.
if ((pArgs->FNrc = _fromFile(pArgs->pFName, segment)) == -1)
{
return -1;
}
return 0;
}
} //end of namespace