diff --git a/datatypes/mcs_datatype.cpp b/datatypes/mcs_datatype.cpp index f4aa01a5f..fbeaa709d 100644 --- a/datatypes/mcs_datatype.cpp +++ b/datatypes/mcs_datatype.cpp @@ -1609,6 +1609,11 @@ boost::any TypeHandlerDate::convertFromString(const TypeAttributesStd& colType, return dataconvert::DataConvert::StringToDate(data, pushWarning); } +int32_t TypeHandlerDate::convertArrowColumnDate(int32_t dayVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnDate(dayVal, status); +} + boost::any TypeHandlerDatetime::convertFromString(const TypeAttributesStd& colType, const ConvertFromStringParam& prm, const std::string& data, bool& pushWarning) const @@ -1616,6 +1621,16 @@ boost::any TypeHandlerDatetime::convertFromString(const TypeAttributesStd& colTy return dataconvert::DataConvert::StringToDatetime(data, pushWarning); } +int64_t TypeHandlerDatetime::convertArrowColumnDatetime(int64_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnDatetime(timeVal, status); +} + +int64_t TypeHandlerDatetime::convertArrowColumnDatetimeUs(int64_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnDatetimeUs(timeVal, status); +} + boost::any TypeHandlerTime::convertFromString(const TypeAttributesStd& colType, const ConvertFromStringParam& prm, const std::string& data, bool& pushWarning) const @@ -1623,6 +1638,16 @@ boost::any TypeHandlerTime::convertFromString(const TypeAttributesStd& colType, return dataconvert::DataConvert::StringToTime(colType, data, pushWarning); } +int64_t TypeHandlerTime::convertArrowColumnTime64(int64_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnTime64(timeVal, status); +} + +int64_t TypeHandlerTime::convertArrowColumnTime32(int32_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnTime32(timeVal, status); +} + boost::any TypeHandlerTimestamp::convertFromString(const TypeAttributesStd& colType, const 
ConvertFromStringParam& prm, const std::string& data, bool& pushWarning) const @@ -1630,6 +1655,16 @@ boost::any TypeHandlerTimestamp::convertFromString(const TypeAttributesStd& colT return dataconvert::DataConvert::StringToTimestamp(prm, data, pushWarning); } +int64_t TypeHandlerTimestamp::convertArrowColumnTimestamp(int64_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnTimestamp(timeVal, status); +} + +int64_t TypeHandlerTimestamp::convertArrowColumnTimestampUs(int64_t timeVal, int& status) const +{ + return dataconvert::DataConvert::convertArrowColumnTimestampUs(timeVal, status); +} + boost::any TypeHandlerChar::convertFromString(const TypeAttributesStd& colType, const ConvertFromStringParam& prm, const std::string& data, bool& pushWarning) const diff --git a/datatypes/mcs_datatype.h b/datatypes/mcs_datatype.h index 4f1ddfa6c..8db04d353 100644 --- a/datatypes/mcs_datatype.h +++ b/datatypes/mcs_datatype.h @@ -2276,6 +2276,10 @@ class TypeHandlerTemporal : public TypeHandler class TypeHandlerDate : public TypeHandlerTemporal { + public: + int32_t convertArrowColumnDate(int32_t dayVal, int& status) const; + + private: const string& name() const override; code_t code() const override { @@ -2301,6 +2305,11 @@ class TypeHandlerDate : public TypeHandlerTemporal class TypeHandlerDatetime : public TypeHandlerTemporal { + public: + int64_t convertArrowColumnDatetime(int64_t timeVal, int& status) const; + int64_t convertArrowColumnDatetimeUs(int64_t timeVal, int& status) const; + + private: const string& name() const override; code_t code() const override { @@ -2326,6 +2335,11 @@ class TypeHandlerDatetime : public TypeHandlerTemporal class TypeHandlerTime : public TypeHandlerTemporal { + public: + int64_t convertArrowColumnTime64(int64_t timeVal, int& status) const; + int64_t convertArrowColumnTime32(int32_t timeVal, int& status) const; + + private: const string& name() const override; code_t code() const override { @@ -2351,6 +2365,11 
@@ class TypeHandlerTime : public TypeHandlerTemporal class TypeHandlerTimestamp : public TypeHandlerTemporal { + public: + int64_t convertArrowColumnTimestamp(int64_t timeVal, int& status) const; + int64_t convertArrowColumnTimestampUs(int64_t timeVal, int& status) const; + + private: const string& name() const override; code_t code() const override { diff --git a/utils/dataconvert/dataconvert.cpp b/utils/dataconvert/dataconvert.cpp index 496126ff8..7206a1b0c 100644 --- a/utils/dataconvert/dataconvert.cpp +++ b/utils/dataconvert/dataconvert.cpp @@ -1576,7 +1576,7 @@ boost::any DataConvert::StringToTimestamp(const datatypes::ConvertFromStringPara //------------------------------------------------------------------------------ // Convert date32 parquet data to binary date. Used by BulkLoad. //------------------------------------------------------------------------------ -int32_t DataConvert::ConvertArrowColumnDate(int32_t dayVal, int& status) +int32_t DataConvert::convertArrowColumnDate(int32_t dayVal, int& status) { int inYear; int inMonth; diff --git a/utils/dataconvert/dataconvert.h b/utils/dataconvert/dataconvert.h index 271296250..15d4246d7 100644 --- a/utils/dataconvert/dataconvert.h +++ b/utils/dataconvert/dataconvert.h @@ -1176,7 +1176,7 @@ class DataConvert * @param dayVal the input data representing days * @param status 0 - success, -1 - fail */ - EXPORT static int32_t ConvertArrowColumnDate(int32_t dayVal, int& status); + EXPORT static int32_t convertArrowColumnDate(int32_t dayVal, int& status); /** * @brief convert a date column data, represnted as a string, to it's native diff --git a/writeengine/bulk/we_bulkloadbuffer.cpp b/writeengine/bulk/we_bulkloadbuffer.cpp index 47451fe48..2b245181b 100644 --- a/writeengine/bulk/we_bulkloadbuffer.cpp +++ b/writeengine/bulk/we_bulkloadbuffer.cpp @@ -40,6 +40,7 @@ #include "dataconvert.h" #include "exceptclasses.h" #include "mcs_decimal.h" +#include "mcs_datatype.h" #include "joblisttypes.h" @@ -1562,7 +1563,7 @@ 
int BulkLoadBuffer::parse(ColumnInfo& columnInfo) boost::mutex::scoped_lock lock(fSyncUpdatesBLB); fTotalReadRowsParser = fTotalReadRows; fStartRowParser = fStartRow; - + if (fImportDataMode != IMPORT_DATA_PARQUET) { fDataParser = fData; @@ -1603,7 +1604,7 @@ int BulkLoadBuffer::parse(ColumnInfo& columnInfo) int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo) { int rc = NO_ERROR; - + // Parse the data and fill up a buffer; which is written to output file uint32_t nRowsParsed; @@ -1689,8 +1690,9 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo) return ERR_PARQUET_AUX; } } - - convertParquet(columnData, buf, columnInfo.column, bufStats, lastInputRowInExtent, columnInfo, updateCPInfoPendingFlag, section); + + convertParquet(columnData, buf, columnInfo.column, bufStats, lastInputRowInExtent, columnInfo, + updateCPInfoPendingFlag, section); if (updateCPInfoPendingFlag) { @@ -1764,8 +1766,9 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo) // fTotalReadRowsParser (in) - current batch size(row number) // fAutoIncNextValue (in) - first auto increment number of this batch //----------------------------------------------------------------------------------- -void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, unsigned char* buf, const JobColumn& column, - BLBufferStats& bufStats, RID& lastInputRowInExtent, ColumnInfo& columnInfo, +void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, unsigned char* buf, + const JobColumn& column, BLBufferStats& bufStats, + RID& lastInputRowInExtent, ColumnInfo& columnInfo, bool& updateCPInfoPendingFlag, ColumnBufferSection* section) { char biVal; @@ -1841,13 +1844,12 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } 
} break; } - + //---------------------------------------------------------------------- // DOUBLE //---------------------------------------------------------------------- @@ -1875,7 +1877,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un else { memcpy(&dVal, dataPtr + i, width); - + if (dVal > column.fMaxDblSat) { dVal = column.fMaxDblSat; @@ -1895,13 +1897,12 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } - + //---------------------------------------------------------------------- // CHARACTER //---------------------------------------------------------------------- @@ -1992,8 +1993,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2065,7 +2065,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; - + siVal = origVal; pVal = &siVal; memcpy(p, pVal, width); @@ -2073,8 +2073,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2139,7 +2138,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un bufStats.minBufferVal = origVal; if (uVal > 
static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; - + usiVal = origVal; pVal = &usiVal; memcpy(p, pVal, width); @@ -2147,8 +2146,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2159,10 +2157,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un //---------------------------------------------------------------------- case WriteEngine::WR_BYTE: { - long long origVal; // if use int8_t here, it will take 8 bool value of parquet array - std::shared_ptr boolArray = std::static_pointer_cast(columnData); + std::shared_ptr boolArray = + std::static_pointer_cast(columnData); const int8_t* dataPtr = columnData->data()->GetValues(1); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) @@ -2185,7 +2183,6 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un memcpy(p, pVal, width); continue; } - } else { @@ -2224,10 +2221,9 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if (bSatVal) bufStats.satCount++; - if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; - + if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; @@ -2238,8 +2234,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2317,8 +2312,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, 
updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } @@ -2372,8 +2366,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } @@ -2387,14 +2380,13 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un case WriteEngine::WR_LONGLONG: { if (column.dataType != CalpontSystemCatalog::DATETIME && - column.dataType != CalpontSystemCatalog::TIMESTAMP && - column.dataType != CalpontSystemCatalog::TIME) + column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME) { - const long long *dataPtr = columnData->data()->GetValues(1); + const long long* dataPtr = columnData->data()->GetValues(1); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { - void *p = buf + i * width; + void* p = buf + i * width; bool bSatVal = false; if (columnData->IsNull(i)) @@ -2459,259 +2451,282 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); - } - } - } - else if (column.dataType == CalpontSystemCatalog::TIME) - { - // time conversion here - int rc = 0; - - // for parquet, there are two time type, time32 and time64 - // if it's time32, unit is millisecond, int32 - if (columnData->type_id() == arrow::Type::type::TIME32 || columnData->type_id() == arrow::Type::type::NA) - { - std::shared_ptr timeArray = std::static_pointer_cast(columnData); - - for (unsigned int i = 0; i < fTotalReadRowsParser; i++) - { - void *p = buf + i * width; - - if (columnData->IsNull(i)) - { - if (column.fWithDefault) - { - llDate = 
column.fDefaultInt; - } - else - { - llDate = joblist::TIMENULL; - pVal = &llDate; - memcpy(p, pVal, width); - continue; - } - } - else - { - // timeVal is millisecond since midnight - int32_t timeVal = timeArray->Value(i); - llDate = dataconvert::DataConvert::convertArrowColumnTime32(timeVal, rc); - - } - - if (rc == 0) - { - if (llDate < bufStats.minBufferVal) - bufStats.minBufferVal = llDate; - - if (llDate > bufStats.maxBufferVal) - bufStats.maxBufferVal = llDate; - } - else - { - bufStats.satCount++; - } - - pVal = &llDate; - memcpy(p, pVal, width); - updateCPInfoPendingFlag = true; - - if ((fStartRowParser + i) == lastInputRowInExtent) - { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); - } - } - } - // if it's time64, unit is microsecond, int64 - else if (columnData->type_id() == arrow::Type::type::TIME64) - { - std::shared_ptr timeArray = std::static_pointer_cast(columnData); - - for (unsigned int i = 0; i < fTotalReadRowsParser; i++) - { - void *p = buf + i * width; - - if (columnData->IsNull(i)) - { - if (column.fWithDefault) - { - llDate = column.fDefaultInt; - } - else - { - llDate = joblist::TIMENULL; - pVal = &llDate; - memcpy(p, pVal, width); - continue; - } - } - else - { - // timeVal is macrosecond since midnight - int64_t timeVal = timeArray->Value(i); - llDate = dataconvert::DataConvert::convertArrowColumnTime64(timeVal, rc); - - } - - if (rc == 0) - { - if (llDate < bufStats.minBufferVal) - bufStats.minBufferVal = llDate; - - if (llDate > bufStats.maxBufferVal) - bufStats.maxBufferVal = llDate; - } - else - { - bufStats.satCount++; - } - - pVal = &llDate; - memcpy(p, pVal, width); - updateCPInfoPendingFlag = true; - - if ((fStartRowParser + i) == lastInputRowInExtent) - { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); - } - } - } - } - else if (column.dataType == CalpontSystemCatalog::TIMESTAMP) - { - // timestamp conversion here - // 
default column type is TIMESTAMP - // default unit is millisecond - std::shared_ptr timeStampArray = std::static_pointer_cast(columnData); - - for (unsigned int i = 0; i < fTotalReadRowsParser; i++) - { - int rc = 0; - void *p = buf + i * width; - - if (columnData->IsNull(i)) - { - if (column.fWithDefault) - { - llDate = column.fDefaultInt; - } - else - { - llDate = joblist::TIMESTAMPNULL; - pVal = &llDate; - memcpy(p, pVal, width); - continue; - } - } - else - { - int64_t timeVal = timeStampArray->Value(i); - std::shared_ptr fType = std::static_pointer_cast(columnData->type()); - - if (fType->unit() == arrow::TimeUnit::MILLI) - { - llDate = dataconvert::DataConvert::convertArrowColumnTimestamp(timeVal, rc); - } - else - { - llDate = dataconvert::DataConvert::convertArrowColumnTimestampUs(timeVal, rc); - } - } - - if (rc == 0) - { - if (llDate < bufStats.minBufferVal) - bufStats.minBufferVal = llDate; - - if (llDate > bufStats.maxBufferVal) - bufStats.maxBufferVal = llDate; - } - else - { - llDate = 0; - bufStats.satCount++; - } - - pVal = &llDate; - memcpy(p, pVal, width); - updateCPInfoPendingFlag = true; - - if ((fStartRowParser + i) == lastInputRowInExtent) - { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { - // datetime conversion here - // default column type is TIMESTAMP - std::shared_ptr dateTimeArray = std::static_pointer_cast(columnData); - - for (unsigned int i = 0; i < fTotalReadRowsParser; i++) - { - int rc = 0; - void *p = buf + i * width; + datatypes::TypeAttributesStd dummyTypeAttribute; + const auto* typeHandler = datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute); - if (columnData->IsNull(i)) + if (column.dataType == CalpontSystemCatalog::TIME) + { + // time conversion here + int rc = 0; + + // for parquet, there are two time type, time32 and time64 + // if it's time32, 
unit is millisecond, int32 + if (columnData->type_id() == arrow::Type::type::TIME32 || + columnData->type_id() == arrow::Type::type::NA) { - if (column.fWithDefault) + std::shared_ptr timeArray = + std::static_pointer_cast(columnData); + + for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { - llDate = column.fDefaultInt; - } - else - { - llDate = joblist::DATETIMENULL; + void* p = buf + i * width; + + if (columnData->IsNull(i)) + { + if (column.fWithDefault) + { + llDate = column.fDefaultInt; + } + else + { + llDate = joblist::TIMENULL; + pVal = &llDate; + memcpy(p, pVal, width); + continue; + } + } + else + { + // timeVal is millisecond since midnight + int32_t timeVal = timeArray->Value(i); + const datatypes::TypeHandlerTime* typeHandlerTime = + dynamic_cast(typeHandler); + idbassert(typeHandlerTime); + + llDate = typeHandlerTime->convertArrowColumnTime32(timeVal, rc); + } + + if (rc == 0) + { + if (llDate < bufStats.minBufferVal) + bufStats.minBufferVal = llDate; + + if (llDate > bufStats.maxBufferVal) + bufStats.maxBufferVal = llDate; + } + else + { + bufStats.satCount++; + } + pVal = &llDate; memcpy(p, pVal, width); - continue; + updateCPInfoPendingFlag = true; + + if ((fStartRowParser + i) == lastInputRowInExtent) + { + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, + i); + } } } - else + // if it's time64, unit is microsecond, int64 + else if (columnData->type_id() == arrow::Type::type::TIME64) { - int64_t timeVal = dateTimeArray->Value(i); - std::shared_ptr fType = std::static_pointer_cast(columnData->type()); + std::shared_ptr timeArray = + std::static_pointer_cast(columnData); - if (fType->unit() == arrow::TimeUnit::MILLI) + for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { - llDate = dataconvert::DataConvert::convertArrowColumnDatetime(timeVal, rc); + void* p = buf + i * width; + + if (columnData->IsNull(i)) + { + if (column.fWithDefault) + { + llDate = column.fDefaultInt; + } + else + { + 
llDate = joblist::TIMENULL; + pVal = &llDate; + memcpy(p, pVal, width); + continue; + } + } + else + { + // timeVal is microsecond since midnight + int64_t timeVal = timeArray->Value(i); + const datatypes::TypeHandlerTime* typeHandlerTime = + dynamic_cast(typeHandler); + idbassert(typeHandlerTime); + + llDate = typeHandlerTime->convertArrowColumnTime64(timeVal, rc); + } + + if (rc == 0) + { + if (llDate < bufStats.minBufferVal) + bufStats.minBufferVal = llDate; + + if (llDate > bufStats.maxBufferVal) + bufStats.maxBufferVal = llDate; + } + else + { + bufStats.satCount++; + } + + pVal = &llDate; + memcpy(p, pVal, width); + updateCPInfoPendingFlag = true; + + if ((fStartRowParser + i) == lastInputRowInExtent) + { + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, + i); + } + } + } + } + else if (column.dataType == CalpontSystemCatalog::TIMESTAMP) + { + // timestamp conversion here + // default column type is TIMESTAMP + // default unit is millisecond + std::shared_ptr timeStampArray = + std::static_pointer_cast(columnData); + + for (unsigned int i = 0; i < fTotalReadRowsParser; i++) + { + int rc = 0; + void* p = buf + i * width; + + if (columnData->IsNull(i)) + { + if (column.fWithDefault) + { + llDate = column.fDefaultInt; + } + else + { + llDate = joblist::TIMESTAMPNULL; + pVal = &llDate; + memcpy(p, pVal, width); + continue; + } } else { - llDate = dataconvert::DataConvert::convertArrowColumnDatetimeUs(timeVal, rc); + int64_t timeVal = timeStampArray->Value(i); + std::shared_ptr fType = + std::static_pointer_cast(columnData->type()); + + const datatypes::TypeHandlerTimestamp* typeHandlerTimestamp = + dynamic_cast(typeHandler); + idbassert(typeHandlerTimestamp); + + if (fType->unit() == arrow::TimeUnit::MILLI) + { + llDate = typeHandlerTimestamp->convertArrowColumnTimestamp(timeVal, rc); + } + else + { + llDate = typeHandlerTimestamp->convertArrowColumnTimestampUs(timeVal, rc); + } + } + + if (rc == 0) + { + if (llDate < 
bufStats.minBufferVal) + bufStats.minBufferVal = llDate; + + if (llDate > bufStats.maxBufferVal) + bufStats.maxBufferVal = llDate; + } + else + { + llDate = 0; + bufStats.satCount++; + } + + pVal = &llDate; + memcpy(p, pVal, width); + updateCPInfoPendingFlag = true; + + if ((fStartRowParser + i) == lastInputRowInExtent) + { + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } + } + else + { + // datetime conversion here + // default column type is TIMESTAMP + std::shared_ptr dateTimeArray = + std::static_pointer_cast(columnData); - if (rc == 0) + for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { - if (llDate < bufStats.minBufferVal) - bufStats.minBufferVal = llDate; + int rc = 0; + void* p = buf + i * width; - if (llDate > bufStats.maxBufferVal) - bufStats.maxBufferVal = llDate; - } - else - { - llDate = 0; - bufStats.satCount++; - } + if (columnData->IsNull(i)) + { + if (column.fWithDefault) + { + llDate = column.fDefaultInt; + } + else + { + llDate = joblist::DATETIMENULL; + pVal = &llDate; + memcpy(p, pVal, width); + continue; + } + } + else + { + int64_t timeVal = dateTimeArray->Value(i); + std::shared_ptr fType = + std::static_pointer_cast(columnData->type()); - pVal = &llDate; - memcpy(p, pVal, width); - updateCPInfoPendingFlag = true; + const datatypes::TypeHandlerDatetime* typeHandlerDateTime = + dynamic_cast(typeHandler); + idbassert(typeHandlerDateTime); - if ((fStartRowParser + i) == lastInputRowInExtent) - { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + if (fType->unit() == arrow::TimeUnit::MILLI) + { + llDate = typeHandlerDateTime->convertArrowColumnDatetime(timeVal, rc); + } + else + { + llDate = typeHandlerDateTime->convertArrowColumnDatetimeUs(timeVal, rc); + } + } + + if (rc == 0) + { + if (llDate < bufStats.minBufferVal) + bufStats.minBufferVal = llDate; + + if (llDate > bufStats.maxBufferVal) + bufStats.maxBufferVal = llDate; + } + 
else + { + llDate = 0; + bufStats.satCount++; + } + + pVal = &llDate; + memcpy(p, pVal, width); + updateCPInfoPendingFlag = true; + + if ((fStartRowParser + i) == lastInputRowInExtent) + { + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); + } } } - } break; } @@ -2721,8 +2736,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un //---------------------------------------------------------------------- case WriteEngine::WR_BINARY: { - std::shared_ptr decimalArray = std::static_pointer_cast(columnData); - std::shared_ptr fType = std::static_pointer_cast(decimalArray->type()); + std::shared_ptr decimalArray = + std::static_pointer_cast(columnData); + std::shared_ptr fType = + std::static_pointer_cast(decimalArray->type()); const int128_t* dataPtr = decimalArray->data()->GetValues(1); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) @@ -2765,18 +2782,17 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if (bigllVal < bufStats.bigMinBufferVal) bufStats.bigMinBufferVal = bigllVal; - + if (bigllVal > bufStats.bigMaxBufferVal) bufStats.bigMaxBufferVal = bigllVal; - + pVal = &bigllVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2817,7 +2833,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un } else { - memcpy(&ullVal, dataPtr+i, width); + memcpy(&ullVal, dataPtr + i, width); } if (ullVal > column.fMaxIntSat) @@ -2840,8 +2856,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, 
lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -2917,8 +2932,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; @@ -3003,16 +3017,20 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { - // date conversion here - std::shared_ptr dateArray = std::static_pointer_cast(columnData); - + // Parquet support. + std::shared_ptr dateArray = + std::static_pointer_cast(columnData); + datatypes::TypeAttributesStd dummyTypeAttribute; + const datatypes::TypeHandlerDate* typeHandlerDate = dynamic_cast( + datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute)); + idbassert(typeHandlerDate); + for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { int rc = 0; @@ -3035,7 +3053,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un else { int32_t dayVal = dateArray->Value(i); - iDate = dataconvert::DataConvert::ConvertArrowColumnDate(dayVal, rc); + iDate = typeHandlerDate->convertArrowColumnDate(dayVal, rc); } if (rc == 0) @@ -3055,11 +3073,10 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un pVal = &iDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; - + if ((fStartRowParser + i) == lastInputRowInExtent) { - updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, - section, i); + updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } @@ -3067,8 +3084,9 
@@ void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, un } } -inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent, BLBufferStats& bufStats, - bool& updateCPInfoPendingFlag, ColumnBufferSection* section, uint32_t curRow) +inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent, + BLBufferStats& bufStats, bool& updateCPInfoPendingFlag, + ColumnBufferSection* section, uint32_t curRow) { if (columnInfo.column.width <= 8) { @@ -3085,9 +3103,8 @@ inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInpu if (fLog->isDebug(DEBUG_2)) { ostringstream oss; - oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid - << "; StartRID/Rows1: " << section->startRowId() << " " << curRow + 1 - << "; lastExtentRow: " << lastInputRowInExtent; + oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows1: " << section->startRowId() + << " " << curRow + 1 << "; lastExtentRow: " << lastInputRowInExtent; parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal); fLog->logMsg(oss.str(), MSGLVL_INFO2);