mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5505 add parquet support for cpimport and add mcs_parquet_ddl and mcs_parquet_gen tools

Author: HanpyBin, 2023-08-20 16:01:58 +08:00
Committed by: Leonid Fedorov
parent 94a680ea60
commit fe597ec78c
25 changed files with 4677 additions and 251 deletions

View File

@ -40,7 +40,11 @@ set(cpimport.bin_SRCS cpimport.cpp)
add_executable(cpimport.bin ${cpimport.bin_SRCS})
add_dependencies(cpimport.bin marias3)
target_link_libraries(cpimport.bin ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml)
FIND_PACKAGE(Arrow)
FIND_PACKAGE(Parquet)
target_link_libraries(cpimport.bin arrow)
target_link_libraries(cpimport.bin parquet)
install(TARGETS cpimport.bin DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)

View File

@ -1210,6 +1210,32 @@ int BulkLoad::manageImportDataFileList(Job& job, int tableNo, TableInfo* tableIn
std::vector<std::string> loadFilesList;
bool bUseStdin = false;
// Check whether the import files are parquet files; mixing parquet and
// non-parquet files in one job is not supported.
bool isParquet = false;
bool isNonParquet = false;
for (unsigned int i = 0; i < fCmdLineImportFiles.size(); i++)
{
  if (fCmdLineImportFiles[i].rfind(".parquet") != std::string::npos)
    isParquet = true;
  else
    isNonParquet = true;
  if (isParquet && isNonParquet)
  {
    ostringstream oss;
    oss << "Import file list mixes parquet and non-parquet files; "
        << "all import files must be of the same type.";
    fLog.logMsg(oss.str(), ERR_FILE_TYPE_DIFF, MSGLVL_ERROR);
    return ERR_FILE_TYPE_DIFF;
  }
}
if (isParquet)
{
  setImportDataMode(IMPORT_DATA_PARQUET);
}
// Take loadFileName from the command-line argument override if one exists;
// otherwise take it from the Job xml file
std::string loadFileName;
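Note: the detection above keys off rfind(".parquet"), which matches the extension anywhere in the path, so a name like t1.parquet.csv would also be treated as parquet. A stricter check accepts the extension only as a true suffix; a minimal sketch, with a hypothetical helper name:

#include <string>

// Hypothetical helper: true only when the path really ends in ".parquet".
static bool hasParquetSuffix(const std::string& path)
{
  static const std::string suffix = ".parquet";
  return path.size() >= suffix.size() &&
         path.compare(path.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// hasParquetSuffix("/data/t1.parquet")     -> true
// hasParquetSuffix("/data/t1.parquet.csv") -> false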

File diff suppressed because it is too large.

View File

@ -30,7 +30,7 @@
#include "we_columninfo.h"
#include "calpontsystemcatalog.h"
#include "dataconvert.h"
#include <arrow/api.h>
namespace WriteEngine
{
class Log;
@ -84,6 +84,9 @@ class BulkLoadBuffer
char* fOverflowBuf; // Overflow data held for next buffer
unsigned fOverflowSize; // Current size of fOverflowBuf
std::shared_ptr<arrow::RecordBatch> fParquetBatch; // Batch of parquet file to be parsed
std::shared_ptr<arrow::RecordBatch> fParquetBatchParser; // for temporary use by parser
std::shared_ptr<::arrow::RecordBatchReader> fParquetReader; // Reader for read batches of parquet data
// Information about the locker and status for each column in this buffer.
// Note that TableInfo::fSyncUpdatesTI mutex is used to synchronize
// access to fColumnLocks and fParseComplete from both read and parse
@ -174,6 +177,19 @@ class BulkLoadBuffer
void convert(char* field, int fieldLength, bool nullFlag, unsigned char* output, const JobColumn& column,
BLBufferStats& bufStats);
/** @brief Parse a batch of parquet data in the read buffer for a non-dictionary column
*/
int parseColParquet(ColumnInfo& columnInfo);
/** @brief Convert batch parquet data depending upon the data type
*/
void convertParquet(std::shared_ptr<arrow::Array> columnData, unsigned char* buf, const JobColumn& column,
BLBufferStats& bufStats, RID& lastInputRowInExtent, ColumnInfo& columnInfo,
bool& updateCPInfoPendingFlag, ColumnBufferSection* section);
inline void updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent, BLBufferStats& bufStats,
bool& updateCPInfoPendingFlag, ColumnBufferSection* section, uint32_t curRow);
/** @brief Copy the overflow data
*/
void copyOverflow(const BulkLoadBuffer& buffer);
@ -263,6 +279,11 @@ class BulkLoadBuffer
fStatusBLB = status;
}
void setParquetReader(std::shared_ptr<::arrow::RecordBatchReader> reader)
{
fParquetReader = reader;
}
/** @brief Try to lock a column for the buffer
* TableInfo::fSyncUpdatesTI mutex should be locked when calling this
* function (see fColumnLocks discussion).
@ -273,6 +294,10 @@ class BulkLoadBuffer
size_t* parse_length, RID& totalReadRows, RID& correctTotalRows,
const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall);
/** @brief Read a batch of parquet data into the buffer
*/
int fillFromFileParquet(RID& totalReadRows, RID& correctTotalRows);
/** @brief Read the table data into the buffer
*/
int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalRows, RID& correctTotalRows,

View File

@ -1657,6 +1657,41 @@ int ColumnInfo::closeDctnryStore(bool bAbort)
return rc;
}
//--------------------------------------------------------------------------------------
// Update dictionary store file with string column parquet data, and return the assigned
// tokens (tokenbuf) to be stored in the corresponding column token file.
//--------------------------------------------------------------------------------------
int ColumnInfo::updateDctnryStoreParquet(std::shared_ptr<arrow::Array> columnData, int tokenPos, const int totalRow, char* tokenBuf)
{
long long truncCount = 0;
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
boost::mutex::scoped_lock lock(fDictionaryMutex);
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
int rc = fStore->insertDctnryParquet(columnData, tokenPos, totalRow, id, tokenBuf, truncCount, column.cs, column.weType);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "updateDctnryStore: error adding rows to store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
incSaturatedCnt(truncCount);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Update dictionary store file with specified strings, and return the assigned
// tokens (tokenbuf) to be stored in the corresponding column token file.

View File

@ -200,6 +200,13 @@ class ColumnInfo : public WeUIDGID
*/
void lastInputRowInExtentInc();
/** @brief Update dictionary for arrow/parquet format
* Parse and store the parquet data into the store file, and
* returns the assigned tokens (tokenBuf) to be stored in the
* corresponding column token file.
*/
int updateDctnryStoreParquet(std::shared_ptr<arrow::Array> columnData, int tokenPos, const int totalRow, char* tokenBuf);
/** @brief Update dictionary method.
* Parses and stores specified strings into the store file, and
* returns the assigned tokens (tokenBuf) to be stored in the

View File

@ -55,6 +55,9 @@ using namespace querytele;
#include "oamcache.h"
#include "cacheutils.h"
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/exception.h>
namespace
{
const std::string BAD_FILE_SUFFIX = ".bad"; // Reject data file suffix
@ -153,6 +156,8 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
, fRejectErrCnt(0)
, fExtentStrAlloc(tableOID, logger)
, fOamCachePtr(oam::OamCache::makeOamCache())
, fParquetReader(NULL)
, fReader(nullptr)
{
fBuffers.clear();
fColumns.clear();
@ -266,24 +271,44 @@ int TableInfo::readTableData()
{
RID validTotalRows = 0;
RID totalRowsPerInputFile = 0;
int64_t totalRowsParquet = 0;  // int64_t because the parquet reader functions below report row counts as int64_t
int filesTBProcessed = fLoadFileList.size();
int fileCounter = 0;
unsigned long long qtSentAt = 0;
if (fImportDataMode != IMPORT_DATA_PARQUET)
{
  if (fHandle == NULL)
  {
    fFileName = fLoadFileList[fileCounter];
    int rc = openTableFile();
    if (rc != NO_ERROR)
    {
      // Mark the table status as error and exit.
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
      fStatusTI = WriteEngine::ERR;
      return rc;
    }
    fileCounter++;
  }
}
else
{
  if (fParquetReader == NULL)
  {
    fFileName = fLoadFileList[fileCounter];
    int rc = openTableFileParquet(totalRowsParquet);
    if (rc != NO_ERROR)
    {
      // Mark the table status as error and exit.
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
      fStatusTI = WriteEngine::ERR;
      return rc;
    }
    fileCounter++;
  }
}
timeval readStart;
@ -419,16 +444,23 @@ int TableInfo::readTableData()
// validTotalRows is ongoing total of valid rows read for all files
// pertaining to this DB table.
int readRc;
if (fImportDataMode != IMPORT_DATA_PARQUET)
{
  if (fReadFromS3)
  {
    readRc = fBuffers[readBufNo].fillFromMemory(fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength,
                                                &fS3ParseLength, totalRowsPerInputFile, validTotalRows,
                                                fColumns, allowedErrCntThisCall);
  }
  else
  {
    readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
                                              validTotalRows, fColumns, allowedErrCntThisCall);
  }
}
else
{
  readRc = fBuffers[readBufNo].fillFromFileParquet(totalRowsPerInputFile, validTotalRows);
}
if (readRc != NO_ERROR)
@ -530,7 +562,7 @@ int TableInfo::readTableData()
fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;
// bufferCount++;
if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)))
if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) || (totalRowsPerInputFile == (RID)totalRowsParquet))
{
timeval readFinished;
gettimeofday(&readFinished, NULL);
@ -567,7 +599,15 @@ int TableInfo::readTableData()
if (fileCounter < filesTBProcessed)
{
fFileName = fLoadFileList[fileCounter];
int rc;
if (fImportDataMode != IMPORT_DATA_PARQUET)
{
  rc = openTableFile();
}
else
{
  rc = openTableFileParquet(totalRowsParquet);
}
if (rc != NO_ERROR)
{
@ -1252,6 +1292,45 @@ void TableInfo::addColumn(ColumnInfo* info)
fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType);
}
int TableInfo::openTableFileParquet(int64_t &totalRowsParquet)
{
if (fParquetReader != NULL)
return NO_ERROR;
std::shared_ptr<arrow::io::ReadableFile> infile;
try
{
PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fFileName, arrow::default_memory_pool()));
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &fReader));
fReader->set_batch_size(1000);
PARQUET_THROW_NOT_OK(fReader->ScanContents({0}, 1000, &totalRowsParquet));
PARQUET_THROW_NOT_OK(fReader->GetRecordBatchReader(&fParquetReader));
}
catch (std::exception& ex)
{
ostringstream oss;
oss << "Error opening import file " << fFileName << ".";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
catch (...)
{
ostringstream oss;
oss << "Error opening import file " << fFileName << ".";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
// initialize fBuffers batch source
for (int i = 0; i < fReadBufCount; ++i)
{
fBuffers[i].setParquetReader(fParquetReader);
}
return NO_ERROR;
}
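For reference, the Arrow/Parquet batch-reading pattern used by openTableFileParquet() can be exercised on its own. This is a minimal sketch, assuming an Arrow/Parquet release whose parquet::arrow::OpenFile() and FileReader::GetRecordBatchReader() overloads match the calls above; the file path and batch size are placeholders:

#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/exception.h>
#include <iostream>
#include <memory>

int main()
{
  std::shared_ptr<arrow::io::ReadableFile> infile;
  std::unique_ptr<parquet::arrow::FileReader> reader;
  std::shared_ptr<arrow::RecordBatchReader> batchReader;

  // Open the file and build a FileReader, mirroring openTableFileParquet().
  PARQUET_ASSIGN_OR_THROW(
      infile, arrow::io::ReadableFile::Open("/tmp/example.parquet", arrow::default_memory_pool()));
  PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
  reader->set_batch_size(1000);  // same batch size cpimport uses above
  PARQUET_THROW_NOT_OK(reader->GetRecordBatchReader(&batchReader));

  // Pull record batches until the reader is exhausted.
  std::shared_ptr<arrow::RecordBatch> batch;
  int64_t totalRows = 0;
  while (true)
  {
    PARQUET_THROW_NOT_OK(batchReader->ReadNext(&batch));
    if (!batch)
      break;
    totalRows += batch->num_rows();
  }
  std::cout << "rows read: " << totalRows << std::endl;
  return 0;
}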
//------------------------------------------------------------------------------
// Open the file corresponding to fFileName so that we can import its contents.
// A buffer is also allocated and passed to setvbuf().
@ -1331,24 +1410,32 @@ int TableInfo::openTableFile()
//------------------------------------------------------------------------------
void TableInfo::closeTableFile()
{
if (fImportDataMode != IMPORT_DATA_PARQUET)
{
  if (fHandle)
  {
    // If reading from stdin, we don't delete the buffer out from under
    // the file handle, because stdin is still open. This will cause a
    // memory leak, but when using stdin, we can only read in 1 table.
    // So it's not like we will be leaking multiple buffers for several
    // tables over the life of the job.
    if (!fReadFromStdin)
    {
      fclose(fHandle);
      delete[] fFileBuffer;
    }
    fHandle = 0;
  }
  else if (ms3)
  {
    ms3_free((uint8_t*)fFileBuffer);
  }
}
else
{
  fReader.reset();
  fParquetReader.reset();
}
}

View File

@ -30,6 +30,9 @@
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/uuid/uuid.hpp>
#include <arrow/api.h>
#include <parquet/arrow/reader.h>
#include <libmarias3/marias3.h>
#include "we_type.h"
@ -170,22 +173,25 @@ class TableInfo : public WeUIDGID
oam::OamCache* fOamCachePtr; // OamCache: ptr is copyable
boost::uuids::uuid fJobUUID; // Job UUID
std::vector<BRM::LBID_t> fDictFlushBlks; // dict blks to be flushed from cache
std::shared_ptr<arrow::RecordBatchReader> fParquetReader; // Batch reader to read batches of data
std::unique_ptr<parquet::arrow::FileReader> fReader; // Reader to read parquet file
//--------------------------------------------------------------------------
// Private Functions
//--------------------------------------------------------------------------
int changeTableLockState();     // Change state of table lock to cleanup
void closeTableFile();          // Close current tbl file; free buffer
void closeOpenDbFiles();        // Close DB files left open at job's end
int confirmDBFileChanges();     // Confirm DB file changes (on HDFS)
void deleteTempDBFileChanges(); // Delete DB temp swap files (on HDFS)
int finishBRM();                // Finish reporting updates for BRM
void freeProcessingBuffers();   // Free up Processing Buffers
bool isBufferAvailable(bool report);  // Is tbl buffer available for reading
int openTableFileParquet(int64_t& totalRowsParquet);  // Open parquet data file and set batch reader for each buffer
int openTableFile();            // Open data file and set the buffer
void reportTotals(double elapsedSec);  // Report summary totals
void sleepMS(long int ms);      // Sleep method
// Compare column HWM with the exemplar HWM.
int compareHWMs(const int smallestColumnId, const int widerColumnId, const uint32_t smallerColumnWidth,
const uint32_t widerColumnWidth, const std::vector<DBRootExtentInfo>& segFileInfo,

View File

@ -35,6 +35,8 @@
#include <iostream>
using namespace std;
#include "bytestream.h"
#include "brmtypes.h"
#include "extentmap.h" // for DICT_COL_WIDTH
@ -745,6 +747,365 @@ int Dctnry::insertDctnry2(Signature& sig)
return NO_ERROR;
}
int Dctnry::insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOffset, int& startPos,
int& totalUseSize, CommBlock& cb, bool& next, long long& truncCount,
const CHARSET_INFO* cs, const WriteEngine::ColType& weType)
{
if (cs->mbmaxlen > 1)
{
// For TEXT columns, we truncate based on the number of bytes,
// and not based on the number of characters, as for CHAR/VARCHAR
// columns in the else block.
if (weType == WriteEngine::WR_TEXT)
{
if (curSig.size > m_colWidth)
{
uint8_t truncate_point = utf8::utf8_truncate_point((const char*)curSig.signature, m_colWidth);
curSig.size = m_colWidth - truncate_point;
truncCount++;
}
}
else
{
const char* start = (const char*) curSig.signature;
const char* end = (const char*)(curSig.signature + curSig.size);
size_t numChars = cs->numchars(start, end);
size_t maxCharLength = m_colWidth / cs->mbmaxlen;
if (numChars > maxCharLength)
{
MY_STRCOPY_STATUS status;
cs->well_formed_char_length(start, end, maxCharLength, &status);
curSig.size = status.m_source_end_pos - start;
truncCount++;
}
}
}
else // cs->mbmaxlen == 1
{
if (curSig.size > m_colWidth)
{
curSig.size = m_colWidth;
truncCount++;
}
}
//...Search for the string in our string cache
// if it fits into one block (< 8KB)
if (curSig.size <= MAX_SIGNATURE_SIZE)
{
// Stats::startParseEvent("getTokenFromArray");
found = getTokenFromArray(curSig);
if (found)
{
memcpy(pOut + outOffset, &curSig.token, 8);
outOffset += 8;
startPos++;
// Stats::stopParseEvent("getTokenFromArray");
return NO_ERROR;
}
// Stats::stopParseEvent("getTokenFromArray");
}
totalUseSize = m_totalHdrBytes + curSig.size;
//...String not found in cache, so proceed.
// If room is available in current block then insert into block.
// @bug 3960: Add MAX_OP_COUNT check to handle case after bulk rollback
if (((totalUseSize <= m_freeSpace - HDR_UNIT_SIZE) ||
((curSig.size > 8176) && (m_freeSpace > HDR_UNIT_SIZE))) &&
(m_curOp < (MAX_OP_COUNT - 1)))
{
RETURN_ON_ERROR(insertDctnry2(curSig)); // m_freeSpace updated!
m_curBlock.state = BLK_WRITE;
memcpy(pOut + outOffset, &curSig.token, 8);
outOffset += 8;
startPos++;
found = true;
//...If we have reached limit for the number of strings allowed in
// a block, then we write the current block so that we can start
// another block.
if (m_curOp >= MAX_OP_COUNT - 1)
{
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
m_curBlock.state = BLK_READ;
next = true;
}
//...Add string to cache, if we have not exceeded cache limit
// Don't cache big blobs
if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache(curSig);
}
}
else //...No room for this string in current block, so we write
// out the current block, so we can start another block
{
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
RETURN_ON_ERROR(writeDBFileNoVBCache(cb, &m_curBlock, m_curFbo));
m_curBlock.state = BLK_READ;
next = true;
found = false;
} // if m_freeSpace
//..."next" flag is used to indicate that we need to advance to the
// next block in the store file.
if (next)
{
memset(m_curBlock.data, 0, sizeof(m_curBlock.data));
memcpy(m_curBlock.data, &m_dctnryHeader2, m_totalHdrBytes);
m_freeSpace = BYTE_PER_BLOCK - m_totalHdrBytes;
m_curBlock.state = BLK_WRITE;
m_curOp = 0;
next = false;
m_lastFbo++;
m_curFbo = m_lastFbo;
//...Expand current extent if it is an abbreviated initial extent
if ((m_curFbo == m_numBlocks) && (m_numBlocks == NUM_BLOCKS_PER_INITIAL_EXTENT))
{
RETURN_ON_ERROR(expandDctnryExtent());
}
//...Allocate a new extent if we have reached the last block in the
// current extent.
if (m_curFbo == m_numBlocks)
{
// last block
LBID_t startLbid;
// Add an extent.
RETURN_ON_ERROR(
createDctnry(m_dctnryOID, m_colWidth, m_dbRoot, m_partition, m_segment, startLbid, false));
if (m_logger)
{
std::ostringstream oss;
oss << "Add dictionary extent OID-" << m_dctnryOID << "; DBRoot-" << m_dbRoot << "; part-"
<< m_partition << "; seg-" << m_segment << "; hwm-" << m_curFbo << "; LBID-" << startLbid
<< "; file-" << m_segFileName;
m_logger->logMsg(oss.str(), MSGLVL_INFO2);
}
m_curLbid = startLbid;
// now seek back to the curFbo, after adding an extent
// @bug5769 For uncompressed only;
// ChunkManager manages the file offset for the compression case
if (m_compressionType == 0)
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
long long byteOffset = m_curFbo;
byteOffset *= BYTE_PER_BLOCK;
RETURN_ON_ERROR(setFileOffset(m_dFile, byteOffset));
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_PARSE_DCT_SEEK_EXTENT_BLK);
#endif
}
}
else
{
// LBIDs are numbered collectively and consecutively within an
// extent, so within an extent we can derive the LBID by simply
// incrementing it rather than having to go back to BRM to look
// up the LBID for each FBO.
m_curLbid++;
}
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
m_curBlock.lbid = m_curLbid;
//..."found" flag indicates whether the string was already found
// "or" added to the end of the previous block. If false, then
// we need to add the string to the new block.
if (!found)
{
RETURN_ON_ERROR(insertDctnry2(curSig)); // m_freeSpace updated!
m_curBlock.state = BLK_WRITE;
memcpy(pOut + outOffset, &curSig.token, 8);
outOffset += 8;
startPos++;
//...Add string to cache, if we have not exceeded cache limit
if ((m_arraySize < MAX_STRING_CACHE_SIZE) && (curSig.size <= MAX_SIGNATURE_SIZE))
{
addToStringCache(curSig);
}
}
} // if next
return NO_ERROR;
}
/*******************************************************************************
* Description:
* Used by bulk import to insert batch of parquet strings into this store file.
* Function assumes that the file is already positioned to the current block.
*
* PARAMETERS:
* input
* columnData - arrow array containing input strings
* startRowIdx - start position for current batch parquet data
* totalRow - number of rows of columnData to process
* col - id of the column being parsed
* output
* tokenBuf - tokens assigned to inserted strings
*
* RETURN:
* success - strings were successfully inserted into the store file
* failure - error code if the strings could not be inserted
******************************************************************************/
int Dctnry::insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int startRowIdx,
const int totalRow, const int col, char* tokenBuf,
long long& truncCount, const CHARSET_INFO* cs,
const WriteEngine::ColType& weType)
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_PARSE_DCT);
#endif
int startPos = 0;
int totalUseSize = 0;
int outOffset = 0;
const char* pIn;
char* pOut = tokenBuf;
Signature curSig;
bool found = false;
bool next = false;
CommBlock cb;
cb.file.oid = m_dctnryOID;
cb.file.pFile = m_dFile;
WriteEngine::Token nullToken;
bool isNonNullArray = true;
std::shared_ptr<arrow::BinaryArray> binaryArray;
std::shared_ptr<arrow::FixedSizeBinaryArray> fixedSizeBinaryArray;
if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY)
binaryArray = std::static_pointer_cast<arrow::BinaryArray>(columnData);
else
fixedSizeBinaryArray = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(columnData);
// check if this column data imported is NULL array or not
if (columnData->type_id() == arrow::Type::type::NA)
isNonNullArray = false;
//...Loop through all the rows for the specified column
while (startPos < totalRow)
{
found = false;
void* curSigPtr = static_cast<void*>(&curSig);
memset(curSigPtr, 0, sizeof(curSig));
// if this column is not null data
if (isNonNullArray)
{
const uint8_t* data;
// if (binaryArray != nullptr)
// {
// data = binaryArray->GetValue(startPos + startRowIdx, &curSig.size);
// }
// else
// {
// data = fixedSizeBinaryArray->GetValue(startPos + startRowIdx);
// std::shared_ptr<arrow::DataType> tType = fixedSizeBinaryArray->type();
// curSig.size = tType->byte_width();
// }
// NOTE: commenting out the line below and uncommenting the block above reproduces the error
data = binaryArray->GetValue(startPos + startRowIdx, &curSig.size);
const char* dataPtr = reinterpret_cast<const char*>(data);
// Strip trailing null bytes '\0' (by adjusting curSig.size) if import-
// ing in binary mode. If entire string is binary zeros, then we treat
// as a NULL value.
if (curSig.size > 0)
{
const char* fld = dataPtr;
int kk = curSig.size - 1;
for (; kk >= 0; kk--)
{
if (fld[kk] != '\0')
break;
}
curSig.size = kk + 1;
}
// Read thread should validate against max size so that the entire row
// can be rejected up front. Once we get here in the parsing thread,
// it is too late to reject the row. However, as a precaution, we
// still check against max size & set to null token if needed.
if ((curSig.size == 0) || (curSig.size > MAX_BLOB_SIZE))
{
if (m_defVal.length() > 0) // use default string if available
{
pIn = m_defVal.str();
curSig.signature = (unsigned char*)pIn;
curSig.size = m_defVal.length();
}
else
{
memcpy(pOut + outOffset, &nullToken, 8);
outOffset += 8;
startPos++;
continue;
}
}
else
{
pIn = dataPtr;
curSig.signature = (unsigned char*)pIn;
}
}
else
{
curSig.size = 0;
if (m_defVal.length() > 0) // use default string if available
{
pIn = m_defVal.str();
curSig.signature = (unsigned char*)pIn;
curSig.size = m_defVal.length();
}
else
{
memcpy(pOut + outOffset, &nullToken, 8);
outOffset += 8;
startPos++;
continue;
}
}
RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount,
cs, weType));
}
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_PARSE_DCT);
#endif
// Done
// If any data leftover and not written by subsequent call to
// insertDctnry(), then it will be written by closeDctnry().
return NO_ERROR;
}
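The per-row extraction above relies on arrow::BinaryArray::GetValue(), which returns a pointer into the array's value buffer together with the value length, and on the array's null bitmap for NULL detection. A minimal, self-contained sketch of that access pattern, including the trailing-'\0' trimming done above (the sample values are placeholders):

#include <arrow/api.h>
#include <iostream>
#include <memory>
#include <string>

int main()
{
  // Build a small string array with one null, standing in for a parquet column.
  arrow::StringBuilder builder;
  if (!builder.Append("abc").ok() || !builder.AppendNull().ok() ||
      !builder.Append(std::string("de\0\0", 4)).ok())
    return 1;
  std::shared_ptr<arrow::Array> array;
  if (!builder.Finish(&array).ok())
    return 1;

  auto binaryArray = std::static_pointer_cast<arrow::BinaryArray>(array);
  for (int64_t i = 0; i < binaryArray->length(); ++i)
  {
    if (binaryArray->IsNull(i))
    {
      std::cout << i << ": NULL" << std::endl;
      continue;
    }
    int32_t size = 0;
    const uint8_t* data = binaryArray->GetValue(i, &size);
    // Trim trailing '\0' bytes the same way insertDctnryParquet() does.
    while (size > 0 && data[size - 1] == '\0')
      size--;
    std::cout << i << ": " << std::string(reinterpret_cast<const char*>(data), size) << std::endl;
  }
  return 0;
}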
/*******************************************************************************
* Description:
* Used by bulk import to insert collection of strings into this store file.
@ -838,201 +1199,8 @@ int Dctnry::insertDctnry(const char* buf, ColPosPair** pos, const int totalRow,
curSig.signature = (unsigned char*)pIn;
}
RETURN_ON_ERROR(insertDctnry1(curSig, found, pOut, outOffset, startPos, totalUseSize, cb, next, truncCount,
cs, weType));
} // end while
#ifdef PROFILE

View File

@ -37,6 +37,8 @@
#include "bytestream.h"
#include "nullstring.h"
#include <arrow/api.h>
#define EXPORT
/** Namespace WriteEngine */
@ -157,6 +159,20 @@ class Dctnry : public DbFileOp
*/
EXPORT int insertDctnry(const int& sgnature_size, const unsigned char* sgnature_value, Token& token);
/**
* @brief Insert signature value to a file block and return token/pointer
* (for Bulk use)
*
* @param columnData - arrow array containing strings to be parsed
* @param startRowIdx - start position for current batch parquet data
* @param totalRow - total number of rows in columnData
* @param col - id of the column being parsed
* @param tokenBuf - (output) list of tokens for the parsed strings
*/
EXPORT int insertDctnryParquet(std::shared_ptr<arrow::Array> columnData, int startRowIdx, const int totalRow,
const int col, char* tokenBuf, long long& truncCount,
const CHARSET_INFO* cs, const WriteEngine::ColType& weType);
/**
* @brief Insert a signature value to a file block and return token/pointer
* (for Bulk use)
@ -280,6 +296,9 @@ class Dctnry : public DbFileOp
// insertDctnryHdr inserts the new value info into the header.
// insertSgnture inserts the new value into the block.
//
int insertDctnry1(Signature& curSig, bool found, char* pOut, int& outOffset, int& startPos,
int& totalUseSize, CommBlock& cb, bool& next, long long& truncCount,
const CHARSET_INFO* cs, const WriteEngine::ColType& weType);
int insertDctnry2(Signature& sig);
void insertDctnryHdr(unsigned char* blockBuf, const int& size);
void insertSgnture(unsigned char* blockBuf, const int& size, unsigned char* value);

View File

@ -113,7 +113,7 @@ const int ERR_COMPBASE = 1650; // Compression errors
const int ERR_AUTOINCBASE = 1700; // Auto-increment errors
const int ERR_BLKCACHEBASE = 1750; // Block cache flush errors
const int ERR_METABKUPBASE = 1800; // Backup bulk meta file errors
const int ERR_PARQUETBASE = 1850; // Parquet importing errors
//--------------------------------------------------------------------------
// Generic error
//--------------------------------------------------------------------------
@ -152,6 +152,7 @@ const int ERR_FILE_GLOBBING = ERR_FILEBASE + 19; // Error globbing a file
const int ERR_FILE_EOF = ERR_FILEBASE + 20; // EOF
const int ERR_FILE_CHOWN = ERR_FILEBASE + 21;  // Error changing file ownership
const int ERR_INTERNAL = ERR_FILEBASE + 22;    // Internal error
const int ERR_FILE_TYPE_DIFF = ERR_FILEBASE + 23;  // Import files are not all of the same type
//--------------------------------------------------------------------------
// XML level error
@ -389,6 +390,11 @@ const int ERR_METADATABKUP_COMP_READ_BULK_BKUP =
ERR_METABKUPBASE + 7; // Error reading from backup chunk file */
const int ERR_METADATABKUP_COMP_RENAME = ERR_METABKUPBASE + 8; // Error renaming chunk file */
//--------------------------------------------------------------------------
// Parquet errors when importing
//--------------------------------------------------------------------------
const int ERR_PARQUET_AUX = ERR_PARQUETBASE + 1; // Error when creating aux column for parquet file
//------------------------------------------------------------------------------
// Class used to convert an error code to a corresponding error message string
//------------------------------------------------------------------------------

View File

@ -137,11 +137,13 @@ enum BulkModeType
// Import Mode 0-text Import (default)
// 1-Binary Import with NULL values
// 2-Binary Import with saturated NULL values
// 3-Parquet file Import
enum ImportDataMode
{
IMPORT_DATA_TEXT = 0,
IMPORT_DATA_BIN_ACCEPT_NULL = 1,
IMPORT_DATA_BIN_SAT_NULL = 2,
IMPORT_DATA_PARQUET = 3
};
/**