diff --git a/dbcon/joblist/tupleunion.cpp b/dbcon/joblist/tupleunion.cpp index 291dd325a..2c01bbf19 100644 --- a/dbcon/joblist/tupleunion.cpp +++ b/dbcon/joblist/tupleunion.cpp @@ -56,6 +56,1233 @@ inline double exp10(double x) } // namespace #endif +namespace +{ + // union helper functions. + + inline uint64_t pickScaleForDouble(Row* out, uint32_t i, double val) + { + /* have to pick a scale to use for the double. using 5... */ + uint32_t scale = 5; + uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); + const int diff = out->getScale(i) - scale; + ival = datatypes::applySignedScale(ival, diff); + return ival; + } + + inline uint64_t pickScaleForLongDouble(Row* out, uint32_t i, long double val) + { + /* have to pick a scale to use for the double. using 5... */ + uint32_t scale = 5; + uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); + int diff = out->getScale(i) - scale; + ival = datatypes::applySignedScale(ival, diff); + return ival; + } + + void normalizeIntToIntNoScale(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeIntToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeIntToUintNoScale(const Row& in, Row* out, uint32_t i) + { + out->setUintField(in.getIntField(i), i); + } + + void normalizeIntToUintWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToUintWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeIntToStringWithScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + double d = in.getIntField(i); + d /= exp10(in.getScale(i)); + os.precision(15); + os << d; + out->setStringField(os.str(), i); + } + + void normalizeIntToStringNoScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + os << in.getIntField(i); + out->setStringField(os.str(), i); + } + + void normalizeIntToXFloat(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setFloatField((float)d, i); + } + + void normalizeIntToXDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setDoubleField(d, i); + } + + void normalizeIntToLongDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledSInt64FieldAsXFloat(i); + out->setLongDoubleField(d, i); + } + + void normalizeIntToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeIntToXDecimalInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); + out->setIntField(val, i); + } + + void normalizeUintToIntNoScale(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getUintField(i), i); + } + + void normalizeUintToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeUntToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setIntField(val, i); + } + + void normalizeUintToUint(const Row& in, Row* out, uint32_t i) + { + out->setUintField(in.getUintField(i), i); + } + + void normalizeUintToStringWithScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + double d = in.getUintField(i); + d /= exp10(in.getScale(i)); + os.precision(15); + os << d; + out->setStringField(os.str(), i); + } + + void normalizeUintToStringNoScale(const Row& in, Row* out, uint32_t i) + { + ostringstream os; + os << in.getUintField(i); + out->setStringField(os.str(), i); + } + + void normalizUintToXFloat(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setFloatField((float)d, i); + } + + void normalizeUintToXDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setDoubleField(d, i); + } + + void normalizeUintToLongDouble(const Row& in, Row* out, uint32_t i) + { + auto d = in.getScaledUInt64FieldAsXFloat(i); + out->setLongDoubleField(d, i); + } + + void normalizeUintToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setInt128Field(val, i); + } + + void normalizeUintToXDecimalInt64(const Row& in, Row* out, uint32_t i) + { + const int diff = out->getScale(i) - in.getScale(i); + idbassert(diff >= 0); + uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); + out->setIntField(val, i); + } + + void normalizeStringToString(const Row& in, Row* out, uint32_t i) + { + out->setStringField(in.getStringField(i), i); + } + + void normalizeDateToDate(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeDateToDatetime(const Row& in, Row* out, uint32_t i) + { + uint64_t date = in.getUintField(i); + date &= ~0x3f; // zero the 'spare' field + date <<= 32; + out->setUintField(date, i); + } + + void normalizeDateToTimestamp(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + dataconvert::Date date(in.getUintField(i)); + dataconvert::MySQLTime m_time; + m_time.year = date.year; + m_time.month = date.month; + m_time.day = date.day; + m_time.hour = 0; + m_time.minute = 0; + m_time.second = 0; + m_time.second_part = 0; + + dataconvert::TimeStamp timeStamp; + bool isValid = true; + int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid); + + if (!isValid) + { + timeStamp.reset(); + } + else + { + timeStamp.second = seconds; + timeStamp.msecond = m_time.second_part; + } + + uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); + out->setUintField(outValue, i); + } + + void normalizeDateToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::dateToString(in.getUintField(i)); + out->setStringField(d, i); + } + + void normalizeDatetimeToDatetime(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeDatetimeToDate(const Row& in, Row* out, uint32_t i) + { + uint64_t val = in.getUintField(i); + val >>= 32; + out->setUintField(val, i); + } + + void normalizeDatetimeToTimestamp(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::DateTime dtime(val); + dataconvert::MySQLTime m_time; + dataconvert::TimeStamp timeStamp; + + m_time.year = dtime.year; + m_time.month = dtime.month; + m_time.day = dtime.day; + m_time.hour = dtime.hour; + m_time.minute = dtime.minute; + m_time.second = dtime.second; + m_time.second_part = dtime.msecond; + + bool isValid = true; + int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid); + + if (!isValid) + { + timeStamp.reset(); + } + else + { + timeStamp.second = seconds; + timeStamp.msecond = m_time.second_part; + } + + uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); + out->setUintField(outValue, i); + } + + void normalizeDatetimeToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::datetimeToString(in.getUintField(i)); + out->setStringField(d, i); + } + + void normalizeTimestampToTimestamp(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeTimestampToDate(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::TimeStamp timestamp(val); + int64_t seconds = timestamp.second; + uint64_t outValue; + + dataconvert::MySQLTime time; + dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); + + dataconvert::Date date; + date.year = time.year; + date.month = time.month; + date.day = time.day; + date.spare = 0; + outValue = (uint32_t) * (reinterpret_cast(&date)); + + out->setUintField(outValue, i); + } + + void normalizeTimestampToDatetime(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + uint64_t val = in.getUintField(i); + dataconvert::TimeStamp timestamp(val); + int64_t seconds = timestamp.second; + uint64_t outValue; + + dataconvert::MySQLTime time; + dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); + + dataconvert::DateTime datetime; + datetime.year = time.year; + datetime.month = time.month; + datetime.day = time.day; + datetime.hour = time.hour; + datetime.minute = time.minute; + datetime.second = time.second; + datetime.msecond = timestamp.msecond; + outValue = (uint64_t) * (reinterpret_cast(&datetime)); + + out->setUintField(outValue, i); + } + + void normalizeTimestampToString(const Row& in, Row* out, uint32_t i, long fTimeZone) + { + string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone); + out->setStringField(d, i); + } + + void normalizeTimeToTime(const Row& in, Row* out, uint32_t i) + { + out->setIntField(in.getIntField(i), i); + } + + void normalizeTimeToString(const Row& in, Row* out, uint32_t i) + { + string d = DataConvert::timeToString(in.getIntField(i)); + out->setStringField(d, i); + } + + void normalizeXFloatToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToIntWithScaleInt64(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToIntNoScale(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeXDoubleToIntNoScale(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeXFloatToUint(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeXDoubleToUint(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeXFloatToXFloat(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setFloatField(val, i); + } + + void normalizeXDoubleToXFloat(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setFloatField(val, i); + } + + void normalizeXFloatToXDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setDoubleField(val, i); + } + + void normalizeXDoubleToXDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setDoubleField(val, i); + } + + void normalizeXFloatToLongDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setLongDoubleField(val, i); + } + + void normalizeXDoubleToLongDouble(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setLongDoubleField(val, i); + } + + void normalizeXFloatToString(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeXDoubleToString(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeXFloatToWideXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToWideXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setInt128Field(pickScaleForDouble(out, i, val), i); + } + + void normalizeXFloatToXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getFloatField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeXDoubleToXDecimal(const Row& in, Row* out, uint32_t i) + { + double val = in.getDoubleField(i); + out->setIntField(pickScaleForDouble(out, i, val), i); + } + + void normalizeLongDoubleToIntNoScale(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField((int64_t)val, i); + } + + void normalizeLongDoubleToIntWithScaleInt128(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setInt128Field(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToIntWithScaleInt(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToUint(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setUintField((uint64_t)val, i); + } + + void normalizeLongDoubleToXFloat(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setFloatField(val, i); + } + + void normalizeLongDoubleToXDouble(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setDoubleField(val, i); + } + + void normalizeLongDoubleToLongDouble(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setLongDoubleField(val, i); + } + + void normalizeLongDoubleToString(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + ostringstream os; + os.precision(15); // to match mysql's output + os << val; + out->setStringField(os.str(), i); + } + + void normalizeLongDoubleToXDecimalInt128(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setInt128Field(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeLongDoubleToXDecimalInt(const Row& in, Row* out, uint32_t i) + { + long double val = in.getLongDoubleField(i); + out->setIntField(pickScaleForLongDouble(out, i, val), i); + } + + void normalizeWideXDecimalToWideXDecimalNoScale(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + out->setInt128Field(val128, i); + } + + void normalizeXDecimalToWideXDecimalNoScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + out->setInt128Field(val, i); + } + + void normalizeWideXDecimalToWideXDecimalWithScale(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + int128_t temp = datatypes::applySignedScale(val128, out->getScale(i) - in.getScale(i)); + out->setInt128Field(temp, i); + } + + void normalizeXDecimalToWideXDecimalWithScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + int128_t temp = datatypes::applySignedScale(val, out->getScale(i) - in.getScale(i)); + out->setInt128Field(temp, i); + } + + void normalizeXDecimalToOtherNoScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + out->setIntField(val, i); + } + + void normalizeXDecimalToOtherWithScale(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + int64_t temp = datatypes::applySignedScale(val, out->getScale(i) - in.getScale(i)); + out->setIntField(temp, i); + } + + void normalizeXDecimalToXFloat(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + float fval = ((float)val) / IDB_pow[in.getScale(i)]; + out->setFloatField(fval, i); + } + + void normalizeXDecimalToXDouble(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + double dval = ((double)val) / IDB_pow[in.getScale(i)]; + out->setDoubleField(dval, i); + } + + void normalizeXDecimalToLongDouble(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + long double dval = ((long double)val) / IDB_pow[in.getScale(i)]; + out->setLongDoubleField(dval, i); + } + + void normalizeWideXDecimalToString(const Row& in, Row* out, uint32_t i) + { + int128_t val128 = 0; + in.getInt128Field(i, val128); + datatypes::Decimal dec(0, in.getScale(i), in.getPrecision(i), val128); + out->setStringField(dec.toString(), i); + } + + void normalizeXDecimalToString(const Row& in, Row* out, uint32_t i) + { + int64_t val = in.getIntField(i); + datatypes::Decimal dec(val, in.getScale(i), in.getPrecision(i)); + out->setStringField(dec.toString(), i); + } + + void normalizeBlobVarbinary(const Row& in, Row* out, uint32_t i) + { + // out->setVarBinaryField(in.getVarBinaryStringField(i), i); // not efficient + out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i); + } + + joblist::normalizeFunctionsT inferNormalizeFunctions(const Row& in, Row* out, long fTimeZone) + { + uint32_t i; + joblist::normalizeFunctionsT result; + + for (i = 0; i < out->getColumnCount(); i++) + { + switch (in.getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i) || in.getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToIntWithScaleInt128); + else + result.emplace_back(normalizeIntToIntWithScaleInt64); + } + else + result.emplace_back(normalizeIntToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + if (in.getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToUintWithScaleInt128); + else + result.emplace_back(normalizeIntToUintWithScaleInt64); + } + else + result.emplace_back(normalizeIntToUintNoScale); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getScale(i)) + result.emplace_back(normalizeIntToStringWithScale); + else + result.emplace_back(normalizeIntToStringNoScale); + break; + } + + case CalpontSystemCatalog::DATE: + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIME: + case CalpontSystemCatalog::TIMESTAMP: + throw logic_error( + "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeIntToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeIntToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeIntToLongDouble); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + /* + Signed INT to XDecimal + TODO: + - This code does not handle overflow that may happen on + scale multiplication. Instead of returning a garbage value + we should probably apply saturation here. In long terms we + should implement DECIMAL(65,x) to avoid overflow completely + (so the UNION between DECIMAL and integer can choose a proper + DECIMAL(M,N) result data type to guarantee that any incoming + integer value can fit into it). + */ + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeIntToXDecimalInt128); + else + result.emplace_back(normalizeIntToXDecimalInt64); + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: integer to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeUintToIntWithScaleInt128); + else + result.emplace_back(normalizeUntToIntWithScaleInt64); + } + else + result.emplace_back(normalizeUintToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: result.emplace_back(normalizeUintToUint); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getScale(i)) + result.emplace_back(normalizeUintToStringWithScale); + else + result.emplace_back(normalizeUintToStringNoScale); + break; + } + + case CalpontSystemCatalog::DATE: + case CalpontSystemCatalog::DATETIME: + case CalpontSystemCatalog::TIME: + case CalpontSystemCatalog::TIMESTAMP: + throw logic_error( + "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizUintToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeUintToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeUintToLongDouble); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + /* + Unsigned INT to XDecimal + TODO: + - The overflow problem mentioned in the code under case "Signed INT to XDecimal:" is + also applicable here. + */ + + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeUintToXDecimalInt128); + else + result.emplace_back(normalizeUintToXDecimalInt64); + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: integer to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeStringToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: string to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::DATE: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::DATE: result.emplace_back(normalizeDateToDate); break; + + case CalpontSystemCatalog::DATETIME: result.emplace_back(normalizeDateToDatetime); break; + + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(std::bind(normalizeDateToTimestamp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeDateToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: date to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::DATETIME: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::DATETIME: result.emplace_back(normalizeDatetimeToDatetime); break; + + case CalpontSystemCatalog::DATE: result.emplace_back(normalizeDatetimeToDate); break; + + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(std::bind(normalizeDatetimeToTimestamp, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeDatetimeToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: datetime to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::TIMESTAMP: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TIMESTAMP: result.emplace_back(normalizeTimestampToTimestamp); break; + + case CalpontSystemCatalog::DATE: result.emplace_back(std::bind(normalizeTimestampToDate, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::DATETIME: result.emplace_back(std::bind(normalizeTimestampToDatetime, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(std::bind(normalizeTimestampToString, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, fTimeZone)); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::TIME: + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TIME: result.emplace_back(normalizeTimeToTime); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeTimeToString); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: time to " << out->getColTypes()[i]; + throw logic_error(os.str()); + } + } + + break; + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntWithScaleInt128); + else + result.emplace_back(normalizeXDoubleToIntWithScaleInt128); + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntWithScaleInt64); + else + result.emplace_back(normalizeXDoubleToIntWithScaleInt64); + } + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToIntNoScale); + else + result.emplace_back(normalizeXDoubleToIntNoScale); + } + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToUint); + else + result.emplace_back(normalizeXDoubleToUint); + break; + } + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXFloat); + else + result.emplace_back(normalizeXDoubleToXFloat); + break; + } + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXDouble); + else + result.emplace_back(normalizeXDoubleToXDouble); + break; + } + + case CalpontSystemCatalog::LONGDOUBLE: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToLongDouble); + else + result.emplace_back(normalizeXDoubleToLongDouble); + break; + } + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToString); + else + result.emplace_back(normalizeXDoubleToString); + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + // xFLOAT or xDOUBLE to xDECIMAL conversion. Is it really possible? + // TODO: + // Perhaps we should add an assert here that this combination is not possible + // In the current reduction all problems mentioned in the code under + // case "Signed INT to XDecimal" are also applicable here. + // TODO: isn't overflow possible below? + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToWideXDecimal); + else + result.emplace_back(normalizeXDoubleToWideXDecimal); + break; + } + else + { + if (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT || in.getColTypes()[i] == CalpontSystemCatalog::UFLOAT) + result.emplace_back(normalizeXFloatToXDecimal); + else + result.emplace_back(normalizeXDoubleToXDecimal); + break; + } + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + } + + case CalpontSystemCatalog::LONGDOUBLE: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + { + if (out->getScale(i)) + { + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeLongDoubleToIntWithScaleInt128); + else + result.emplace_back(normalizeLongDoubleToIntWithScaleInt); + } + else + result.emplace_back(normalizeLongDoubleToIntNoScale); + break; + } + + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: result.emplace_back(normalizeLongDoubleToUint); break; + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeLongDoubleToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeLongDoubleToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeLongDoubleToLongDouble); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: result.emplace_back(normalizeLongDoubleToString); break; + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + // LONGDOUBLE to xDECIMAL conversions: is it really possible? + // TODO: + // Perhaps we should add an assert here that this combination is not possible + // In the current reduction all problems mentioned in the code under + // case "Signed INT to XDecimal" are also applicable here. + if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeLongDoubleToXDecimalInt128); + else + result.emplace_back(normalizeLongDoubleToXDecimalInt); + + break; + } + + default: + ostringstream os; + os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " + << out->getColTypes()[i]; + throw logic_error(os.str()); + } + + break; + } + + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + switch (out->getColTypes()[i]) + { + case CalpontSystemCatalog::TINYINT: + case CalpontSystemCatalog::SMALLINT: + case CalpontSystemCatalog::MEDINT: + case CalpontSystemCatalog::INT: + case CalpontSystemCatalog::BIGINT: + case CalpontSystemCatalog::UTINYINT: + case CalpontSystemCatalog::USMALLINT: + case CalpontSystemCatalog::UMEDINT: + case CalpontSystemCatalog::UINT: + case CalpontSystemCatalog::UBIGINT: + case CalpontSystemCatalog::DECIMAL: + case CalpontSystemCatalog::UDECIMAL: + { + if (datatypes::isWideDecimalType(out->getColTypes()[i], out->getColumnWidth(i))) + { + if (out->getScale(i) == in.getScale(i)) + { + if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeWideXDecimalToWideXDecimalNoScale); + else + result.emplace_back(normalizeXDecimalToWideXDecimalNoScale); + } + else if (out->getScale(i) > in.getScale(i)) + { + if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) + result.emplace_back(normalizeWideXDecimalToWideXDecimalWithScale); + else + result.emplace_back(normalizeXDecimalToWideXDecimalWithScale); + } + else // should not happen, the output's scale is the largest + throw logic_error("TupleUnion::normalize(): incorrect scale setting"); + } + // If output type is narrow decimal, input type + // has to be narrow decimal as well. + else + { + if (out->getScale(i) == in.getScale(i)) + result.emplace_back(normalizeXDecimalToOtherNoScale); + else if (out->getScale(i) > in.getScale(i)) + result.emplace_back(normalizeXDecimalToOtherWithScale); + else // should not happen, the output's scale is the largest + throw logic_error("TupleUnion::normalize(): incorrect scale setting"); + } + + break; + } + + case CalpontSystemCatalog::FLOAT: + case CalpontSystemCatalog::UFLOAT: result.emplace_back(normalizeXDecimalToXFloat); break; + + case CalpontSystemCatalog::DOUBLE: + case CalpontSystemCatalog::UDOUBLE: result.emplace_back(normalizeXDecimalToXDouble); break; + + case CalpontSystemCatalog::LONGDOUBLE: result.emplace_back(normalizeXDecimalToLongDouble); break; + + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + case CalpontSystemCatalog::VARCHAR: + default: + { + if (LIKELY(in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH)) + result.emplace_back(normalizeWideXDecimalToString); + else + result.emplace_back(normalizeXDecimalToString); + break; + } + } + + break; + } + + case CalpontSystemCatalog::BLOB: + case CalpontSystemCatalog::VARBINARY: result.emplace_back(normalizeBlobVarbinary); break; + + default: + { + ostringstream os; + os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i] << ")"; + cout << os.str() << endl; + throw logic_error(os.str()); + } + } + } + + idbassert(out->getColumnCount() == result.size()); + return result; + } + +} // namespace + namespace joblist { inline uint64_t TupleUnion::Hasher::operator()(const RowPosition& p) const @@ -218,8 +1445,9 @@ void TupleUnion::readInput(uint32_t which) l_tmpRG.getRow(0, &tmpRow); l_tmpRG.setRowCount(l_inputRG.getRowCount()); + const normalizeFunctionsT normalizeFunctions = inferNormalizeFunctions(inRow, &tmpRow, fTimeZone); for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow(), tmpRow.nextRow()) - normalize(inRow, &tmpRow); + normalize(inRow, &tmpRow, normalizeFunctions); l_tmpRG.getRow(0, &tmpRow); { @@ -227,7 +1455,9 @@ void TupleUnion::readInput(uint32_t which) getOutput(&l_outputRG, &outRow, &outRGData); memUsageBefore = allocator.getMemUsage(); - for (uint32_t i = 0; i < l_tmpRG.getRowCount(); i++, tmpRow.nextRow()) + uint32_t tmpOutputRowCount = l_outputRG.getRowCount(); + const uint32_t tmpRGRowCount = l_tmpRG.getRowCount(); + for (uint32_t i = 0; i < tmpRGRowCount; i++, tmpRow.nextRow()) { pair inserted; inserted = uniquer->insert(RowPosition(which | RowPosition::normalizedFlag, i)); @@ -236,12 +1466,15 @@ void TupleUnion::readInput(uint32_t which) { copyRow(tmpRow, &outRow); const_cast(*(inserted.first)) = - RowPosition(rowMemory.size() - 1, l_outputRG.getRowCount()); + RowPosition(rowMemory.size() - 1, tmpOutputRowCount); memDiff += outRow.getRealSize(); - addToOutput(&outRow, &l_outputRG, true, outRGData); + addToOutput(&outRow, &l_outputRG, true, outRGData, tmpOutputRowCount); + fRowsReturned++; } } + l_outputRG.setRowCount(tmpOutputRowCount); + memUsageAfter = allocator.getMemUsage(); memDiff += (memUsageAfter - memUsageBefore); } @@ -265,11 +1498,18 @@ void TupleUnion::readInput(uint32_t which) } else { - for (uint32_t i = 0; i < l_inputRG.getRowCount(); i++, inRow.nextRow()) + const normalizeFunctionsT normalizeFunctions = inferNormalizeFunctions(inRow, &outRow, fTimeZone); + const uint32_t inputRGRowCount = l_inputRG.getRowCount(); + uint32_t tmpOutputRowCount = l_outputRG.getRowCount(); + + for (uint32_t i = 0; i < inputRGRowCount; i++, inRow.nextRow()) { - normalize(inRow, &outRow); - addToOutput(&outRow, &l_outputRG, false, outRGData); + normalize(inRow, &outRow, normalizeFunctions); + addToOutput(&outRow, &l_outputRG, false, outRGData, tmpOutputRowCount); } + + fRowsReturned += inputRGRowCount; + l_outputRG.setRowCount(tmpOutputRowCount); } more = dl->next(it, &inRGData); @@ -380,14 +1620,14 @@ void TupleUnion::getOutput(RowGroup* rg, Row* row, RGData* data) rg->getRow(rg->getRowCount(), row); } -void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data) +void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data, uint32_t& tmpOutputRowCount) { r->nextRow(); - rg->incRowCount(); - fRowsReturned++; + tmpOutputRowCount++; - if (rg->getRowCount() == 8192) + if (UNLIKELY(tmpOutputRowCount == 8192)) { + rg->setRowCount(8192); { boost::mutex::scoped_lock lock(sMutex); output->insert(data); @@ -396,13 +1636,14 @@ void TupleUnion::addToOutput(Row* r, RowGroup* rg, bool keepit, RGData& data) rg->setData(&data); rg->resetRowGroup(0); rg->getRow(0, r); + tmpOutputRowCount = 0; if (keepit) rowMemory.push_back(data); } } -void TupleUnion::normalize(const Row& in, Row* out) +void TupleUnion::normalize(const Row& in, Row* out, const normalizeFunctionsT& normalizeFunctions) { uint32_t i; @@ -412,755 +1653,12 @@ void TupleUnion::normalize(const Row& in, Row* out) { if (in.isNullValue(i)) { - writeNull(out, i); + TupleUnion::writeNull(out, i); continue; } - switch (in.getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i) || in.getScale(i)) - goto dec1; - - out->setIntField(in.getIntField(i), i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - if (in.getScale(i)) - goto dec1; - - out->setUintField(in.getUintField(i), i); - break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - - if (in.getScale(i)) - { - double d = in.getIntField(i); - d /= exp10(in.getScale(i)); - os.precision(15); - os << d; - } - else - os << in.getIntField(i); - - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - case CalpontSystemCatalog::TIME: - case CalpontSystemCatalog::TIMESTAMP: - throw logic_error( - "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setFloatField((float)d, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - auto d = in.getScaledSInt64FieldAsXFloat(i); - out->setLongDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec1: - int diff = out->getScale(i) - in.getScale(i); - - idbassert(diff >= 0); - /* - Signed INT to XDecimal - TODO: - - This code does not handle overflow that may happen on - scale multiplication. Instead of returning a garbage value - we should probably apply saturation here. In long terms we - should implement DECIMAL(65,x) to avoid overflow completely - (so the UNION between DECIMAL and integer can choose a proper - DECIMAL(M,N) result data type to guarantee that any incoming - integer value can fit into it). - */ - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - int128_t val = datatypes::applySignedScale(in.getIntField(i), diff); - out->setInt128Field(val, i); - } - else - { - int64_t val = datatypes::applySignedScale(in.getIntField(i), diff); - out->setIntField(val, i); - } - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: integer to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec2; - - out->setIntField(in.getUintField(i), i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField(in.getUintField(i), i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - - if (in.getScale(i)) - { - double d = in.getUintField(i); - d /= exp10(in.getScale(i)); - os.precision(15); - os << d; - } - else - os << in.getUintField(i); - - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - case CalpontSystemCatalog::TIME: - case CalpontSystemCatalog::TIMESTAMP: - throw logic_error( - "TupleUnion::normalize(): tried to normalize an int to a timestamp, time, date or datetime"); - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setFloatField((float)d, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - auto d = in.getScaledUInt64FieldAsXFloat(i); - out->setLongDoubleField(d, i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec2: - int diff = out->getScale(i) - in.getScale(i); - - idbassert(diff >= 0); - /* - Unsigned INT to XDecimal - TODO: - - The overflow problem mentioned in the code under label "dec1:" is - also applicable here. - */ - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - int128_t val = datatypes::applySignedScale(in.getUintField(i), diff); - out->setInt128Field(val, i); - } - else - { - uint64_t val = datatypes::applySignedScale(in.getUintField(i), diff); - out->setUintField(val, i); - } - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: integer to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: out->setStringField(in.getStringField(i), i); break; - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: string to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::DATE: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::DATE: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATETIME: - { - uint64_t date = in.getUintField(i); - date &= ~0x3f; // zero the 'spare' field - date <<= 32; - out->setUintField(date, i); - break; - } - - case CalpontSystemCatalog::TIMESTAMP: - { - dataconvert::Date date(in.getUintField(i)); - dataconvert::MySQLTime m_time; - m_time.year = date.year; - m_time.month = date.month; - m_time.day = date.day; - m_time.hour = 0; - m_time.minute = 0; - m_time.second = 0; - m_time.second_part = 0; - - dataconvert::TimeStamp timeStamp; - bool isValid = true; - int64_t seconds = dataconvert::mySQLTimeToGmtSec(m_time, fTimeZone, isValid); - - if (!isValid) - { - timeStamp.reset(); - } - else - { - timeStamp.second = seconds; - timeStamp.msecond = m_time.second_part; - } - - uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::dateToString(in.getUintField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: date to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::DATETIME: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::DATETIME: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATE: - { - uint64_t val = in.getUintField(i); - val >>= 32; - out->setUintField(val, i); - break; - } - - case CalpontSystemCatalog::TIMESTAMP: - { - uint64_t val = in.getUintField(i); - dataconvert::DateTime dtime(val); - dataconvert::MySQLTime m_time; - dataconvert::TimeStamp timeStamp; - - m_time.year = dtime.year; - m_time.month = dtime.month; - m_time.day = dtime.day; - m_time.hour = dtime.hour; - m_time.minute = dtime.minute; - m_time.second = dtime.second; - m_time.second_part = dtime.msecond; - - bool isValid = true; - int64_t seconds = mySQLTimeToGmtSec(m_time, fTimeZone, isValid); - - if (!isValid) - { - timeStamp.reset(); - } - else - { - timeStamp.second = seconds; - timeStamp.msecond = m_time.second_part; - } - - uint64_t outValue = (uint64_t) * (reinterpret_cast(&timeStamp)); - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::datetimeToString(in.getUintField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: datetime to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::TIMESTAMP: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TIMESTAMP: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::DATE: - case CalpontSystemCatalog::DATETIME: - { - uint64_t val = in.getUintField(i); - dataconvert::TimeStamp timestamp(val); - int64_t seconds = timestamp.second; - uint64_t outValue; - - dataconvert::MySQLTime time; - dataconvert::gmtSecToMySQLTime(seconds, time, fTimeZone); - - if (out->getColTypes()[i] == CalpontSystemCatalog::DATE) - { - dataconvert::Date date; - date.year = time.year; - date.month = time.month; - date.day = time.day; - date.spare = 0; - outValue = (uint32_t) * (reinterpret_cast(&date)); - } - else - { - dataconvert::DateTime datetime; - datetime.year = time.year; - datetime.month = time.month; - datetime.day = time.day; - datetime.hour = time.hour; - datetime.minute = time.minute; - datetime.second = time.second; - datetime.msecond = timestamp.msecond; - outValue = (uint64_t) * (reinterpret_cast(&datetime)); - } - - out->setUintField(outValue, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::timestampToString(in.getUintField(i), fTimeZone); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: timestamp to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::TIME: - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TIME: out->setIntField(in.getIntField(i), i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - string d = DataConvert::timeToString(in.getIntField(i)); - out->setStringField(d, i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: time to " << out->getColTypes()[i]; - throw logic_error(os.str()); - } - } - - break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - double val = - (in.getColTypes()[i] == CalpontSystemCatalog::FLOAT ? in.getFloatField(i) : in.getDoubleField(i)); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec3; - - out->setIntField((int64_t)val, i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField((uint64_t)val, i); break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: out->setFloatField(val, i); break; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: out->setDoubleField(val, i); break; - - case CalpontSystemCatalog::LONGDOUBLE: out->setLongDoubleField(val, i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - os.precision(15); // to match mysql's output - os << val; - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec3: /* have to pick a scale to use for the double. using 5... */ - uint32_t scale = 5; - uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); - int diff = out->getScale(i) - scale; - // xFLOAT or xDOUBLE to xDECIMAL conversion. Is it really possible? - // TODO: - // Perhaps we should add an assert here that this combination is not possible - // In the current reduction all problems mentioned in the code under - // label "dec1:" are also applicable here. - // TODO: isn't overflow possible below? - ival = datatypes::applySignedScale(ival, diff); - - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - out->setInt128Field(ival, i); - else - out->setIntField(ival, i); - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - long double val = in.getLongDoubleField(i); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - if (out->getScale(i)) - goto dec4; - - out->setIntField((int64_t)val, i); - break; - - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: out->setUintField((uint64_t)val, i); break; - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: out->setFloatField(val, i); break; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: out->setDoubleField(val, i); break; - - case CalpontSystemCatalog::LONGDOUBLE: out->setLongDoubleField(val, i); break; - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - { - ostringstream os; - os.precision(15); // to match mysql's output - os << val; - out->setStringField(os.str(), i); - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - dec4: /* have to pick a scale to use for the double. using 5... */ - uint32_t scale = 5; - uint64_t ival = (uint64_t)(double)(val * datatypes::scaleDivisor(scale)); - int diff = out->getScale(i) - scale; - - // LONGDOUBLE to xDECIMAL conversions: is it really possible? - // TODO: - // Perhaps we should add an assert here that this combination is not possible - // In the current reduction all problems mentioned in the code under - // label "dec1:" are also applicable here. - ival = datatypes::applySignedScale(ival, diff); - - if (out->getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - out->setInt128Field(ival, i); - else - out->setIntField(ival, i); - - break; - } - - default: - ostringstream os; - os << "TupleUnion::normalize(): tried an illegal conversion: floating point to " - << out->getColTypes()[i]; - throw logic_error(os.str()); - } - - break; - } - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - int64_t val = 0; - int128_t val128 = 0; - bool isInputWide = false; - - if (in.getColumnWidth(i) == datatypes::MAXDECIMALWIDTH) - { - in.getInt128Field(i, val128); - isInputWide = true; - } - else - val = in.getIntField(i); - - uint32_t scale = in.getScale(i); - - switch (out->getColTypes()[i]) - { - case CalpontSystemCatalog::TINYINT: - case CalpontSystemCatalog::SMALLINT: - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - case CalpontSystemCatalog::BIGINT: - case CalpontSystemCatalog::UTINYINT: - case CalpontSystemCatalog::USMALLINT: - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - case CalpontSystemCatalog::UBIGINT: - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - if (datatypes::isWideDecimalType(out->getColTypes()[i], out->getColumnWidth(i))) - { - if (out->getScale(i) == scale) - out->setInt128Field(isInputWide ? val128 : val, i); - else if (out->getScale(i) > scale) - { - int128_t temp = datatypes::applySignedScale(isInputWide ? val128 : val, - out->getScale(i) - scale); - out->setInt128Field(temp, i); - } - else // should not happen, the output's scale is the largest - throw logic_error("TupleUnion::normalize(): incorrect scale setting"); - } - // If output type is narrow decimal, input type - // has to be narrow decimal as well. - else - { - if (out->getScale(i) == scale) - out->setIntField(val, i); - else if (out->getScale(i) > scale) - { - int64_t temp = datatypes::applySignedScale(val, out->getScale(i) - scale); - out->setIntField(temp, i); - } - else // should not happen, the output's scale is the largest - throw logic_error("TupleUnion::normalize(): incorrect scale setting"); - } - - break; - } - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - { - float fval = ((float)val) / IDB_pow[scale]; - out->setFloatField(fval, i); - break; - } - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - { - double dval = ((double)val) / IDB_pow[scale]; - out->setDoubleField(dval, i); - break; - } - - case CalpontSystemCatalog::LONGDOUBLE: - { - long double dval = ((long double)val) / IDB_pow[scale]; - out->setLongDoubleField(dval, i); - break; - } - - case CalpontSystemCatalog::CHAR: - case CalpontSystemCatalog::TEXT: - case CalpontSystemCatalog::VARCHAR: - default: - { - if (LIKELY(isInputWide)) - { - datatypes::Decimal dec(0, in.getScale(i), in.getPrecision(i), val128); - out->setStringField(dec.toString(), i); - } - else - { - datatypes::Decimal dec(val, in.getScale(i), in.getPrecision(i)); - out->setStringField(dec.toString(), i); - } - break; - } - } - - break; - } - - case CalpontSystemCatalog::BLOB: - case CalpontSystemCatalog::VARBINARY: - { - // out->setVarBinaryField(in.getVarBinaryStringField(i), i); // not efficient - out->setVarBinaryField(in.getVarBinaryField(i), in.getVarBinaryLength(i), i); - break; - } - - default: - { - ostringstream os; - os << "TupleUnion::normalize(): unknown input type (" << in.getColTypes()[i] << ")"; - cout << os.str() << endl; - throw logic_error(os.str()); - } - } + /// Call the pre-compiled function. + normalizeFunctions[i](in, out, i); } } diff --git a/dbcon/joblist/tupleunion.h b/dbcon/joblist/tupleunion.h index 23a00b273..0d1370051 100644 --- a/dbcon/joblist/tupleunion.h +++ b/dbcon/joblist/tupleunion.h @@ -41,6 +41,8 @@ namespace joblist { +using normalizeFunctionsT = std::vector>; + class TupleUnion : public JobStep, public TupleDeliveryStep { public: @@ -122,8 +124,8 @@ class TupleUnion : public JobStep, public TupleDeliveryStep }; void getOutput(rowgroup::RowGroup* rg, rowgroup::Row* row, rowgroup::RGData* data); - void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data); - void normalize(const rowgroup::Row& in, rowgroup::Row* out); + void addToOutput(rowgroup::Row* r, rowgroup::RowGroup* rg, bool keepit, rowgroup::RGData& data, uint32_t& tmpOutputRowCount); + void normalize(const rowgroup::Row& in, rowgroup::Row* out, const normalizeFunctionsT& normalizeFunctions); void writeNull(rowgroup::Row* out, uint32_t col); void readInput(uint32_t); void formatMiniStats();