1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Update on review comments

This commit is contained in:
Denis Khalikov
2023-11-01 14:44:57 +03:00
committed by Leonid Fedorov
parent 865cca11c9
commit 0747099456
4 changed files with 64 additions and 61 deletions

View File

@ -8,18 +8,21 @@
#include <fstream> #include <fstream>
#include <unistd.h> #include <unistd.h>
using ReadableFileSharedPtr = std::shared_ptr<arrow::io::ReadableFile>;
enum STATUS_CODE enum STATUS_CODE
{ {
NO_ERROR, NO_ERROR,
EMPTY_FIELD, EMPTY_FIELD,
UNSUPPORTED_DATA_TYPE, UNSUPPORTED_DATA_TYPE,
UNSUPPORTED_FILE_TYPE, UNSUPPORTED_FILE_TYPE,
FILE_NUM_ERROR FILE_NUM_ERROR,
IO_ERROR
}; };
/** /**
* print the usage information * print the usage information
*/ */
static void usage() static void usage()
{ {
std::cout << "usage: " << std::endl; std::cout << "usage: " << std::endl;
@ -29,20 +32,29 @@ static void usage()
/** /**
* get the schema of the parquet file * get the schema of the parquet file
*/ */
void getSchema(std::string filePath, std::shared_ptr<arrow::Schema>* parquetSchema) int getSchema(std::string filePath, std::shared_ptr<arrow::Schema>* parquetSchema)
{ {
std::shared_ptr<arrow::io::ReadableFile> infile; try
PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, arrow::default_memory_pool())); {
std::unique_ptr<parquet::arrow::FileReader> reader; ReadableFileSharedPtr infile;
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, arrow::default_memory_pool()));
PARQUET_THROW_NOT_OK(reader->GetSchema(parquetSchema)); std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(infile->Close()); PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_THROW_NOT_OK(reader->GetSchema(parquetSchema));
PARQUET_THROW_NOT_OK(infile->Close());
}
catch (...)
{
std::cerr << "Error while calling `getSchema` for the filepath " << filePath << std::endl;
return IO_ERROR;
}
return NO_ERROR;
} }
/** /**
* convert arrow data type id to corresponding columnstore type string * convert arrow data type id to corresponding columnstore type string
*/ */
int convert2mcs(std::shared_ptr<arrow::DataType> dataType, arrow::Type::type typeId, std::string& colType) int convert2mcs(std::shared_ptr<arrow::DataType> dataType, arrow::Type::type typeId, std::string& colType)
{ {
switch (typeId) switch (typeId)
@ -116,7 +128,8 @@ int convert2mcs(std::shared_ptr<arrow::DataType> dataType, arrow::Type::type typ
} }
case arrow::Type::type::FIXED_SIZE_BINARY: case arrow::Type::type::FIXED_SIZE_BINARY:
{ {
std::shared_ptr<arrow::FixedSizeBinaryType> fType = std::static_pointer_cast<arrow::FixedSizeBinaryType>(dataType); std::shared_ptr<arrow::FixedSizeBinaryType> fType =
std::static_pointer_cast<arrow::FixedSizeBinaryType>(dataType);
int byteWidth = fType->byte_width(); int byteWidth = fType->byte_width();
colType = "CHAR(" + std::to_string(byteWidth) + ")"; colType = "CHAR(" + std::to_string(byteWidth) + ")";
break; break;
@ -157,7 +170,7 @@ int convert2mcs(std::shared_ptr<arrow::DataType> dataType, arrow::Type::type typ
colType = "TIME(6)"; colType = "TIME(6)";
else else
return UNSUPPORTED_DATA_TYPE; return UNSUPPORTED_DATA_TYPE;
break; break;
} }
case arrow::Type::type::DECIMAL128: case arrow::Type::type::DECIMAL128:
@ -179,14 +192,16 @@ int convert2mcs(std::shared_ptr<arrow::DataType> dataType, arrow::Type::type typ
/** /**
* main function to generate DDL file * main function to generate DDL file
*/ */
int generateDDL(std::string filePath, std::string targetPath, std::string tableName) int generateDDL(std::string filePath, std::string targetPath, std::string tableName)
{ {
std::shared_ptr<arrow::Schema> parquetSchema; std::shared_ptr<arrow::Schema> parquetSchema;
getSchema(filePath, &parquetSchema); int rc = getSchema(filePath, &parquetSchema);
if (rc != NO_ERROR)
return rc;
std::vector<std::string> parquetCols; std::vector<std::string> parquetCols;
std::vector<std::string> parquetTypes; std::vector<std::string> parquetTypes;
int rc = NO_ERROR;
int fieldsNum = parquetSchema->num_fields(); int fieldsNum = parquetSchema->num_fields();
if (fieldsNum == 0) if (fieldsNum == 0)
@ -217,7 +232,7 @@ int generateDDL(std::string filePath, std::string targetPath, std::string tableN
for (int i = 0; i < fieldsNum; i++) for (int i = 0; i < fieldsNum; i++)
{ {
str1 += parquetCols[i] + " " + parquetTypes[i] + (i == fieldsNum-1 ? "\n" : ",\n"); str1 += parquetCols[i] + " " + parquetTypes[i] + (i == fieldsNum - 1 ? "\n" : ",\n");
} }
str1 += str2; str1 += str2;
@ -259,9 +274,8 @@ int main(int argc, char** argv)
// check file extension // check file extension
std::string::size_type endBase = ddlFile.rfind('.'); std::string::size_type endBase = ddlFile.rfind('.');
std::string::size_type endBase1 = parquetFile.rfind('.'); std::string::size_type endBase1 = parquetFile.rfind('.');
if (endBase == std::string::npos || endBase1 == std::string::npos || if (endBase == std::string::npos || endBase1 == std::string::npos ||
parquetFile.substr(endBase1 + 1) != "parquet" || parquetFile.substr(endBase1 + 1) != "parquet" || ddlFile.substr(endBase + 1) != "ddl")
ddlFile.substr(endBase + 1) != "ddl")
{ {
std::cout << "File type not supported" << std::endl; std::cout << "File type not supported" << std::endl;
usage(); usage();

View File

@ -107,7 +107,6 @@ void TableInfo::sleepMS(long ms)
abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec; abs_ts.tv_nsec = rm_ts.tv_nsec;
} while (nanosleep(&abs_ts, &rm_ts) < 0); } while (nanosleep(&abs_ts, &rm_ts) < 0);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -156,7 +155,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
, fRejectErrCnt(0) , fRejectErrCnt(0)
, fExtentStrAlloc(tableOID, logger) , fExtentStrAlloc(tableOID, logger)
, fOamCachePtr(oam::OamCache::makeOamCache()) , fOamCachePtr(oam::OamCache::makeOamCache())
, fParquetReader(NULL) , fParquetReader(nullptr)
, fReader(nullptr) , fReader(nullptr)
{ {
fBuffers.clear(); fBuffers.clear();
@ -271,8 +270,8 @@ int TableInfo::readTableData()
{ {
RID validTotalRows = 0; RID validTotalRows = 0;
RID totalRowsPerInputFile = 0; RID totalRowsPerInputFile = 0;
int64_t totalRowsParquet = 0; // totalRowsParquet to be used in later function int64_t totalRowsParquet = 0; // totalRowsParquet to be used in later function
// needs int64_t type // needs int64_t type
int filesTBProcessed = fLoadFileList.size(); int filesTBProcessed = fLoadFileList.size();
int fileCounter = 0; int fileCounter = 0;
unsigned long long qtSentAt = 0; unsigned long long qtSentAt = 0;
@ -308,7 +307,6 @@ int TableInfo::readTableData()
} }
fileCounter++; fileCounter++;
} }
} }
timeval readStart; timeval readStart;
@ -562,7 +560,8 @@ int TableInfo::readTableData()
fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount; fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;
// bufferCount++; // bufferCount++;
if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) || (totalRowsPerInputFile == (RID)totalRowsParquet)) if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) ||
(totalRowsPerInputFile == (RID)totalRowsParquet))
{ {
timeval readFinished; timeval readFinished;
gettimeofday(&readFinished, NULL); gettimeofday(&readFinished, NULL);
@ -1147,8 +1146,7 @@ int TableInfo::getColumnForParse(const int& id, const int& bufferId, bool report
if (report) if (report)
{ {
oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << "]: (colLocker,status,lasttime)- ";
"]: (colLocker,status,lasttime)- ";
} }
// @bug2099- // @bug2099-
@ -1232,8 +1230,8 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
ostringstream oss; ostringstream oss;
string bufStatusStr; string bufStatusStr;
ColumnInfo::convertStatusToString(stat, bufStatusStr); ColumnInfo::convertStatusToString(stat, bufStatusStr);
oss << " --- " << pthread_self() << oss << " --- " << pthread_self() << ":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat
":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat << ")" << std::endl; << ")" << std::endl;
cout << oss.str(); cout << oss.str();
} }
@ -1249,7 +1247,6 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList, int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList,
unsigned int fixedBinaryRecLen) unsigned int fixedBinaryRecLen)
{ {
fReadBufCount = noOfBuffers; fReadBufCount = noOfBuffers;
// initialize and populate the buffer vector. // initialize and populate the buffer vector.
@ -1292,8 +1289,7 @@ void TableInfo::addColumn(ColumnInfo* info)
fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType); fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType);
} }
int TableInfo::openTableFileParquet(int64_t& totalRowsParquet)
int TableInfo::openTableFileParquet(int64_t &totalRowsParquet)
{ {
if (fParquetReader != NULL) if (fParquetReader != NULL)
return NO_ERROR; return NO_ERROR;
@ -1323,14 +1319,13 @@ int TableInfo::openTableFileParquet(int64_t &totalRowsParquet)
return ERR_FILE_OPEN; return ERR_FILE_OPEN;
} }
// initialize fBuffers batch source // initialize fBuffers batch source
for (int i = 0; i < fReadBufCount; ++i) for (auto& buffer : fBuffers)
{ {
fBuffers[i].setParquetReader(fParquetReader); buffer.setParquetReader(fParquetReader);
} }
return NO_ERROR; return NO_ERROR;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Open the file corresponding to fFileName so that we can import it's contents. // Open the file corresponding to fFileName so that we can import it's contents.
// A buffer is also allocated and passed to setvbuf(). // A buffer is also allocated and passed to setvbuf().
@ -1424,7 +1419,7 @@ void TableInfo::closeTableFile()
fclose(fHandle); fclose(fHandle);
delete[] fFileBuffer; delete[] fFileBuffer;
} }
fHandle = 0; fHandle = 0;
} }
else if (ms3) else if (ms3)

