1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-5505 Add TypeHandler functions.

This commit is contained in:
Denis Khalikov
2023-10-30 18:35:40 +03:00
committed by Leonid Fedorov
parent 491ba6e0aa
commit 865cca11c9
5 changed files with 360 additions and 289 deletions

View File

@ -1609,6 +1609,11 @@ boost::any TypeHandlerDate::convertFromString(const TypeAttributesStd& colType,
return dataconvert::DataConvert::StringToDate(data, pushWarning); return dataconvert::DataConvert::StringToDate(data, pushWarning);
} }
int32_t TypeHandlerDate::convertArrowColumnDate(int32_t dayVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnDate(dayVal, status);
}
boost::any TypeHandlerDatetime::convertFromString(const TypeAttributesStd& colType, boost::any TypeHandlerDatetime::convertFromString(const TypeAttributesStd& colType,
const ConvertFromStringParam& prm, const std::string& data, const ConvertFromStringParam& prm, const std::string& data,
bool& pushWarning) const bool& pushWarning) const
@ -1616,6 +1621,16 @@ boost::any TypeHandlerDatetime::convertFromString(const TypeAttributesStd& colTy
return dataconvert::DataConvert::StringToDatetime(data, pushWarning); return dataconvert::DataConvert::StringToDatetime(data, pushWarning);
} }
int64_t TypeHandlerDatetime::convertArrowColumnDatetime(int64_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnDatetime(timeVal, status);
}
int64_t TypeHandlerDatetime::convertArrowColumnDatetimeUs(int64_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnDatetimeUs(timeVal, status);
}
boost::any TypeHandlerTime::convertFromString(const TypeAttributesStd& colType, boost::any TypeHandlerTime::convertFromString(const TypeAttributesStd& colType,
const ConvertFromStringParam& prm, const std::string& data, const ConvertFromStringParam& prm, const std::string& data,
bool& pushWarning) const bool& pushWarning) const
@ -1623,6 +1638,16 @@ boost::any TypeHandlerTime::convertFromString(const TypeAttributesStd& colType,
return dataconvert::DataConvert::StringToTime(colType, data, pushWarning); return dataconvert::DataConvert::StringToTime(colType, data, pushWarning);
} }
int64_t TypeHandlerTime::convertArrowColumnTime64(int64_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnTime64(timeVal, status);
}
int64_t TypeHandlerTime::convertArrowColumnTime32(int32_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnTime32(timeVal, status);
}
boost::any TypeHandlerTimestamp::convertFromString(const TypeAttributesStd& colType, boost::any TypeHandlerTimestamp::convertFromString(const TypeAttributesStd& colType,
const ConvertFromStringParam& prm, const std::string& data, const ConvertFromStringParam& prm, const std::string& data,
bool& pushWarning) const bool& pushWarning) const
@ -1630,6 +1655,16 @@ boost::any TypeHandlerTimestamp::convertFromString(const TypeAttributesStd& colT
return dataconvert::DataConvert::StringToTimestamp(prm, data, pushWarning); return dataconvert::DataConvert::StringToTimestamp(prm, data, pushWarning);
} }
int64_t TypeHandlerTimestamp::convertArrowColumnTimestamp(int64_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnTimestamp(timeVal, status);
}
int64_t TypeHandlerTimestamp::convertArrowColumnTimestampUs(int64_t timeVal, int& status) const
{
return dataconvert::DataConvert::convertArrowColumnTimestampUs(timeVal, status);
}
boost::any TypeHandlerChar::convertFromString(const TypeAttributesStd& colType, boost::any TypeHandlerChar::convertFromString(const TypeAttributesStd& colType,
const ConvertFromStringParam& prm, const std::string& data, const ConvertFromStringParam& prm, const std::string& data,
bool& pushWarning) const bool& pushWarning) const

View File

