From 7f97a6618464d4e1311d62f2f2424a4d0ee7df7f Mon Sep 17 00:00:00 2001 From: Jigao Luo Date: Fri, 9 Sep 2022 14:51:35 +0200 Subject: [PATCH] [MCOL-4590] UNION Performance Improvement with the focus on the normalize functions. This patch improves the runtime performance of UNION processing in CS, as reported JIRA issue MCOL 4590. The idea of the optimization is to infer the normalize seperate functions beforehand and perform the normalization individually later, instead of a huge switch body of all normalization. This patch also cover engineering optimization, removing the hotspots in UNION processing. After application of this patch, the normalize part takes only about 25% of the whole UNION query in our experiment avg case. Signed-off-by: Jigao Luo --- dbcon/joblist/tupleunion.cpp | 2014 +++++++++++++++++++++------------- dbcon/joblist/tupleunion.h | 6 +- 2 files changed, 1260 insertions(+), 760 deletions(-) diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 291dd325a..2c01bbf19 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -56,6 +56,1233 @@ inline double exp10(double x) } // namespace #endif +namespace +{ + // union helper functions. + + inline uint64_t pickScaleForDouble(Row* out, uint32_t i, double val) + { + /* have to pick a scale to use for the double. using 5... */ + uint32_t scale = 5; + uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); + const int diff = out->getScale(i) - scale; + ival = datatypes::applySignedScale(ival, diff); + return ival; + } + + inline uint64_t pickScaleForLongDouble(Row* out, uint32_t i, long double val) + { + /* have to pick a scale to use for the double. using 5... */ + uint32_t scale = 5; + uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); + int diff = out->getScale(i) - scale; + ival = datatypes::applySignedScale(ival, diff); + return ival; + } + + void normalizeIntToIntNoScale(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeIntToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeIntToUintNoScale(const Row& in, Row* out, uint32_t i) + { + out->setUintField(in.getIntField(i), i); + } + + void normalizeIntToUintWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToUintWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeIntToStringWithScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + double d = in.getIntField(i); + d /= exp10(in.getScale(i)); + os.precision(15); + os << d; + out->setStringField(os.str(), i); + } + + void normalizeIntToStringNoScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + os << in.getIntField(i); + out->setStringField(os.str(), i); + } + + void normalizeIntToXFloat(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setFloatField((float)d, i); + } + + void normalizeIntToXDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setDoubleField(d, i); + } + + void normalizeIntToLongDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setLongDoubleField(d, i); + } + + void normalizeIntToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToXDecimalInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeUintToIntNoScale(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getUintField(i), i); + } + + void normalizeUintToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeUntToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setIntField(val, i); + } + + void normalizeUintToUint(const Row& in, Row* out, uint32_t i) + { + out->setUintField(in.getUintField(i), i); + } + + void normalizeUintToStringWithScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + double d = in.getUintField(i); + d /= exp10(in.getScale(i)); + os.precision(15); + os << d; + out->setStringField(os.str(), i); + } + + void normalizeUintToStringNoScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + os << in.getUintField(i); + out->setStringField(os.str(), i); + } + + void normalizUintToXFloat(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setFloatField((float)d, i); + } + + void normalizeUintToXDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setDoubleField(d, i); + } + + void normalizeUintToLongDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setLongDoubleField(d, i); + } + + void normalizeUintToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeUintToXDecimalInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setIntField(val, i); + } + + void normalizeStringToString(const Row& in, Row* out, uint32_t i) + { + out->setStringField(in.getStringField(i), i); + } + + void normalizeDateToDate(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeDateToDatetime(const Row& in, Row* out, uint32_t i) + { + uint64_t date = in.getUintField(i); + date &= ~0x3f; // zero the 'spare' field + date <<= 32; + out->setUintField(date, i); + } + + void normalizeDateToTimestamp(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + dataconvert::Date date(in.getUintField(i)); + dataconvert::MySQLTime m_time; + m_time.year = date.year; + m_time.month = date.month; + m_time.day = date.day; + m_time.hour = 0; + m_time.minute = 0; + m_time.second = 0; + m_time.second_part = 0; + + dataconvert::TimeStamp timeStamp; + bool isValid = true; + int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid); + + if (!isValid) + { + timeStamp.reset(); + } + else + { + timeStamp.second = seconds; + timeStamp.msecond = m_time.second_part; + } + + uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); + out->setUintField(outValue, i); + } + + void normalizeDateToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::dateToString(in.getUintField(i)); + out->setStringField(d, i); + } + + void normalizeDatetimeToDatetime(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeDatetimeToDate(const Row& in, Row* out, uint32_t i) + { + uint64_t val = in.getUintField(i); + val >>= 32; + out->setUintField(val, i); + } + + void normalizeDatetimeToTimestamp(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::DateTime dtime(val); + dataconvert::MySQLTime m_time; + dataconvert::TimeStamp timeStamp; + + m_time.year = dtime.year; + m_time.month = dtime.month; + m_time.day = dtime.day; + m_time.hour = dtime.hour; + m_time.minute = dtime.minute; + m_time.second = dtime.second; + m_time.second_part = dtime.msecond; + + bool isValid = true; + int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid); + + if (!isValid) + { + timeStamp.reset(); + } + else + { + timeStamp.second = seconds; + timeStamp.msecond = m_time.second_part; + } + + uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); + out->setUintField(outValue, i); + } + + void normalizeDatetimeToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::datetimeToString(in.getUintField(i)); + out->setStringField(d, i); + } + + void normalizeTimestampToTimestamp(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeTimestampToDate(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::TimeStamp timestamp(val); + int64_t seconds = timestamp.second; + uint64_t outValue; + + dataconvert::MySQLTime time; + dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); + + dataconvert::Date date; + date.year = time.year; + date.month = time.month; + date.day = time.day; + date.spare = 0; + outValue = (uint32_t) * (reinterpret_cast(&date)); + + out->setUintField(outValue, i); + } + + void normalizeTimestampToDatetime(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::TimeStamp timestamp(val); + int64_t seconds = timestamp.second; + uint64_t outValue; + + dataconvert::MySQLTime time; + dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); + + dataconvert::DateTime datetime; + datetime.year = time.year; + datetime.month = time.month; + datetime.day = time.day; + datetime.hour = time.hour; + datetime.minute = time.minute; + datetime.second = time.second; + datetime.msecond = timestamp.msecond; + outValue = (uint64_t) * (reinterpret_cast(&datetime)); + + out->setUintField(outValue, i); + } + + void normalizeTimestampToString(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone); + out->setStringField(d, i); + } + + void normalizeTimeToTime(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeTimeToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::timeToString(in.getIntField(i)); + out->setStringField(d, i); + } + + void normalizeXFloatToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToIntNoScale(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeXDoubleToIntNoScale(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeXFloatToUint(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeXDoubleToUint(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeXFloatToXFloat(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setFloatField(val, i); + } + + void normalizeXDoubleToXFloat(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setFloatField(val, i); + } + + void normalizeXFloatToXDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setDoubleField(val, i); + } + + void normalizeXDoubleToXDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setDoubleField(val, i); + } + + void normalizeXFloatToLongDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setLongDoubleField(val, i); + } + + void normalizeXDoubleToLongDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setLongDoubleField(val, i); + } + + void normalizeXFloatToString(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeXDoubleToString(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeXFloatToWideXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToWideXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeLongDoubleToIntNoScale(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeLongDoubleToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setInt128Field(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToIntWithScaleInt(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToUint(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeLongDoubleToXFloat(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setFloatField(val, i); + } + + void normalizeLongDoubleToXDouble(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setDoubleField(val, i); + } + + void normalizeLongDoubleToLongDouble(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setLongDoubleField(val, i); + } + + void normalizeLongDoubleToString(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeLongDoubleToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setInt128Field(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToXDecimalInt(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeWideXDecimalToWideXDecimalNoScale(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + out->setInt128Field(val128, i); + } + + void normalizeXDecimalToWideXDecimalNoScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + out->setInt128Field(val, i); + } + + void normalizeWideXDecimalToWideXDecimalWithScale(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + int128_t temp = datatypes::applySignedScale(val128, out->getScale(i) - in.getScale(i)); + out->setInt128Field(temp, i); + } + + void normalizeXDecimalToWideXDecimalWithScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + int128_t temp = datatypes::applySignedScale(val, out->getScale(i) - in.getScale(i)); + out->setInt128Field(temp, i); + } + + void normalizeXDecimalToOtherNoScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + out->setIntField(val, i); + } + + void normalizeXDecimalToOtherWithScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + int64_t temp = datatypes::applySignedScale(val, out->getScale(i) - in.getScale(i)); + out->setIntField(temp, i); + } + + void normalizeXDecimalToXFloat(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + float fval = ((float)val) / IDB_pow[in.getScale(i)]; + out->setFloatField(fval, i); + } + + void normalizeXDecimalToXDouble(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + double dval = ((double)val) / IDB_pow[in.getScale(i)]; + out->setDoubleField(dval, i); + } + + void normalizeXDecimalToLongDouble(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + long double dval = ((long double)val) / IDB_pow[in.getScale(i)]; + out->setLongDoubleField(dval, i); + } + + void normalizeWideXDecimalToString(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + datatypes::Decimal dec(0, in.getScale(i), in.getPrecision(i), val128); + out->setStringField(dec.toString(), i); + } + + void normalizeXDecimalToString(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + datatypes::Decimal dec(val, in.getScale(i), in.getPrecision(i)); + out->setStringField(dec.toString(), i); + } + + void normalizeBlobVarbinary(const Row& in, Row* out, uint32_t i) + { + // out->setVarBinaryField(in.getVarBinaryStringField(i), i); // not efficient + out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i); + } + + joblist::normalizeFunctionsT inferNormalizeFunctions(const Row& in, Row* out, long fTimeZone) + { + uint32_t i; + joblist::normalizeFunctionsT result; + + for (i = 0; i < out->getColumnCount(); i++) + { + switch (in.getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i) || in.getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToIntWithScaleInt128); + else + result.emplace_back(normalizeIntToIntWithScaleInt64); + } + else + result.emplace_back(normalizeIntToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + if (in.getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToUintWithScaleInt128); + else + result.emplace_back(normalizeIntToUintWithScaleInt64); + } + else + result.emplace_back(normalizeIntToUintNoScale); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getScale(i)) + result.emplace_back(normalizeIntToStringWithScale); + else + result.emplace_back(normalizeIntToStringNoScale); + break; + } + + case CalpontSystemCatalog::DATE: + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIME: + case CalpontSystemCatalog::TIMESTAMP: + throw logic_error( + "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeIntToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeIntToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeIntToLongDouble); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + /* + Signed INT to XDecimal + TODO: + - This code does not handle overflow that may happen on + scale multiplication. Instead of returning a garbage value + we should probably apply saturation here. In long terms we + should implement DECIMAL(65,x) to avoid overflow completely + (so the UNION between DECIMAL and integer can choose a proper + DECIMAL(M,N) result data type to guarantee that any incoming + integer value can fit into it). + */ + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToXDecimalInt128); + else + result.emplace_back(normalizeIntToXDecimalInt64); + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: integer to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeUintToIntWithScaleInt128); + else + result.emplace_back(normalizeUntToIntWithScaleInt64); + } + else + result.emplace_back(normalizeUintToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: result.emplace_back(normalizeUintToUint); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getScale(i)) + result.emplace_back(normalizeUintToStringWithScale); + else + result.emplace_back(normalizeUintToStringNoScale); + break; + } + + case CalpontSystemCatalog::DATE: + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIME: + case CalpontSystemCatalog::TIMESTAMP: + throw logic_error( + "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizUintToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeUintToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeUintToLongDouble); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + /* + Unsigned INT to XDecimal + TODO: + - The overflow problem mentioned in the code under case "Signed INT to XDecimal:" is + also applicable here. + */ + + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeUintToXDecimalInt128); + else + result.emplace_back(normalizeUintToXDecimalInt64); + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: integer to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeStringToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: string to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::DATE: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::DATE: result.emplace_back(normalizeDateToDate); break; + + case CalpontSystemCatalog::DATETIME: result.emplace_back(normalizeDateToDatetime); break; + + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(std::bind(normalizeDateToTimestamp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeDateToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: date to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::DATETIME: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::DATETIME: result.emplace_back(normalizeDatetimeToDatetime); break; + + case CalpontSystemCatalog::DATE: result.emplace_back(normalizeDatetimeToDate); break; + + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(std::bind(normalizeDatetimeToTimestamp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeDatetimeToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: datetime to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::TIMESTAMP: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(normalizeTimestampToTimestamp); break; + + case CalpontSystemCatalog::DATE: result.emplace_back(std::bind(normalizeTimestampToDate, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::DATETIME: result.emplace_back(std::bind(normalizeTimestampToDatetime, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(std::bind(normalizeTimestampToString, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::TIME: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TIME: result.emplace_back(normalizeTimeToTime); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeTimeToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: time to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntWithScaleInt128); + else + result.emplace_back(normalizeXDoubleToIntWithScaleInt128); + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntWithScaleInt64); + else + result.emplace_back(normalizeXDoubleToIntWithScaleInt64); + } + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntNoScale); + else + result.emplace_back(normalizeXDoubleToIntNoScale); + } + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToUint); + else + result.emplace_back(normalizeXDoubleToUint); + break; + } + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXFloat); + else + result.emplace_back(normalizeXDoubleToXFloat); + break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXDouble); + else + result.emplace_back(normalizeXDoubleToXDouble); + break; + } + + case CalpontSystemCatalog::LONGDOUBLE: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToLongDouble); + else + result.emplace_back(normalizeXDoubleToLongDouble); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToString); + else + result.emplace_back(normalizeXDoubleToString); + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + // xFLOAT or xDOUBLE to xDECIMAL conversion. Is it really possible? + // TODO: + // Perhaps we should add an assert here that this combination is not possible + // In the current reduction all problems mentioned in the code under + // case "Signed INT to XDecimal" are also applicable here. + // TODO: isn't overflow possible below? + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToWideXDecimal); + else + result.emplace_back(normalizeXDoubleToWideXDecimal); + break; + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXDecimal); + else + result.emplace_back(normalizeXDoubleToXDecimal); + break; + } + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + } + + case CalpontSystemCatalog::LONGDOUBLE: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeLongDoubleToIntWithScaleInt128); + else + result.emplace_back(normalizeLongDoubleToIntWithScaleInt); + } + else + result.emplace_back(normalizeLongDoubleToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: result.emplace_back(normalizeLongDoubleToUint); break; + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeLongDoubleToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeLongDoubleToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeLongDoubleToLongDouble); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeLongDoubleToString); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + // LONGDOUBLE to xDECIMAL conversions: is it really possible? + // TODO: + // Perhaps we should add an assert here that this combination is not possible + // In the current reduction all problems mentioned in the code under + // case "Signed INT to XDecimal" are also applicable here. + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeLongDoubleToXDecimalInt128); + else + result.emplace_back(normalizeLongDoubleToXDecimalInt); + + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + if (datatypes::isWideDecimalType(out->getColTypes()[i], out->getColumnWidth(i))) + { + if (out->getScale(i) == in.getScale(i)) + { + if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeWideXDecimalToWideXDecimalNoScale); + else + result.emplace_back(normalizeXDecimalToWideXDecimalNoScale); + } + else if (out->getScale(i) > in.getScale(i)) + { + if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeWideXDecimalToWideXDecimalWithScale); + else + result.emplace_back(normalizeXDecimalToWideXDecimalWithScale); + } + else // should not happen, the output's scale is the largest + throw logic_error("TupleUnion::normalize(): incorrect scale setting"); + } + // If output type is narrow decimal, input type + // has to be narrow decimal as well. + else + { + if (out->getScale(i) == in.getScale(i)) + result.emplace_back(normalizeXDecimalToOtherNoScale); + else if (out->getScale(i) > in.getScale(i)) + result.emplace_back(normalizeXDecimalToOtherWithScale); + else // should not happen, the output's scale is the largest + throw logic_error("TupleUnion::normalize(): incorrect scale setting"); + } + + break; + } + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeXDecimalToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeXDecimalToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeXDecimalToLongDouble); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + default: + { + if (LIKELY(in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH)) + result.emplace_back(normalizeWideXDecimalToString); + else + result.emplace_back(normalizeXDecimalToString); + break; + } + } + + break; + } + + case CalpontSystemCatalog::BLOB: + case CalpontSystemCatalog::VARBINARY: result.emplace_back(normalizeBlobVarbinary); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i] << ")"; + cout << os.str() << endl; + throw logic_error(os.str()); + } + } + } + + idbassert(out->getColumnCount() == result.size()); + return result; + } + +} // namespace + namespace joblist { inline uint64_t TupleUnion::Hasher::operator()(const RowPosition& p) const @@ -218,8 +1445,9 @@ void TupleUnion::readInput(uint32_t which) l_tmpRG.getRow(0, &tmpRow); l_tmpRG.setRowCount(l_inputRG.getRowCount()); + const normalizeFunctionsT normalizeFunctions = inferNormalizeFunctions(inRow, &tmpRow, fTimeZone); for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow(), tmpRow.nextRow()) - normalize(inRow, &tmpRow); + normalize(inRow, &tmpRow, normalizeFunctions); l_tmpRG.getRow(0, &tmpRow); { @@ -227,7 +1455,9 @@ void TupleUnion::readInput(uint32_t which) getOutput(&l_outputRG, &outRow, &outRGData); memUsageBefore = allocator.getMemUsage(); - for (uint32_t i = 0; i < l_tmpRG.getRowCount(); i++, tmpRow.nextRow()) + uint32_t tmpOutputRowCount = l_outputRG.getRowCount(); + const uint32_t tmpRGRowCount = l_tmpRG.getRowCount(); + for (uint32_t i = 0; i < tmpRGRowCount; i++, tmpRow.nextRow()) { pair inserted; inserted = uniquer->insert(RowPosition(which | RowPosition::normalizedFlag, i)); @@ -236,12 +1466,15 @@ void TupleUnion::readInput(uint32_t which) { copyRow(tmpRow, &outRow); const_cast(*(inserted.first)) = - RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount()); + RowPosition(rowMemory.size() - 1, tmpOutputRowCount); memDiff += outRow.getRealSize(); - addToOutput(&outRow, &l_outputRG, true, outRGData); + addToOutput(&outRow, &l_outputRG, true, outRGData, tmpOutputRowCount); + fRowsReturned++; } } + l_outputRG.setRowCount(tmpOutputRowCount); + memUsageAfter = allocator.getMemUsage(); memDiff += (memUsageAfter - memUsageBefore); } @@ -265,11 +1498,18 @@ void TupleUnion::readInput(uint32_t which) } else { - for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow()) + const normalizeFunctionsT normalizeFunctions = inferNormalizeFunctions(inRow, &outRow, fTimeZone); + const uint32_t inputRGRowCount = l_inputRG.getRowCount(); + uint32_t tmpOutputRowCount = l_outputRG.getRowCount(); + + for (uint32_t i = 0; i < inputRGRowCount; i++, inRow.nextRow()) { - normalize(inRow, &outRow); - addToOutput(&outRow, &l_outputRG, false, outRGData); + normalize(inRow, &outRow, normalizeFunctions); + addToOutput(&outRow, &l_outputRG, false, outRGData, tmpOutputRowCount); } + + fRowsReturned += inputRGRowCount; + l_outputRG.setRowCount(tmpOutputRowCount); } more = dl->next(it, &inRGData); @@ -380,14 +1620,14 @@ void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data) rg->getRow(rg->getRowCount(), row); } -void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data) +void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, uint32_t& tmpOutputRowCount) { r->nextRow(); - rg->incRowCount(); - fRowsReturned++; + tmpOutputRowCount++; - if (rg->getRowCount() == 8192) + if (UNLIKELY(tmpOutputRowCount == 8192)) { + rg->setRowCount(8192); { boost::mutex::scoped_lock lock(sMutex); output->insert(data); @@ -396,13 +1636,14 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data) rg->setData(&data); rg->resetRowGroup(0); rg->getRow(0, r); + tmpOutputRowCount = 0; if (keepit) rowMemory.push_back(data); } } -void TupleUnion::normalize(const Row& in, Row* out) +void TupleUnion::normalize(const Row& in, Row* out, const normalizeFunctionsT& normalizeFunctions) { uint32_t i; @@ -412,755 +1653,12 @@ void TupleUnion::normalize(const Row& in, Row* out) { if (in.isNullValue(i)) { - writeNull(out, i); + TupleUnion::writeNull(out, i); continue; } - switch (in.getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i) || in.getScale(i)) - goto dec1; - - out->setIntField(in.getIntField(i), i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - if (in.getScale(i)) - goto dec1; - - out->setUintField(in.getUintField(i), i); - break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - - if (in.getScale(i)) - { - double d = in.getIntField(i); - d /= exp10(in.getScale(i)); - os.precision(15); - os << d; - } - else - os << in.getIntField(i); - - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - case CalpontSystemCatalog::TIME: - case CalpontSystemCatalog::TIMESTAMP: - throw logic_error( - "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setFloatField((float)d, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setLongDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec1: - int diff = out->getScale(i) - in.getScale(i); - - idbassert(diff >= 0); - /* - Signed INT to XDecimal - TODO: - - This code does not handle overflow that may happen on - scale multiplication. Instead of returning a garbage value - we should probably apply saturation here. In long terms we - should implement DECIMAL(65,x) to avoid overflow completely - (so the UNION between DECIMAL and integer can choose a proper - DECIMAL(M,N) result data type to guarantee that any incoming - integer value can fit into it). - */ - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); - out->setInt128Field(val, i); - } - else - { - int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); - out->setIntField(val, i); - } - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: integer to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec2; - - out->setIntField(in.getUintField(i), i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField(in.getUintField(i), i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - - if (in.getScale(i)) - { - double d = in.getUintField(i); - d /= exp10(in.getScale(i)); - os.precision(15); - os << d; - } - else - os << in.getUintField(i); - - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - case CalpontSystemCatalog::TIME: - case CalpontSystemCatalog::TIMESTAMP: - throw logic_error( - "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setFloatField((float)d, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setLongDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec2: - int diff = out->getScale(i) - in.getScale(i); - - idbassert(diff >= 0); - /* - Unsigned INT to XDecimal - TODO: - - The overflow problem mentioned in the code under label "dec1:" is - also applicable here. - */ - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); - out->setInt128Field(val, i); - } - else - { - uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); - out->setUintField(val, i); - } - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: integer to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: out->setStringField(in.getStringField(i), i); break; - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: string to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::DATE: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::DATE: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATETIME: - { - uint64_t date = in.getUintField(i); - date &= ~0x3f; // zero the 'spare' field - date <<= 32; - out->setUintField(date, i); - break; - } - - case CalpontSystemCatalog::TIMESTAMP: - { - dataconvert::Date date(in.getUintField(i)); - dataconvert::MySQLTime m_time; - m_time.year = date.year; - m_time.month = date.month; - m_time.day = date.day; - m_time.hour = 0; - m_time.minute = 0; - m_time.second = 0; - m_time.second_part = 0; - - dataconvert::TimeStamp timeStamp; - bool isValid = true; - int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid); - - if (!isValid) - { - timeStamp.reset(); - } - else - { - timeStamp.second = seconds; - timeStamp.msecond = m_time.second_part; - } - - uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::dateToString(in.getUintField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: date to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::DATETIME: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::DATETIME: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATE: - { - uint64_t val = in.getUintField(i); - val >>= 32; - out->setUintField(val, i); - break; - } - - case CalpontSystemCatalog::TIMESTAMP: - { - uint64_t val = in.getUintField(i); - dataconvert::DateTime dtime(val); - dataconvert::MySQLTime m_time; - dataconvert::TimeStamp timeStamp; - - m_time.year = dtime.year; - m_time.month = dtime.month; - m_time.day = dtime.day; - m_time.hour = dtime.hour; - m_time.minute = dtime.minute; - m_time.second = dtime.second; - m_time.second_part = dtime.msecond; - - bool isValid = true; - int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid); - - if (!isValid) - { - timeStamp.reset(); - } - else - { - timeStamp.second = seconds; - timeStamp.msecond = m_time.second_part; - } - - uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::datetimeToString(in.getUintField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: datetime to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::TIMESTAMP: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TIMESTAMP: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - { - uint64_t val = in.getUintField(i); - dataconvert::TimeStamp timestamp(val); - int64_t seconds = timestamp.second; - uint64_t outValue; - - dataconvert::MySQLTime time; - dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); - - if (out->getColTypes()[i] == CalpontSystemCatalog::DATE) - { - dataconvert::Date date; - date.year = time.year; - date.month = time.month; - date.day = time.day; - date.spare = 0; - outValue = (uint32_t) * (reinterpret_cast(&date)); - } - else - { - dataconvert::DateTime datetime; - datetime.year = time.year; - datetime.month = time.month; - datetime.day = time.day; - datetime.hour = time.hour; - datetime.minute = time.minute; - datetime.second = time.second; - datetime.msecond = timestamp.msecond; - outValue = (uint64_t) * (reinterpret_cast(&datetime)); - } - - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::TIME: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TIME: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::timeToString(in.getIntField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: time to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - double val = - (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT ? in.getFloatField(i) : in.getDoubleField(i)); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec3; - - out->setIntField((int64_t)val, i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField((uint64_t)val, i); break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: out->setFloatField(val, i); break; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: out->setDoubleField(val, i); break; - - case CalpontSystemCatalog::LONGDOUBLE: out->setLongDoubleField(val, i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - os.precision(15); // to match mysql's output - os << val; - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec3: /* have to pick a scale to use for the double. using 5... */ - uint32_t scale = 5; - uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); - int diff = out->getScale(i) - scale; - // xFLOAT or xDOUBLE to xDECIMAL conversion. Is it really possible? - // TODO: - // Perhaps we should add an assert here that this combination is not possible - // In the current reduction all problems mentioned in the code under - // label "dec1:" are also applicable here. - // TODO: isn't overflow possible below? - ival = datatypes::applySignedScale(ival, diff); - - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - out->setInt128Field(ival, i); - else - out->setIntField(ival, i); - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - long double val = in.getLongDoubleField(i); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec4; - - out->setIntField((int64_t)val, i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField((uint64_t)val, i); break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: out->setFloatField(val, i); break; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: out->setDoubleField(val, i); break; - - case CalpontSystemCatalog::LONGDOUBLE: out->setLongDoubleField(val, i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - os.precision(15); // to match mysql's output - os << val; - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec4: /* have to pick a scale to use for the double. using 5... */ - uint32_t scale = 5; - uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); - int diff = out->getScale(i) - scale; - - // LONGDOUBLE to xDECIMAL conversions: is it really possible? - // TODO: - // Perhaps we should add an assert here that this combination is not possible - // In the current reduction all problems mentioned in the code under - // label "dec1:" are also applicable here. - ival = datatypes::applySignedScale(ival, diff); - - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - out->setInt128Field(ival, i); - else - out->setIntField(ival, i); - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - int64_t val = 0; - int128_t val128 = 0; - bool isInputWide = false; - - if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - in.getInt128Field(i, val128); - isInputWide = true; - } - else - val = in.getIntField(i); - - uint32_t scale = in.getScale(i); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - if (datatypes::isWideDecimalType(out->getColTypes()[i], out->getColumnWidth(i))) - { - if (out->getScale(i) == scale) - out->setInt128Field(isInputWide ? val128 : val, i); - else if (out->getScale(i) > scale) - { - int128_t temp = datatypes::applySignedScale(isInputWide ? val128 : val, - out->getScale(i) - scale); - out->setInt128Field(temp, i); - } - else // should not happen, the output's scale is the largest - throw logic_error("TupleUnion::normalize(): incorrect scale setting"); - } - // If output type is narrow decimal, input type - // has to be narrow decimal as well. - else - { - if (out->getScale(i) == scale) - out->setIntField(val, i); - else if (out->getScale(i) > scale) - { - int64_t temp = datatypes::applySignedScale(val, out->getScale(i) - scale); - out->setIntField(temp, i); - } - else // should not happen, the output's scale is the largest - throw logic_error("TupleUnion::normalize(): incorrect scale setting"); - } - - break; - } - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - float fval = ((float)val) / IDB_pow[scale]; - out->setFloatField(fval, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - double dval = ((double)val) / IDB_pow[scale]; - out->setDoubleField(dval, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - long double dval = ((long double)val) / IDB_pow[scale]; - out->setLongDoubleField(dval, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - default: - { - if (LIKELY(isInputWide)) - { - datatypes::Decimal dec(0, in.getScale(i), in.getPrecision(i), val128); - out->setStringField(dec.toString(), i); - } - else - { - datatypes::Decimal dec(val, in.getScale(i), in.getPrecision(i)); - out->setStringField(dec.toString(), i); - } - break; - } - } - - break; - } - - case CalpontSystemCatalog::BLOB: - case CalpontSystemCatalog::VARBINARY: - { - // out->setVarBinaryField(in.getVarBinaryStringField(i), i); // not efficient - out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i] << ")"; - cout << os.str() << endl; - throw logic_error(os.str()); - } - } + /// Call the pre-compiled function. + normalizeFunctions[i](in, out, i); } } diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 23a00b273..0d1370051 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -41,6 +41,8 @@ namespace joblist { +using normalizeFunctionsT = std::vector>; + class TupleUnion : public JobStep, public TupleDeliveryStep { public: @@ -122,8 +124,8 @@ class TupleUnion : public JobStep, public TupleDeliveryStep }; void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data); - void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data); - void normalize(const rowgroup::Row& in, rowgroup::Row* out); + void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data, uint32_t& tmpOutputRowCount); + void normalize(const rowgroup::Row& in, rowgroup::Row* out, const normalizeFunctionsT& normalizeFunctions); void writeNull(rowgroup::Row* out, uint32_t col); void readInput(uint32_t); void formatMiniStats();