View File

@ -35,8 +35,6 @@
#include <iostream> #include <iostream>
using namespace std; using namespace std;
#include "bytestream.h" #include "bytestream.h"
#include "brmtypes.h" #include "brmtypes.h"
#include "extentmap.h" // for DICT_COL_WIDTH #include "extentmap.h" // for DICT_COL_WIDTH
@ -54,10 +52,13 @@ using namespace BRM;
using namespace idbdatafile; using namespace idbdatafile;
#include "checks.h" #include "checks.h"
#include "utils_utf8.h" // for utf8_truncate_point() #include "utils_utf8.h" // for utf8_truncate_point()
namespace namespace
{ {
using BinaryArraySharedPtr = std::shared_ptr<arrow::BinaryArray>;
using FixedSizeBinaryArraySharedPtr = std::shared_ptr<arrow::FixedSizeBinaryArray>;
// These used to be member variables, hence the "m_" prefix. But they are // These used to be member variables, hence the "m_" prefix. But they are
// all constants, so I removed them as member variables. May change the // all constants, so I removed them as member variables. May change the
// variable name later (to remove the m_ prefix) as time allows. // variable name later (to remove the m_ prefix) as time allows.
@ -770,7 +771,7 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff
} }
else else
{ {
const char* start = (const char*) curSig.signature; const char* start = (const char*)curSig.signature;
const char* end = (const char*)(curSig.signature + curSig.size); const char* end = (const char*)(curSig.signature + curSig.size);
size_t numChars = cs->numchars(start, end); size_t numChars = cs->numchars(start, end);
size_t maxCharLength = m_colWidth / cs->mbmaxlen; size_t maxCharLength = m_colWidth / cs->mbmaxlen;
@ -784,7 +785,7 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff
} }
} }
} }
else // cs->mbmaxlen == 1 else // cs->mbmaxlen == 1
{ {
if (curSig.size > m_colWidth) if (curSig.size > m_colWidth)
{ {
@ -971,9 +972,8 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff
* success - successfully write the header to block * success - successfully write the header to block
* failure - it did not write the header to block * failure - it did not write the header to block
******************************************************************************/ ******************************************************************************/
int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int startRowIdx, int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int startRowIdx, const int totalRow,
const int totalRow, const int col, char* tokenBuf, const int col, char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs,
long long& truncCount, const CHARSET_INFO* cs,
const WriteEngine::ColType& weType) const WriteEngine::ColType& weType)
{ {
#ifdef PROFILE #ifdef PROFILE
@ -993,9 +993,8 @@ int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int st
cb.file.pFile = m_dFile; cb.file.pFile = m_dFile;
WriteEngine::Token nullToken; WriteEngine::Token nullToken;
bool isNonNullArray = true; BinaryArraySharedPtr binaryArray;
std::shared_ptr<arrow::BinaryArray> binaryArray; FixedSizeBinaryArraySharedPtr fixedSizeBinaryArray;
std::shared_ptr<arrow::FixedSizeBinaryArray> fixedSizeBinaryArray;
if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY) if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY)
binaryArray = std::static_pointer_cast<arrow::BinaryArray>(columnData); binaryArray = std::static_pointer_cast<arrow::BinaryArray>(columnData);
@ -1003,8 +1002,7 @@ int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int st
fixedSizeBinaryArray = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(columnData); fixedSizeBinaryArray = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(columnData);
// check if this column data imported is NULL array or not // check if this column data imported is NULL array or not
if (columnData->type_id() == arrow::Type::type::NA) bool isNonNullArray = columnData->type_id() == arrow::Type::type::NA ? false : true;
isNonNullArray = false;
//...Loop through all the rows for the specified column //...Loop through all the rows for the specified column
while (startPos < totalRow) while (startPos < totalRow)
@ -1092,8 +1090,8 @@ int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int st
} }
} }
RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next,
cs, weType)); truncCount, cs, weType));
} }
#ifdef PROFILE #ifdef PROFILE
@ -1125,8 +1123,7 @@ int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int st
* failure - it did not write the header to block * failure - it did not write the header to block
******************************************************************************/ ******************************************************************************/
int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf, int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf,
long long& truncCount, const CHARSET_INFO* cs, long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
const WriteEngine::ColType& weType)
{ {
#ifdef PROFILE #ifdef PROFILE
Stats::startParseEvent(WE_STATS_PARSE_DCT); Stats::startParseEvent(WE_STATS_PARSE_DCT);
@ -1199,9 +1196,9 @@ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow,
curSig.signature = (unsigned char*)pIn; curSig.signature = (unsigned char*)pIn;
} }
RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next,
cs, weType)); truncCount, cs, weType));
} // end while } // end while
#ifdef PROFILE #ifdef PROFILE
Stats::stopParseEvent(WE_STATS_PARSE_DCT); Stats::stopParseEvent(WE_STATS_PARSE_DCT);

View File

@ -37,11 +37,8 @@
#include "bytestream.h" #include "bytestream.h"
#include "nullstring.h" #include "nullstring.h"
#define EXPORT #define EXPORT
namespace arrow namespace arrow
{ {
class Array; class Array;