@ -2276,6 +2276,10 @@ class TypeHandlerTemporal : public TypeHandler
class TypeHandlerDate : public TypeHandlerTemporal class TypeHandlerDate : public TypeHandlerTemporal
{ {
public:
int32_t convertArrowColumnDate(int32_t dayVal, int& status) const;
private:
const string& name() const override; const string& name() const override;
code_t code() const override code_t code() const override
{ {
@ -2301,6 +2305,11 @@ class TypeHandlerDate : public TypeHandlerTemporal
class TypeHandlerDatetime : public TypeHandlerTemporal class TypeHandlerDatetime : public TypeHandlerTemporal
{ {
public:
int64_t convertArrowColumnDatetime(int64_t timeVal, int& status) const;
int64_t convertArrowColumnDatetimeUs(int64_t timeVal, int& status) const;
private:
const string& name() const override; const string& name() const override;
code_t code() const override code_t code() const override
{ {
@ -2326,6 +2335,11 @@ class TypeHandlerDatetime : public TypeHandlerTemporal
class TypeHandlerTime : public TypeHandlerTemporal class TypeHandlerTime : public TypeHandlerTemporal
{ {
public:
int64_t convertArrowColumnTime64(int64_t timeVal, int& status) const;
int64_t convertArrowColumnTime32(int32_t timeVal, int& status) const;
private:
const string& name() const override; const string& name() const override;
code_t code() const override code_t code() const override
{ {
@ -2351,6 +2365,11 @@ class TypeHandlerTime : public TypeHandlerTemporal
class TypeHandlerTimestamp : public TypeHandlerTemporal class TypeHandlerTimestamp : public TypeHandlerTemporal
{ {
public:
int64_t convertArrowColumnTimestamp(int64_t timeVal, int& status) const;
int64_t convertArrowColumnTimestampUs(int64_t timeVal, int& status) const;
private:
const string& name() const override; const string& name() const override;
code_t code() const override code_t code() const override
{ {

View File

@ -1576,7 +1576,7 @@ boost::any DataConvert::StringToTimestamp(const datatypes::ConvertFromStringPara
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Convert date32 parquet data to binary date. Used by BulkLoad. // Convert date32 parquet data to binary date. Used by BulkLoad.
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
int32_t DataConvert::ConvertArrowColumnDate(int32_t dayVal, int& status) int32_t DataConvert::convertArrowColumnDate(int32_t dayVal, int& status)
{ {
int inYear; int inYear;
int inMonth; int inMonth;

View File

@ -1176,7 +1176,7 @@ class DataConvert
* @param dayVal the input data representing days * @param dayVal the input data representing days
* @param status 0 - success, -1 - fail * @param status 0 - success, -1 - fail
*/ */
EXPORT static int32_t ConvertArrowColumnDate(int32_t dayVal, int& status); EXPORT static int32_t convertArrowColumnDate(int32_t dayVal, int& status);
/** /**
* @brief convert a date column data, represnted as a string, to it's native * @brief convert a date column data, represnted as a string, to it's native

View File

@ -40,6 +40,7 @@
#include "dataconvert.h" #include "dataconvert.h"
#include "exceptclasses.h" #include "exceptclasses.h"
#include "mcs_decimal.h" #include "mcs_decimal.h"
#include "mcs_datatype.h"
#include "joblisttypes.h" #include "joblisttypes.h"
@ -1562,7 +1563,7 @@ int BulkLoadBuffer::parse(ColumnInfo& columnInfo)
boost::mutex::scoped_lock lock(fSyncUpdatesBLB); boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
fTotalReadRowsParser = fTotalReadRows; fTotalReadRowsParser = fTotalReadRows;
fStartRowParser = fStartRow; fStartRowParser = fStartRow;
if (fImportDataMode != IMPORT_DATA_PARQUET) if (fImportDataMode != IMPORT_DATA_PARQUET)
{ {
fDataParser = fData; fDataParser = fData;
@ -1603,7 +1604,7 @@ int BulkLoadBuffer::parse(ColumnInfo& columnInfo)
int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo) int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
{ {
int rc = NO_ERROR; int rc = NO_ERROR;
// Parse the data and fill up a buffer; which is written to output file // Parse the data and fill up a buffer; which is written to output file
uint32_t nRowsParsed; uint32_t nRowsParsed;
@ -1689,8 +1690,9 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
return ERR_PARQUET_AUX; return ERR_PARQUET_AUX;
} }
} }
convertParquet(columnData, buf, columnInfo.column, bufStats, lastInputRowInExtent, columnInfo, updateCPInfoPendingFlag, section); convertParquet(columnData, buf, columnInfo.column, bufStats, lastInputRowInExtent, columnInfo,
updateCPInfoPendingFlag, section);
if (updateCPInfoPendingFlag) if (updateCPInfoPendingFlag)
{ {
@ -1764,8 +1766,9 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
// fTotalReadRowsParser (in) - current batch size(row number) // fTotalReadRowsParser (in) - current batch size(row number)
// fAutoIncNextValue (in) - first auto increment number of this batch // fAutoIncNextValue (in) - first auto increment number of this batch
//----------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------
void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, unsigned char* buf, const JobColumn& column, void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, unsigned char* buf,
BLBufferStats& bufStats, RID& lastInputRowInExtent, ColumnInfo& columnInfo, const JobColumn& column, BLBufferStats& bufStats,
RID& lastInputRowInExtent, ColumnInfo& columnInfo,
bool& updateCPInfoPendingFlag, ColumnBufferSection* section) bool& updateCPInfoPendingFlag, ColumnBufferSection* section)
{ {
char biVal; char biVal;
@ -1841,13 +1844,12 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
} }
//---------------------------------------------------------------------- //----------------------------------------------------------------------
// DOUBLE // DOUBLE
//---------------------------------------------------------------------- //----------------------------------------------------------------------
@ -1875,7 +1877,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
else else
{ {
memcpy(&dVal, dataPtr + i, width); memcpy(&dVal, dataPtr + i, width);
if (dVal > column.fMaxDblSat) if (dVal > column.fMaxDblSat)
{ {
dVal = column.fMaxDblSat; dVal = column.fMaxDblSat;
@ -1895,13 +1897,12 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
} }
//---------------------------------------------------------------------- //----------------------------------------------------------------------
// CHARACTER // CHARACTER
//---------------------------------------------------------------------- //----------------------------------------------------------------------
@ -1992,8 +1993,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2065,7 +2065,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
bufStats.minBufferVal = origVal; bufStats.minBufferVal = origVal;
if (origVal > bufStats.maxBufferVal) if (origVal > bufStats.maxBufferVal)
bufStats.maxBufferVal = origVal; bufStats.maxBufferVal = origVal;
siVal = origVal; siVal = origVal;
pVal = &siVal; pVal = &siVal;
memcpy(p, pVal, width); memcpy(p, pVal, width);
@ -2073,8 +2073,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2139,7 +2138,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
bufStats.minBufferVal = origVal; bufStats.minBufferVal = origVal;
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal)) if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
bufStats.maxBufferVal = origVal; bufStats.maxBufferVal = origVal;
usiVal = origVal; usiVal = origVal;
pVal = &usiVal; pVal = &usiVal;
memcpy(p, pVal, width); memcpy(p, pVal, width);
@ -2147,8 +2146,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2159,10 +2157,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
//---------------------------------------------------------------------- //----------------------------------------------------------------------
case WriteEngine::WR_BYTE: case WriteEngine::WR_BYTE:
{ {
long long origVal; long long origVal;
// if use int8_t here, it will take 8 bool value of parquet array // if use int8_t here, it will take 8 bool value of parquet array
std::shared_ptr<arrow::BooleanArray> boolArray = std::static_pointer_cast<arrow::BooleanArray>(columnData); std::shared_ptr<arrow::BooleanArray> boolArray =
std::static_pointer_cast<arrow::BooleanArray>(columnData);
const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(1); const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(1);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
@ -2185,7 +2183,6 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
memcpy(p, pVal, width); memcpy(p, pVal, width);
continue; continue;
} }
} }
else else
{ {
@ -2224,10 +2221,9 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if (bSatVal) if (bSatVal)
bufStats.satCount++; bufStats.satCount++;
if (origVal < bufStats.minBufferVal) if (origVal < bufStats.minBufferVal)
bufStats.minBufferVal = origVal; bufStats.minBufferVal = origVal;
if (origVal > bufStats.maxBufferVal) if (origVal > bufStats.maxBufferVal)
bufStats.maxBufferVal = origVal; bufStats.maxBufferVal = origVal;
@ -2238,8 +2234,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2317,8 +2312,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
} }
@ -2372,8 +2366,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
} }
@ -2387,14 +2380,13 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
case WriteEngine::WR_LONGLONG: case WriteEngine::WR_LONGLONG:
{ {
if (column.dataType != CalpontSystemCatalog::DATETIME && if (column.dataType != CalpontSystemCatalog::DATETIME &&
column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME)
column.dataType != CalpontSystemCatalog::TIME)
{ {
const long long *dataPtr = columnData->data()->GetValues<long long>(1); const long long* dataPtr = columnData->data()->GetValues<long long>(1);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{ {
void *p = buf + i * width; void* p = buf + i * width;
bool bSatVal = false; bool bSatVal = false;
if (columnData->IsNull(i)) if (columnData->IsNull(i))
@ -2459,259 +2451,282 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
}
}
}
else if (column.dataType == CalpontSystemCatalog::TIME)
{
// time conversion here
int rc = 0;
// for parquet, there are two time type, time32 and time64
// if it's time32, unit is millisecond, int32
if (columnData->type_id() == arrow::Type::type::TIME32 || columnData->type_id() == arrow::Type::type::NA)
{
std::shared_ptr<arrow::Time32Array> timeArray = std::static_pointer_cast<arrow::Time32Array>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
void *p = buf + i * width;
if (columnData->IsNull(i))
{
if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMENULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
// timeVal is millisecond since midnight
int32_t timeVal = timeArray->Value(i);
llDate = dataconvert::DataConvert::convertArrowColumnTime32(timeVal, rc);
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag,
section, i);
}
}
}
// if it's time64, unit is microsecond, int64
else if (columnData->type_id() == arrow::Type::type::TIME64)
{
std::shared_ptr<arrow::Time64Array> timeArray = std::static_pointer_cast<arrow::Time64Array>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
void *p = buf + i * width;
if (columnData->IsNull(i))
{
if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMENULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
// timeVal is macrosecond since midnight
int64_t timeVal = timeArray->Value(i);
llDate = dataconvert::DataConvert::convertArrowColumnTime64(timeVal, rc);
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag,
section, i);
}
}
}
}
else if (column.dataType == CalpontSystemCatalog::TIMESTAMP)
{
// timestamp conversion here
// default column type is TIMESTAMP
// default unit is millisecond
std::shared_ptr<arrow::TimestampArray> timeStampArray = std::static_pointer_cast<arrow::TimestampArray>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
int rc = 0;
void *p = buf + i * width;
if (columnData->IsNull(i))
{
if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMESTAMPNULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
int64_t timeVal = timeStampArray->Value(i);
std::shared_ptr<arrow::TimestampType> fType = std::static_pointer_cast<arrow::TimestampType>(columnData->type());
if (fType->unit() == arrow::TimeUnit::MILLI)
{
llDate = dataconvert::DataConvert::convertArrowColumnTimestamp(timeVal, rc);
}
else
{
llDate = dataconvert::DataConvert::convertArrowColumnTimestampUs(timeVal, rc);
}
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
llDate = 0;
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag,
section, i);
} }
} }
} }
else else
{ {
// datetime conversion here datatypes::TypeAttributesStd dummyTypeAttribute;
// default column type is TIMESTAMP const auto* typeHandler = datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute);
std::shared_ptr<arrow::TimestampArray> dateTimeArray = std::static_pointer_cast<arrow::TimestampArray>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
int rc = 0;
void *p = buf + i * width;
if (columnData->IsNull(i)) if (column.dataType == CalpontSystemCatalog::TIME)
{
// time conversion here
int rc = 0;
// for parquet, there are two time type, time32 and time64
// if it's time32, unit is millisecond, int32
if (columnData->type_id() == arrow::Type::type::TIME32 ||
columnData->type_id() == arrow::Type::type::NA)
{ {
if (column.fWithDefault) std::shared_ptr<arrow::Time32Array> timeArray =
std::static_pointer_cast<arrow::Time32Array>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{ {
llDate = column.fDefaultInt; void* p = buf + i * width;
}
else if (columnData->IsNull(i))
{ {
llDate = joblist::DATETIMENULL; if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMENULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
// timeVal is millisecond since midnight
int32_t timeVal = timeArray->Value(i);
const datatypes::TypeHandlerTime* typeHandlerTime =
dynamic_cast<const datatypes::TypeHandlerTime*>(typeHandler);
idbassert(typeHandlerTime);
llDate = typeHandlerTime->convertArrowColumnTime32(timeVal, rc);
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
bufStats.satCount++;
}
pVal = &llDate; pVal = &llDate;
memcpy(p, pVal, width); memcpy(p, pVal, width);
continue; updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section,
i);
}
} }
} }
else // if it's time64, unit is microsecond, int64
else if (columnData->type_id() == arrow::Type::type::TIME64)
{ {
int64_t timeVal = dateTimeArray->Value(i); std::shared_ptr<arrow::Time64Array> timeArray =
std::shared_ptr<arrow::TimestampType> fType = std::static_pointer_cast<arrow::TimestampType>(columnData->type()); std::static_pointer_cast<arrow::Time64Array>(columnData);
if (fType->unit() == arrow::TimeUnit::MILLI) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{ {
llDate = dataconvert::DataConvert::convertArrowColumnDatetime(timeVal, rc); void* p = buf + i * width;
if (columnData->IsNull(i))
{
if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMENULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
// timeVal is macrosecond since midnight
int64_t timeVal = timeArray->Value(i);
const datatypes::TypeHandlerTime* typeHandlerTime =
dynamic_cast<const datatypes::TypeHandlerTime*>(typeHandler);
idbassert(typeHandlerTime);
llDate = typeHandlerTime->convertArrowColumnTime64(timeVal, rc);
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section,
i);
}
}
}
}
else if (column.dataType == CalpontSystemCatalog::TIMESTAMP)
{
// timestamp conversion here
// default column type is TIMESTAMP
// default unit is millisecond
std::shared_ptr<arrow::TimestampArray> timeStampArray =
std::static_pointer_cast<arrow::TimestampArray>(columnData);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
int rc = 0;
void* p = buf + i * width;
if (columnData->IsNull(i))
{
if (column.fWithDefault)
{
llDate = column.fDefaultInt;
}
else
{
llDate = joblist::TIMESTAMPNULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
} }
else else
{ {
llDate = dataconvert::DataConvert::convertArrowColumnDatetimeUs(timeVal, rc); int64_t timeVal = timeStampArray->Value(i);
std::shared_ptr<arrow::TimestampType> fType =
std::static_pointer_cast<arrow::TimestampType>(columnData->type());
const datatypes::TypeHandlerTimestamp* typeHandlerTimestamp =
dynamic_cast<const datatypes::TypeHandlerTimestamp*>(typeHandler);
idbassert(typeHandlerTimestamp);
if (fType->unit() == arrow::TimeUnit::MILLI)
{
llDate = typeHandlerTimestamp->convertArrowColumnTimestamp(timeVal, rc);
}
else
{
llDate = typeHandlerTimestamp->convertArrowColumnTimestampUs(timeVal, rc);
}
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
llDate = 0;
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
} }
} }
}
else
{
// datetime conversion here
// default column type is TIMESTAMP
std::shared_ptr<arrow::TimestampArray> dateTimeArray =
std::static_pointer_cast<arrow::TimestampArray>(columnData);
if (rc == 0) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{ {
if (llDate < bufStats.minBufferVal) int rc = 0;
bufStats.minBufferVal = llDate; void* p = buf + i * width;
if (llDate > bufStats.maxBufferVal) if (columnData->IsNull(i))
bufStats.maxBufferVal = llDate; {
} if (column.fWithDefault)
else {
{ llDate = column.fDefaultInt;
llDate = 0; }
bufStats.satCount++; else
} {
llDate = joblist::DATETIMENULL;
pVal = &llDate;
memcpy(p, pVal, width);
continue;
}
}
else
{
int64_t timeVal = dateTimeArray->Value(i);
std::shared_ptr<arrow::TimestampType> fType =
std::static_pointer_cast<arrow::TimestampType>(columnData->type());
pVal = &llDate; const datatypes::TypeHandlerDatetime* typeHandlerDateTime =
memcpy(p, pVal, width); dynamic_cast<const datatypes::TypeHandlerDatetime*>(typeHandler);
updateCPInfoPendingFlag = true; idbassert(typeHandlerDateTime);
if ((fStartRowParser + i) == lastInputRowInExtent) if (fType->unit() == arrow::TimeUnit::MILLI)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, llDate = typeHandlerDateTime->convertArrowColumnDatetime(timeVal, rc);
section, i); }
else
{
llDate = typeHandlerDateTime->convertArrowColumnDatetimeUs(timeVal, rc);
}
}
if (rc == 0)
{
if (llDate < bufStats.minBufferVal)
bufStats.minBufferVal = llDate;
if (llDate > bufStats.maxBufferVal)
bufStats.maxBufferVal = llDate;
}
else
{
llDate = 0;
bufStats.satCount++;
}
pVal = &llDate;
memcpy(p, pVal, width);
updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent)
{
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
}
} }
} }
} }
break; break;
} }
@ -2721,8 +2736,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
//---------------------------------------------------------------------- //----------------------------------------------------------------------
case WriteEngine::WR_BINARY: case WriteEngine::WR_BINARY:
{ {
std::shared_ptr<arrow::Decimal128Array> decimalArray = std::static_pointer_cast<arrow::Decimal128Array>(columnData); std::shared_ptr<arrow::Decimal128Array> decimalArray =
std::shared_ptr<arrow::DecimalType> fType = std::static_pointer_cast<arrow::DecimalType>(decimalArray->type()); std::static_pointer_cast<arrow::Decimal128Array>(columnData);
std::shared_ptr<arrow::DecimalType> fType =
std::static_pointer_cast<arrow::DecimalType>(decimalArray->type());
const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(1); const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(1);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
@ -2765,18 +2782,17 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if (bigllVal < bufStats.bigMinBufferVal) if (bigllVal < bufStats.bigMinBufferVal)
bufStats.bigMinBufferVal = bigllVal; bufStats.bigMinBufferVal = bigllVal;
if (bigllVal > bufStats.bigMaxBufferVal) if (bigllVal > bufStats.bigMaxBufferVal)
bufStats.bigMaxBufferVal = bigllVal; bufStats.bigMaxBufferVal = bigllVal;
pVal = &bigllVal; pVal = &bigllVal;
memcpy(p, pVal, width); memcpy(p, pVal, width);
updateCPInfoPendingFlag = true; updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2817,7 +2833,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
} }
else else
{ {
memcpy(&ullVal, dataPtr+i, width); memcpy(&ullVal, dataPtr + i, width);
} }
if (ullVal > column.fMaxIntSat) if (ullVal > column.fMaxIntSat)
@ -2840,8 +2856,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -2917,8 +2932,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
break; break;
@ -3003,16 +3017,20 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
} }
else else
{ {
// date conversion here // Parquet support.
std::shared_ptr<arrow::Date32Array> dateArray = std::static_pointer_cast<arrow::Date32Array>(columnData); std::shared_ptr<arrow::Date32Array> dateArray =
std::static_pointer_cast<arrow::Date32Array>(columnData);
datatypes::TypeAttributesStd dummyTypeAttribute;
const datatypes::TypeHandlerDate* typeHandlerDate = dynamic_cast<const datatypes::TypeHandlerDate*>(
datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute));
idbassert(typeHandlerDate);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++) for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{ {
int rc = 0; int rc = 0;
@ -3035,7 +3053,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
else else
{ {
int32_t dayVal = dateArray->Value(i); int32_t dayVal = dateArray->Value(i);
iDate = dataconvert::DataConvert::ConvertArrowColumnDate(dayVal, rc); iDate = typeHandlerDate->convertArrowColumnDate(dayVal, rc);
} }
if (rc == 0) if (rc == 0)
@ -3055,11 +3073,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
pVal = &iDate; pVal = &iDate;
memcpy(p, pVal, width); memcpy(p, pVal, width);
updateCPInfoPendingFlag = true; updateCPInfoPendingFlag = true;
if ((fStartRowParser + i) == lastInputRowInExtent) if ((fStartRowParser + i) == lastInputRowInExtent)
{ {
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
section, i);
} }
} }
} }
@ -3067,8 +3084,9 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
} }
} }
inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent, BLBufferStats& bufStats, inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent,
bool& updateCPInfoPendingFlag, ColumnBufferSection* section, uint32_t curRow) BLBufferStats& bufStats, bool& updateCPInfoPendingFlag,
ColumnBufferSection* section, uint32_t curRow)
{ {
if (columnInfo.column.width <= 8) if (columnInfo.column.width <= 8)
{ {
@ -3085,9 +3103,8 @@ inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInpu
if (fLog->isDebug(DEBUG_2)) if (fLog->isDebug(DEBUG_2))
{ {
ostringstream oss; ostringstream oss;
oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows1: " << section->startRowId()
<< "; StartRID/Rows1: " << section->startRowId() << " " << curRow + 1 << " " << curRow + 1 << "; lastExtentRow: " << lastInputRowInExtent;
<< "; lastExtentRow: " << lastInputRowInExtent;
parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal); parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);
fLog->logMsg(oss.str(), MSGLVL_INFO2); fLog->logMsg(oss.str(), MSGLVL_INFO2);