diff --git a/tools/parquetDDL/main.cpp b/tools/parquetDDL/main.cpp index fcc275f7d..f2fdc6dd4 100644 --- a/tools/parquetDDL/main.cpp +++ b/tools/parquetDDL/main.cpp @@ -8,18 +8,21 @@ #include #include +using ReadableFileSharedPtr = std::shared_ptr; + enum STATUS_CODE { NO_ERROR, EMPTY_FIELD, UNSUPPORTED_DATA_TYPE, UNSUPPORTED_FILE_TYPE, - FILE_NUM_ERROR + FILE_NUM_ERROR, + IO_ERROR }; /** * print the usage information -*/ + */ static void usage() { std::cout << "usage: " << std::endl; @@ -29,20 +32,29 @@ static void usage() /** * get the schema of the parquet file -*/ -void getSchema(std::string filePath, std::shared_ptr* parquetSchema) + */ +int getSchema(std::string filePath, std::shared_ptr* parquetSchema) { - std::shared_ptr infile; - PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, arrow::default_memory_pool())); - std::unique_ptr reader; - PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); - PARQUET_THROW_NOT_OK(reader->GetSchema(parquetSchema)); - PARQUET_THROW_NOT_OK(infile->Close()); + try + { + ReadableFileSharedPtr infile; + PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(filePath, arrow::default_memory_pool())); + std::unique_ptr reader; + PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); + PARQUET_THROW_NOT_OK(reader->GetSchema(parquetSchema)); + PARQUET_THROW_NOT_OK(infile->Close()); + } + catch (...) + { + std::cerr << "Error while calling `getSchema` for the filepath " << filePath << std::endl; + return IO_ERROR; + } + return NO_ERROR; } /** * convert arrow data type id to corresponding columnstore type string -*/ + */ int convert2mcs(std::shared_ptr dataType, arrow::Type::type typeId, std::string& colType) { switch (typeId) @@ -116,7 +128,8 @@ int convert2mcs(std::shared_ptr dataType, arrow::Type::type typ } case arrow::Type::type::FIXED_SIZE_BINARY: { - std::shared_ptr fType = std::static_pointer_cast(dataType); + std::shared_ptr fType = + std::static_pointer_cast(dataType); int byteWidth = fType->byte_width(); colType = "CHAR(" + std::to_string(byteWidth) + ")"; break; @@ -157,7 +170,7 @@ int convert2mcs(std::shared_ptr dataType, arrow::Type::type typ colType = "TIME(6)"; else return UNSUPPORTED_DATA_TYPE; - + break; } case arrow::Type::type::DECIMAL128: @@ -179,14 +192,16 @@ int convert2mcs(std::shared_ptr dataType, arrow::Type::type typ /** * main function to generate DDL file -*/ + */ int generateDDL(std::string filePath, std::string targetPath, std::string tableName) { std::shared_ptr parquetSchema; - getSchema(filePath, &parquetSchema); + int rc = getSchema(filePath, &parquetSchema); + if (rc != NO_ERROR) + return rc; + std::vector parquetCols; std::vector parquetTypes; - int rc = NO_ERROR; int fieldsNum = parquetSchema->num_fields(); if (fieldsNum == 0) @@ -217,7 +232,7 @@ int generateDDL(std::string filePath, std::string targetPath, std::string tableN for (int i = 0; i < fieldsNum; i++) { - str1 += parquetCols[i] + " " + parquetTypes[i] + (i == fieldsNum-1 ? "\n" : ",\n"); + str1 += parquetCols[i] + " " + parquetTypes[i] + (i == fieldsNum - 1 ? "\n" : ",\n"); } str1 += str2; @@ -259,9 +274,8 @@ int main(int argc, char** argv) // check file extension std::string::size_type endBase = ddlFile.rfind('.'); std::string::size_type endBase1 = parquetFile.rfind('.'); - if (endBase == std::string::npos || endBase1 == std::string::npos || - parquetFile.substr(endBase1 + 1) != "parquet" || - ddlFile.substr(endBase + 1) != "ddl") + if (endBase == std::string::npos || endBase1 == std::string::npos || + parquetFile.substr(endBase1 + 1) != "parquet" || ddlFile.substr(endBase + 1) != "ddl") { std::cout << "File type not supported" << std::endl; usage(); diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index f062d2a26..310b49a0d 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -107,7 +107,6 @@ void TableInfo::sleepMS(long ms) abs_ts.tv_sec = rm_ts.tv_sec; abs_ts.tv_nsec = rm_ts.tv_nsec; } while (nanosleep(&abs_ts, &rm_ts) < 0); - } //------------------------------------------------------------------------------ @@ -156,7 +155,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN , fRejectErrCnt(0) , fExtentStrAlloc(tableOID, logger) , fOamCachePtr(oam::OamCache::makeOamCache()) - , fParquetReader(NULL) + , fParquetReader(nullptr) , fReader(nullptr) { fBuffers.clear(); @@ -271,8 +270,8 @@ int TableInfo::readTableData() { RID validTotalRows = 0; RID totalRowsPerInputFile = 0; - int64_t totalRowsParquet = 0; // totalRowsParquet to be used in later function - // needs int64_t type + int64_t totalRowsParquet = 0; // totalRowsParquet to be used in later function + // needs int64_t type int filesTBProcessed = fLoadFileList.size(); int fileCounter = 0; unsigned long long qtSentAt = 0; @@ -308,7 +307,6 @@ int TableInfo::readTableData() } fileCounter++; } - } timeval readStart; @@ -562,7 +560,8 @@ int TableInfo::readTableData() fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount; // bufferCount++; - if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) || (totalRowsPerInputFile == (RID)totalRowsParquet)) + if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) || + (totalRowsPerInputFile == (RID)totalRowsParquet)) { timeval readFinished; gettimeofday(&readFinished, NULL); @@ -1147,8 +1146,7 @@ int TableInfo::getColumnForParse(const int& id, const int& bufferId, bool report if (report) { - oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << - "]: (colLocker,status,lasttime)- "; + oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << "]: (colLocker,status,lasttime)- "; } // @bug2099- @@ -1232,8 +1230,8 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const ostringstream oss; string bufStatusStr; ColumnInfo::convertStatusToString(stat, bufStatusStr); - oss << " --- " << pthread_self() << - ":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat << ")" << std::endl; + oss << " --- " << pthread_self() << ":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat + << ")" << std::endl; cout << oss.str(); } @@ -1249,7 +1247,6 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList, unsigned int fixedBinaryRecLen) { - fReadBufCount = noOfBuffers; // initialize and populate the buffer vector. @@ -1292,8 +1289,7 @@ void TableInfo::addColumn(ColumnInfo* info) fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType); } - -int TableInfo::openTableFileParquet(int64_t &totalRowsParquet) +int TableInfo::openTableFileParquet(int64_t& totalRowsParquet) { if (fParquetReader != NULL) return NO_ERROR; @@ -1323,14 +1319,13 @@ int TableInfo::openTableFileParquet(int64_t &totalRowsParquet) return ERR_FILE_OPEN; } // initialize fBuffers batch source - for (int i = 0; i < fReadBufCount; ++i) + for (auto& buffer : fBuffers) { - fBuffers[i].setParquetReader(fParquetReader); + buffer.setParquetReader(fParquetReader); } return NO_ERROR; - } - + //------------------------------------------------------------------------------ // Open the file corresponding to fFileName so that we can import it's contents. // A buffer is also allocated and passed to setvbuf(). @@ -1424,7 +1419,7 @@ void TableInfo::closeTableFile() fclose(fHandle); delete[] fFileBuffer; } - + fHandle = 0; } else if (ms3) diff --git a/writeengine/dictionary/we_dctnry.cpp b/writeengine/dictionary/we_dctnry.cpp index b2503a9fe..105614f5f 100644 --- a/writeengine/dictionary/we_dctnry.cpp +++ b/writeengine/dictionary/we_dctnry.cpp @@ -35,8 +35,6 @@ #include using namespace std; - - #include "bytestream.h" #include "brmtypes.h" #include "extentmap.h" // for DICT_COL_WIDTH @@ -54,10 +52,13 @@ using namespace BRM; using namespace idbdatafile; #include "checks.h" -#include "utils_utf8.h" // for utf8_truncate_point() +#include "utils_utf8.h" // for utf8_truncate_point() namespace { +using BinaryArraySharedPtr = std::shared_ptr; +using FixedSizeBinaryArraySharedPtr = std::shared_ptr; + // These used to be member variables, hence the "m_" prefix. But they are // all constants, so I removed them as member variables. May change the // variable name later (to remove the m_ prefix) as time allows. @@ -770,7 +771,7 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff } else { - const char* start = (const char*) curSig.signature; + const char* start = (const char*)curSig.signature; const char* end = (const char*)(curSig.signature + curSig.size); size_t numChars = cs->numchars(start, end); size_t maxCharLength = m_colWidth / cs->mbmaxlen; @@ -784,7 +785,7 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff } } } - else // cs->mbmaxlen == 1 + else // cs->mbmaxlen == 1 { if (curSig.size > m_colWidth) { @@ -971,9 +972,8 @@ int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOff * success - successfully write the header to block * failure - it did not write the header to block ******************************************************************************/ -int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int startRowIdx, - const int totalRow, const int col, char* tokenBuf, - long long& truncCount, const CHARSET_INFO* cs, +int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int startRowIdx, const int totalRow, + const int col, char* tokenBuf, long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType) { #ifdef PROFILE @@ -993,9 +993,8 @@ int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int st cb.file.pFile = m_dFile; WriteEngine::Token nullToken; - bool isNonNullArray = true; - std::shared_ptr binaryArray; - std::shared_ptr fixedSizeBinaryArray; + BinaryArraySharedPtr binaryArray; + FixedSizeBinaryArraySharedPtr fixedSizeBinaryArray; if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY) binaryArray = std::static_pointer_cast(columnData); @@ -1003,8 +1002,7 @@ int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int st fixedSizeBinaryArray = std::static_pointer_cast(columnData); // check if this column data imported is NULL array or not - if (columnData->type_id() == arrow::Type::type::NA) - isNonNullArray = false; + bool isNonNullArray = columnData->type_id() == arrow::Type::type::NA ? false : true; //...Loop through all the rows for the specified column while (startPos < totalRow) @@ -1092,8 +1090,8 @@ int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int st } } - RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, - cs, weType)); + RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, + truncCount, cs, weType)); } #ifdef PROFILE @@ -1125,8 +1123,7 @@ int Dctnry::insertDctnryParquet(std::shared_ptr columnData, int st * failure - it did not write the header to block ******************************************************************************/ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, const int col, char* tokenBuf, - long long& truncCount, const CHARSET_INFO* cs, - const WriteEngine::ColType& weType) + long long& truncCount, const CHARSET_INFO* cs, const WriteEngine::ColType& weType) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_PARSE_DCT); @@ -1199,9 +1196,9 @@ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow, curSig.signature = (unsigned char*)pIn; } - RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount, - cs, weType)); - } // end while + RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, + truncCount, cs, weType)); + } // end while #ifdef PROFILE Stats::stopParseEvent(WE_STATS_PARSE_DCT); diff --git a/writeengine/dictionary/we_dctnry.h b/writeengine/dictionary/we_dctnry.h index 5898748ac..143e6dbfa 100644 --- a/writeengine/dictionary/we_dctnry.h +++ b/writeengine/dictionary/we_dctnry.h @@ -37,11 +37,8 @@ #include "bytestream.h" #include "nullstring.h" - - #define EXPORT - namespace arrow { class Array;