From 180e1f793f89f20622e25d9ff6f34d194491a5a3 Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Thu, 16 May 2024 15:17:50 +0000 Subject: [PATCH] MCOL-5746 Cpimport: convert blob data from ascii hex when reads from STDIN --- writeengine/bulk/we_columninfo.cpp | 6 ++--- writeengine/bulk/we_tableinfo.cpp | 5 +++++ writeengine/bulk/we_tableinfo.h | 35 ++++++++++++++++-------------- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/writeengine/bulk/we_columninfo.cpp b/writeengine/bulk/we_columninfo.cpp index a3c47d1cd..bb3355944 100644 --- a/writeengine/bulk/we_columninfo.cpp +++ b/writeengine/bulk/we_columninfo.cpp @@ -1661,7 +1661,8 @@ int ColumnInfo::closeDctnryStore(bool bAbort) // Update dictionary store file with string column parquet data, and return the assigned // tokens (tokenbuf) to be stored in the corresponding column token file. //-------------------------------------------------------------------------------------- -int ColumnInfo::updateDctnryStoreParquet(std::shared_ptr columnData, int tokenPos, const int totalRow, char* tokenBuf) +int ColumnInfo::updateDctnryStoreParquet(std::shared_ptr columnData, int tokenPos, + const int totalRow, char* tokenBuf) { long long truncCount = 0; @@ -1688,7 +1689,6 @@ int ColumnInfo::updateDctnryStoreParquet(std::shared_ptr columnDat } incSaturatedCnt(truncCount); - return NO_ERROR; } @@ -1707,7 +1707,7 @@ int ColumnInfo::updateDctnryStore(char* buf, ColPosPair** pos, const int totalRo // column. // This only applies to default text mode. This step is bypassed for // binary imports, because in that case, the data is already true binary. - if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB)) && + if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB && fpTableInfo->readFromSTDIN())) && (fpTableInfo->getImportDataMode() == IMPORT_DATA_TEXT)) { #ifdef PROFILE diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index 310b49a0d..7b4c2d0ee 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -2463,5 +2463,10 @@ int TableInfo::allocateBRMColumnExtent(OID columnOID, uint16_t dbRoot, uint32_t& return rc; } +bool TableInfo::readFromSTDIN() +{ + return fReadFromStdin; +} + } // namespace WriteEngine // end of namespace diff --git a/writeengine/bulk/we_tableinfo.h b/writeengine/bulk/we_tableinfo.h index 670461b56..98a6048ea 100644 --- a/writeengine/bulk/we_tableinfo.h +++ b/writeengine/bulk/we_tableinfo.h @@ -80,7 +80,7 @@ class TableInfo : public WeUIDGID FILE* fHandle; // Handle to the input load file int fCurrentReadBuffer; // Id of current buffer being popu- // lated by the read thread - RID fTotalReadRows; // Total number of rows read + RID fTotalReadRows; // Total number of rows read unsigned fTotalErrRows; // Total error rows among all input // for this table. Is volatile to // insure parser & reader threads @@ -173,25 +173,26 @@ class TableInfo : public WeUIDGID oam::OamCache* fOamCachePtr; // OamCache: ptr is copyable boost::uuids::uuid fJobUUID; // Job UUID std::vector fDictFlushBlks; // dict blks to be flushed from cache - - std::shared_ptr fParquetReader; // Batch reader to read batches of data - std::unique_ptr fReader; // Reader to read parquet file + + std::shared_ptr fParquetReader; // Batch reader to read batches of data + std::unique_ptr fReader; // Reader to read parquet file //-------------------------------------------------------------------------- // Private Functions //-------------------------------------------------------------------------- - int changeTableLockState(); // Change state of table lock to cleanup - void closeTableFile(); // Close current tbl file; free buffer - void closeOpenDbFiles(); // Close DB files left open at job's end - int confirmDBFileChanges(); // Confirm DB file changes (on HDFS) - void deleteTempDBFileChanges(); // Delete DB temp swap files (on HDFS) - int finishBRM(); // Finish reporting updates for BRM - void freeProcessingBuffers(); // Free up Processing Buffers - bool isBufferAvailable(bool report); // Is tbl buffer available for reading - int openTableFileParquet(int64_t &totalRowsParquet); // Open parquet data file and set batch reader for each buffer - int openTableFile(); // Open data file and set the buffer - void reportTotals(double elapsedSec); // Report summary totals - void sleepMS(long int ms); // Sleep method + int changeTableLockState(); // Change state of table lock to cleanup + void closeTableFile(); // Close current tbl file; free buffer + void closeOpenDbFiles(); // Close DB files left open at job's end + int confirmDBFileChanges(); // Confirm DB file changes (on HDFS) + void deleteTempDBFileChanges(); // Delete DB temp swap files (on HDFS) + int finishBRM(); // Finish reporting updates for BRM + void freeProcessingBuffers(); // Free up Processing Buffers + bool isBufferAvailable(bool report); // Is tbl buffer available for reading + int openTableFileParquet( + int64_t& totalRowsParquet); // Open parquet data file and set batch reader for each buffer + int openTableFile(); // Open data file and set the buffer + void reportTotals(double elapsedSec); // Report summary totals + void sleepMS(long int ms); // Sleep method // Compare column HWM with the examplar HWM. int compareHWMs(const int smallestColumnId, const int widerColumnId, const uint32_t smallerColumnWidth, const uint32_t widerColumnWidth, const std::vector& segFileInfo, @@ -465,6 +466,8 @@ class TableInfo : public WeUIDGID void setJobUUID(const boost::uuids::uuid& jobUUID); + bool readFromSTDIN(); + public: friend class BulkLoad; friend class ColumnInfo;