From 554c6da8e87b8658698f7ea9cdeeea2c7c1ebff5 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Fri, 3 Apr 2020 13:30:10 -0400 Subject: [PATCH] MCOL-641 Implement int128_t versions of arithmetic operations and add unit test cases. --- datatypes/mcs_decimal.cpp | 359 ++++++++-- datatypes/mcs_decimal.h | 162 ++++- dbcon/execplan/arithmeticoperator.cpp | 6 +- dbcon/execplan/arithmeticoperator.h | 148 ++-- dbcon/mysql/ha_mcs_execplan.cpp | 73 +- dbcon/mysql/ha_mcs_sysvars.cpp | 19 + dbcon/mysql/ha_mcs_sysvars.h | 3 + tests/mcs_decimal-tests.cpp | 986 ++++++++++++++++---------- 8 files changed, 1254 insertions(+), 502 deletions(-) diff --git a/datatypes/mcs_decimal.cpp b/datatypes/mcs_decimal.cpp index cc227f215..325f67721 100644 --- a/datatypes/mcs_decimal.cpp +++ b/datatypes/mcs_decimal.cpp @@ -48,7 +48,7 @@ namespace datatypes template - void execute(const execplan::IDB_Decimal& l, + void addSubtractExecute(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result, BinaryOperation op, @@ -57,7 +57,7 @@ namespace datatypes { int128_t lValue = Decimal::isWideDecimalType(l.precision) ? l.s128Value : l.value; - int128_t rValue = Decimal::isWideDecimalType(l.precision) + int128_t rValue = Decimal::isWideDecimalType(r.precision) ? r.s128Value : r.value; if (result.scale == l.scale && result.scale == r.scale) @@ -78,7 +78,9 @@ namespace datatypes { int128_t scaleMultiplier; getScaleDivisor(scaleMultiplier, l.scale - result.scale); - lValue /= scaleMultiplier; + lValue = (int128_t) (lValue > 0 ? + (__float128)lValue / scaleMultiplier + 0.5 : + (__float128)lValue / scaleMultiplier - 0.5); } if (result.scale > r.scale) @@ -92,7 +94,9 @@ namespace datatypes { int128_t scaleMultiplier; getScaleDivisor(scaleMultiplier, r.scale - result.scale); - rValue /= scaleMultiplier; + rValue = (int128_t) (rValue > 0 ? + (__float128)rValue / scaleMultiplier + 0.5 : + (__float128)rValue / scaleMultiplier - 0.5); } // We assume there is no way that lValue or rValue calculations @@ -102,6 +106,94 @@ namespace datatypes result.s128Value = op(lValue, rValue); } + template + void divisionExecute(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result, + OpOverflowCheck opOverflowCheck, + MultiplicationOverflowCheck mulOverflowCheck) + { + int128_t lValue = Decimal::isWideDecimalType(l.precision) + ? l.s128Value : l.value; + int128_t rValue = Decimal::isWideDecimalType(r.precision) + ? r.s128Value : r.value; + + opOverflowCheck(lValue, rValue); + + if (result.scale >= l.scale - r.scale) + { + int128_t scaleMultiplier; + + getScaleDivisor(scaleMultiplier, result.scale - (l.scale - r.scale)); + + // TODO How do we check overflow of (int128_t)((__float128)lValue / rValue * scaleMultiplier) ? + + result.s128Value = (int128_t)(( (lValue > 0 && rValue > 0) || (lValue < 0 && rValue < 0) ? + (__float128)lValue / rValue * scaleMultiplier + 0.5 : + (__float128)lValue / rValue * scaleMultiplier - 0.5)); + } + else + { + int128_t scaleMultiplier; + + getScaleDivisor(scaleMultiplier, (l.scale - r.scale) - result.scale); + + result.s128Value = (int128_t)(( (lValue > 0 && rValue > 0) || (lValue < 0 && rValue < 0) ? + (__float128)lValue / rValue / scaleMultiplier + 0.5 : + (__float128)lValue / rValue / scaleMultiplier - 0.5)); + } + } + + template + void multiplicationExecute(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result, + OpOverflowCheck opOverflowCheck, + MultiplicationOverflowCheck mulOverflowCheck) + { + int128_t lValue = Decimal::isWideDecimalType(l.precision) + ? l.s128Value : l.value; + int128_t rValue = Decimal::isWideDecimalType(r.precision) + ? r.s128Value : r.value; + + if (lValue == 0 || rValue == 0) + { + result.s128Value = 0; + return; + } + + if (result.scale >= l.scale + r.scale) + { + int128_t scaleMultiplier; + + getScaleDivisor(scaleMultiplier, result.scale - (l.scale + r.scale)); + + opOverflowCheck(lValue, rValue, result.s128Value); + opOverflowCheck(result.s128Value, scaleMultiplier, result.s128Value); + } + else + { + unsigned int diff = l.scale + r.scale - result.scale; + + int128_t scaleMultiplierL, scaleMultiplierR; + + getScaleDivisor(scaleMultiplierL, diff / 2); + getScaleDivisor(scaleMultiplierR, diff - (diff / 2)); + + lValue = (int128_t)(( (lValue > 0) ? + (__float128)lValue / scaleMultiplierL + 0.5 : + (__float128)lValue / scaleMultiplierL - 0.5)); + + rValue = (int128_t)(( (rValue > 0) ? + (__float128)rValue / scaleMultiplierR + 0.5 : + (__float128)rValue / scaleMultiplierR - 0.5)); + + opOverflowCheck(lValue, rValue, result.s128Value);; + } + } + std::string Decimal::toString(execplan::IDB_Decimal& value) { char buf[utils::MAXLENGTH16BYTES]; @@ -163,13 +255,9 @@ namespace datatypes { std::plus add; NoOverflowCheck noOverflowCheck; - execute(l, r, result, add, noOverflowCheck, noOverflowCheck); + addSubtractExecute(l, r, result, add, noOverflowCheck, noOverflowCheck); } - template - void Decimal::addition(const execplan::IDB_Decimal& l, - const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); - // with overflow check template<> void Decimal::addition(const execplan::IDB_Decimal& l, @@ -178,13 +266,10 @@ namespace datatypes std::plus add; AdditionOverflowCheck overflowCheck; MultiplicationOverflowCheck mulOverflowCheck; - execute(l, r, result, add, overflowCheck, mulOverflowCheck); + addSubtractExecute(l, r, result, add, overflowCheck, mulOverflowCheck); } - template - void Decimal::addition(const execplan::IDB_Decimal& l, - const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); - + // no overflow check template<> void Decimal::addition(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) @@ -195,86 +280,252 @@ namespace datatypes return; } - int64_t lValue = 0, rValue = 0; + int64_t lValue = l.value, rValue = r.value; - if (result.scale >= l.scale) - lValue = l.value * mcs_pow_10[result.scale - l.scale]; - else - lValue = (int64_t)(l.value > 0 ? - (double)l.value / mcs_pow_10[l.scale - result.scale] + 0.5 : - (double)l.value / mcs_pow_10[l.scale - result.scale] - 0.5); + if (result.scale > l.scale) + lValue *= mcs_pow_10[result.scale - l.scale]; + else if (result.scale < l.scale) + lValue = (int64_t)(lValue > 0 ? + (double)lValue / mcs_pow_10[l.scale - result.scale] + 0.5 : + (double)lValue / mcs_pow_10[l.scale - result.scale] - 0.5); - if (result.scale >= r.scale) - rValue = r.value * mcs_pow_10[result.scale - r.scale]; - else - rValue = (int64_t)(r.value > 0 ? - (double)r.value / mcs_pow_10[r.scale - result.scale] + 0.5 : - (double)r.value / mcs_pow_10[r.scale - result.scale] - 0.5); + if (result.scale > r.scale) + rValue *= mcs_pow_10[result.scale - r.scale]; + else if (result.scale < r.scale) + rValue = (int64_t)(rValue > 0 ? + (double)rValue / mcs_pow_10[r.scale - result.scale] + 0.5 : + (double)rValue / mcs_pow_10[r.scale - result.scale] - 0.5); result.value = lValue + rValue; } - template - void Decimal::addition(const execplan::IDB_Decimal& l, - const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); + // with overflow check template<> void Decimal::addition(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) { - throw logging::NotImplementedExcept("Decimal::addition"); + AdditionOverflowCheck additionOverflowCheck; + MultiplicationOverflowCheck mulOverflowCheck; + + if (result.scale == l.scale && result.scale == r.scale) + { + additionOverflowCheck(l.value, r.value); + result.value = l.value + r.value; + return; + } + + int64_t lValue = l.value, rValue = r.value; + + if (result.scale > l.scale) + mulOverflowCheck(lValue, mcs_pow_10[result.scale - l.scale], lValue); + else if (result.scale < l.scale) + lValue = (int64_t)(lValue > 0 ? + (double)lValue / mcs_pow_10[l.scale - result.scale] + 0.5 : + (double)lValue / mcs_pow_10[l.scale - result.scale] - 0.5); + + if (result.scale > r.scale) + mulOverflowCheck(rValue, mcs_pow_10[result.scale - r.scale], rValue); + else if (result.scale < r.scale) + rValue = (int64_t)(rValue > 0 ? + (double)rValue / mcs_pow_10[r.scale - result.scale] + 0.5 : + (double)rValue / mcs_pow_10[r.scale - result.scale] - 0.5); + + additionOverflowCheck(lValue, rValue); + result.value = lValue + rValue; } + // no overflow check + template<> + void Decimal::subtraction(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + std::minus subtract; + NoOverflowCheck noOverflowCheck; + addSubtractExecute(l, r, result, subtract, noOverflowCheck, noOverflowCheck); + } + + // with overflow check + template<> + void Decimal::subtraction(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + std::minus subtract; + SubtractionOverflowCheck overflowCheck; + MultiplicationOverflowCheck mulOverflowCheck; + addSubtractExecute(l, r, result, subtract, overflowCheck, mulOverflowCheck); + } + + // no overflow check + template<> + void Decimal::subtraction(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + if (result.scale == l.scale && result.scale == r.scale) + { + result.value = l.value - r.value; + return; + } + + int64_t lValue = l.value, rValue = r.value; + + if (result.scale > l.scale) + lValue *= mcs_pow_10[result.scale - l.scale]; + else if (result.scale < l.scale) + lValue = (int64_t)(lValue > 0 ? + (double)lValue / mcs_pow_10[l.scale - result.scale] + 0.5 : + (double)lValue / mcs_pow_10[l.scale - result.scale] - 0.5); + + if (result.scale > r.scale) + rValue *= mcs_pow_10[result.scale - r.scale]; + else if (result.scale < r.scale) + rValue = (int64_t)(rValue > 0 ? + (double)rValue / mcs_pow_10[r.scale - result.scale] + 0.5 : + (double)rValue / mcs_pow_10[r.scale - result.scale] - 0.5); + + result.value = lValue - rValue; + } + + // with overflow check + template<> + void Decimal::subtraction(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + SubtractionOverflowCheck subtractionOverflowCheck; + MultiplicationOverflowCheck mulOverflowCheck; + + if (result.scale == l.scale && result.scale == r.scale) + { + subtractionOverflowCheck(l.value, r.value); + result.value = l.value - r.value; + return; + } + + int64_t lValue = l.value, rValue = r.value; + + if (result.scale > l.scale) + mulOverflowCheck(lValue, mcs_pow_10[result.scale - l.scale], lValue); + else if (result.scale < l.scale) + lValue = (int64_t)(lValue > 0 ? + (double)lValue / mcs_pow_10[l.scale - result.scale] + 0.5 : + (double)lValue / mcs_pow_10[l.scale - result.scale] - 0.5); + + if (result.scale > r.scale) + mulOverflowCheck(rValue, mcs_pow_10[result.scale - r.scale], rValue); + else if (result.scale < r.scale) + rValue = (int64_t)(rValue > 0 ? + (double)rValue / mcs_pow_10[r.scale - result.scale] + 0.5 : + (double)rValue / mcs_pow_10[r.scale - result.scale] - 0.5); + + subtractionOverflowCheck(lValue, rValue); + result.value = lValue - rValue; + } + + // no overflow check template<> void Decimal::division(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) { - std::divides division; NoOverflowCheck noOverflowCheck; - execute(l, r, result, division, noOverflowCheck, noOverflowCheck); + divisionExecute(l, r, result, noOverflowCheck, noOverflowCheck); } - template - void Decimal::division(const execplan::IDB_Decimal& l, - const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); - // With overflow check template<> void Decimal::division(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) { - std::divides division; DivisionOverflowCheck overflowCheck; MultiplicationOverflowCheck mulOverflowCheck; - execute(l, r, result, division, overflowCheck, mulOverflowCheck); + divisionExecute(l, r, result, overflowCheck, mulOverflowCheck); } + // no overflow check // We rely on the zero check from ArithmeticOperator::execute template<> void Decimal::division(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) { if (result.scale >= l.scale - r.scale) - result.value = (int64_t)(( (l.value > 0 && r.value > 0) - || (l.value < 0 && r.value < 0) ? - (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] + 0.5 : - (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] - 0.5)); - else - result.value = (int64_t)(( (l.value > 0 && r.value > 0) - || (l.value < 0 && r.value < 0) ? - (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] + 0.5 : - (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] - 0.5)); - + result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? + (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] + 0.5 : + (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] - 0.5)); + else + result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? + (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] + 0.5 : + (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] - 0.5)); } - template - void Decimal::division(const execplan::IDB_Decimal& l, - const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); - + // With overflow check template<> void Decimal::division(const execplan::IDB_Decimal& l, const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) { - throw logging::NotImplementedExcept("Decimal::division"); + DivisionOverflowCheck divisionOverflowCheck; + + divisionOverflowCheck(l.value, r.value); + + if (result.scale >= l.scale - r.scale) + // TODO How do we check overflow of (int64_t)((long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)]) ? + result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? + (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] + 0.5 : + (long double)l.value / r.value * mcs_pow_10[result.scale - (l.scale - r.scale)] - 0.5)); + else + result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? + (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] + 0.5 : + (long double)l.value / r.value / mcs_pow_10[l.scale - r.scale - result.scale] - 0.5)); + } + + // no overflow check + template<> + void Decimal::multiplication(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + MultiplicationNoOverflowCheck noOverflowCheck; + multiplicationExecute(l, r, result, noOverflowCheck, noOverflowCheck); + } + + // With overflow check + template<> + void Decimal::multiplication(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + MultiplicationOverflowCheck mulOverflowCheck; + multiplicationExecute(l, r, result, mulOverflowCheck, mulOverflowCheck); + } + + // no overflow check + template<> + void Decimal::multiplication(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + if (result.scale >= l.scale + r.scale) + result.value = l.value * r.value * mcs_pow_10[result.scale - (l.scale + r.scale)]; + else + result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? + (double)l.value * r.value / mcs_pow_10[l.scale + r.scale - result.scale] + 0.5 : + (double)l.value * r.value / mcs_pow_10[l.scale + r.scale - result.scale] - 0.5)); + } + + // With overflow check + template<> + void Decimal::multiplication(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result) + { + MultiplicationOverflowCheck mulOverflowCheck; + + if (result.scale >= l.scale + r.scale) + { + mulOverflowCheck(l.value, r.value, result.value); + mulOverflowCheck(result.value, mcs_pow_10[result.scale - (l.scale + r.scale)], result.value); + } + else + { + mulOverflowCheck(l.value, r.value, result.value); + + result.value = (int64_t)(( (result.value > 0) ? + (double)result.value / mcs_pow_10[l.scale + r.scale - result.scale] + 0.5 : + (double)result.value / mcs_pow_10[l.scale + r.scale - result.scale] - 0.5)); + } } } // end of namespace diff --git a/datatypes/mcs_decimal.h b/datatypes/mcs_decimal.h index 93b827207..54071c03b 100644 --- a/datatypes/mcs_decimal.h +++ b/datatypes/mcs_decimal.h @@ -35,6 +35,7 @@ namespace datatypes constexpr uint32_t MAXDECIMALWIDTH = 16U; constexpr uint8_t INT64MAXPRECISION = 18U; constexpr uint8_t INT128MAXPRECISION = 38U; +constexpr uint8_t MAXLEGACYWIDTH = 8U; const uint64_t mcs_pow_10[20] = { @@ -126,6 +127,15 @@ class Decimal const execplan::IDB_Decimal& r, execplan::IDB_Decimal& result); + /** + @brief Subtraction template that supports overflow check and + two internal representations of decimal. + */ + template + static void subtraction(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result); + /** @brief Division template that supports overflow check and two internal representations of decimal. @@ -136,7 +146,16 @@ class Decimal execplan::IDB_Decimal& result); /** - @brief Convinience method to put decimal into a std:;string. + @brief Multiplication template that supports overflow check and + two internal representations of decimal. + */ + template + static void multiplication(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result); + + /** + @brief Convenience method to put decimal into a std::string. */ static std::string toString(execplan::IDB_Decimal& value); @@ -161,6 +180,66 @@ class Decimal && precision <= INT128MAXPRECISION; } + /** + @brief The method sets the legacy scale and precision of a wide decimal + column which is the result of an arithmetic operation. + */ + static inline void setDecimalScalePrecisionLegacy(execplan::CalpontSystemCatalog::ColType& ct, + unsigned int precision, unsigned int scale) + { + ct.scale = scale; + + if (ct.scale == 0) + ct.precision = precision - 1; + else + ct.precision = precision - scale; + } + + /** + @brief The method sets the scale and precision of a wide decimal + column which is the result of an arithmetic operation. + */ + static inline void setDecimalScalePrecision(execplan::CalpontSystemCatalog::ColType& ct, + unsigned int precision, unsigned int scale) + { + ct.colWidth = (precision > INT64MAXPRECISION) + ? MAXDECIMALWIDTH : MAXLEGACYWIDTH; + + ct.precision = (precision > INT128MAXPRECISION) + ? INT128MAXPRECISION : precision; + + ct.scale = scale; + } + + /** + @brief The method sets the scale and precision of a wide decimal + column which is the result of an arithmetic operation, based on a heuristic. + */ + static inline void setDecimalScalePrecisionHeuristic(execplan::CalpontSystemCatalog::ColType& ct, + unsigned int precision, unsigned int scale) + { + unsigned int diff = 0; + + if (precision > INT128MAXPRECISION) + { + ct.precision = INT128MAXPRECISION; + diff = precision - INT128MAXPRECISION; + } + else + { + ct.precision = precision; + } + + ct.scale = scale; + + if (diff != 0) + { + ct.scale = scale - (int)(diff * (38.0/65.0)); + + if (ct.scale < 0) + ct.scale = 0; + } + } }; /** @@ -170,17 +249,25 @@ class Decimal struct DivisionOverflowCheck { void operator()(const int128_t& x, const int128_t& y) { - if (x == Decimal::maxInt128 && y == -1) + if (x == Decimal::minInt128 && y == -1) { throw logging::OperationOverflowExcept( "Decimal::division produces an overflow."); } } + void operator()(const int64_t x, const int64_t y) + { + if (x == std::numeric_limits::min() && y == -1) + { + throw logging::OperationOverflowExcept( + "Decimal::division produces an overflow."); + } + } }; /** @brief The structure contains an overflow check for int128 - addition. + and int64_t multiplication. */ struct MultiplicationOverflowCheck { void operator()(const int128_t& x, const int128_t& y) @@ -189,7 +276,7 @@ struct MultiplicationOverflowCheck { { throw logging::OperationOverflowExcept( "Decimal::multiplication or scale multiplication \ - produces an overflow."); +produces an overflow."); } } bool operator()(const int128_t& x, const int128_t& y, int128_t& r) @@ -198,16 +285,45 @@ struct MultiplicationOverflowCheck { { throw logging::OperationOverflowExcept( "Decimal::multiplication or scale multiplication \ - produces an overflow."); +produces an overflow."); } return true; } + void operator()(const int64_t x, const int64_t y) + { + if (x * y / y != x) + { + throw logging::OperationOverflowExcept( + "Decimal::multiplication or scale multiplication \ +produces an overflow."); + } + } + bool operator()(const int64_t x, const int64_t y, int64_t& r) + { + if ((r = x * y) / y != x) + { + throw logging::OperationOverflowExcept( + "Decimal::multiplication or scale multiplication \ +produces an overflow."); + } + return true; + } +}; +/** + @brief The strucuture runs an empty overflow check for int128 + multiplication operation. +*/ +struct MultiplicationNoOverflowCheck { + void operator()(const int128_t& x, const int128_t& y, int128_t& r) + { + r = x * y; + } }; /** @brief The structure contains an overflow check for int128 - addition. + and int64 addition. */ struct AdditionOverflowCheck { void operator()(const int128_t& x, const int128_t& y) @@ -219,6 +335,40 @@ struct AdditionOverflowCheck { "Decimal::addition produces an overflow."); } } + void operator()(const int64_t x, const int64_t y) + { + if ((y > 0 && x > std::numeric_limits::max() - y) + || (y < 0 && x < std::numeric_limits::min() - y)) + { + throw logging::OperationOverflowExcept( + "Decimal::addition produces an overflow."); + } + } +}; + +/** + @brief The structure contains an overflow check for int128 + subtraction. +*/ +struct SubtractionOverflowCheck { + void operator()(const int128_t& x, const int128_t& y) + { + if ((y > 0 && x < Decimal::minInt128 + y) + || (y < 0 && x > Decimal::maxInt128 + y)) + { + throw logging::OperationOverflowExcept( + "Decimal::subtraction produces an overflow."); + } + } + void operator()(const int64_t x, const int64_t y) + { + if ((y > 0 && x < std::numeric_limits::min() + y) + || (y < 0 && x > std::numeric_limits::max() + y)) + { + throw logging::OperationOverflowExcept( + "Decimal::subtraction produces an overflow."); + } + } }; /** diff --git a/dbcon/execplan/arithmeticoperator.cpp b/dbcon/execplan/arithmeticoperator.cpp index 610bf2ad7..3f4fa695c 100644 --- a/dbcon/execplan/arithmeticoperator.cpp +++ b/dbcon/execplan/arithmeticoperator.cpp @@ -35,19 +35,19 @@ namespace execplan * Constructors/Destructors */ ArithmeticOperator::ArithmeticOperator() : Operator(), - fDecimalOverflowCheck(true) + fDecimalOverflowCheck(false) { } ArithmeticOperator::ArithmeticOperator(const string& operatorName): Operator(operatorName), - fDecimalOverflowCheck(true) + fDecimalOverflowCheck(false) { } ArithmeticOperator::ArithmeticOperator(const ArithmeticOperator& rhs): Operator(rhs), fTimeZone(rhs.timeZone()), - fDecimalOverflowCheck(true) + fDecimalOverflowCheck(false) { } diff --git a/dbcon/execplan/arithmeticoperator.h b/dbcon/execplan/arithmeticoperator.h index 88c4c1f11..5286ab25c 100644 --- a/dbcon/execplan/arithmeticoperator.h +++ b/dbcon/execplan/arithmeticoperator.h @@ -196,7 +196,7 @@ public: return TreeNode::getBoolVal(); } void adjustResultType(const CalpontSystemCatalog::ColType& m); - constexpr inline bool getOverflowCheck() + constexpr inline bool getOverflowCheck() const { return fDecimalOverflowCheck; } @@ -209,7 +209,6 @@ private: template inline result_t execute(result_t op1, result_t op2, bool& isNull); inline void execute(IDB_Decimal& result, IDB_Decimal op1, IDB_Decimal op2, bool& isNull); - inline void execute(IDB_Decimal& result, IDB_Decimal op1, IDB_Decimal op2, bool& isNull, cscType& resultCscType); std::string fTimeZone; bool fDecimalOverflowCheck; }; @@ -250,7 +249,7 @@ inline void ArithmeticOperator::evaluate(rowgroup::Row& row, bool& isNull, Parse // WIP MCOL-641 case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: - execute(fResult.decimalVal, lop->getDecimalVal(row, isNull), rop->getDecimalVal(row, isNull), isNull, fOperationType); + execute(fResult.decimalVal, lop->getDecimalVal(row, isNull), rop->getDecimalVal(row, isNull), isNull); break; default: { @@ -292,20 +291,36 @@ inline result_t ArithmeticOperator::execute(result_t op1, result_t op2, bool& is } } -inline void ArithmeticOperator::execute(IDB_Decimal& result, IDB_Decimal op1, IDB_Decimal op2, bool& isNull, cscType& resultCscType) +inline void ArithmeticOperator::execute(IDB_Decimal& result, IDB_Decimal op1, IDB_Decimal op2, bool& isNull) { switch (fOp) { case OP_ADD: - if (resultCscType.colWidth == datatypes::MAXDECIMALWIDTH) + if (fOperationType.colWidth == datatypes::MAXDECIMALWIDTH) { - datatypes::Decimal::addition( - op1, op2, result); + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::addition( + op1, op2, result); + } + else + { + datatypes::Decimal::addition( + op1, op2, result); + } } - else if (resultCscType.colWidth == utils::MAXLEGACYWIDTH) + else if (fOperationType.colWidth == utils::MAXLEGACYWIDTH) { - datatypes::Decimal::addition( - op1, op2, result); + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::addition( + op1, op2, result); + } + else + { + datatypes::Decimal::addition( + op1, op2, result); + } } else { @@ -315,52 +330,95 @@ inline void ArithmeticOperator::execute(IDB_Decimal& result, IDB_Decimal op1, ID break; case OP_SUB: - if (result.scale == op1.scale && result.scale == op2.scale) + if (fOperationType.colWidth == datatypes::MAXDECIMALWIDTH) { - result.value = op1.value - op2.value; - break; + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::subtraction( + op1, op2, result); + } + else + { + datatypes::Decimal::subtraction( + op1, op2, result); + } + } + else if (fOperationType.colWidth == utils::MAXLEGACYWIDTH) + { + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::subtraction( + op1, op2, result); + } + else + { + datatypes::Decimal::subtraction( + op1, op2, result); + } } - - if (result.scale >= op1.scale) - op1.value *= IDB_pow[result.scale - op1.scale]; else - op1.value = (int64_t)(op1.value > 0 ? - (double)op1.value / IDB_pow[op1.scale - result.scale] + 0.5 : - (double)op1.value / IDB_pow[op1.scale - result.scale] - 0.5); - - if (result.scale >= op2.scale) - op2.value *= IDB_pow[result.scale - op2.scale]; - else - op2.value = (int64_t)(op2.value > 0 ? - (double)op2.value / IDB_pow[op2.scale - result.scale] + 0.5 : - (double)op2.value / IDB_pow[op2.scale - result.scale] - 0.5); - - result.value = op1.value - op2.value; + { + throw logging::InvalidArgumentExcept( + "Unexpected result width"); + } break; case OP_MUL: - if (result.scale >= op1.scale + op2.scale) - result.value = op1.value * op2.value * IDB_pow[result.scale - (op1.scale + op2.scale)]; + if (fOperationType.colWidth == datatypes::MAXDECIMALWIDTH) + { + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::multiplication( + op1, op2, result); + } + else + { + datatypes::Decimal::multiplication( + op1, op2, result); + } + } + else if (fOperationType.colWidth == utils::MAXLEGACYWIDTH) + { + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::multiplication( + op1, op2, result); + } + else + { + datatypes::Decimal::multiplication( + op1, op2, result); + } + } else - result.value = (int64_t)(( (op1.value > 0 && op2.value > 0) || (op1.value < 0 && op2.value < 0) ? - (double)op1.value * op2.value / IDB_pow[op1.scale + op2.scale - result.scale] + 0.5 : - (double)op1.value * op2.value / IDB_pow[op1.scale + op2.scale - result.scale] - 0.5)); - + { + throw logging::InvalidArgumentExcept( + "Unexpected result width"); + } break; case OP_DIV: - if (resultCscType.colWidth == 16) + if (fOperationType.colWidth == datatypes::MAXDECIMALWIDTH) { - if (op2.s128Value == 0) + if ((datatypes::Decimal::isWideDecimalType(op2.precision) && op2.s128Value == 0) + || (!datatypes::Decimal::isWideDecimalType(op2.precision) && op2.value == 0)) { isNull = true; break; } - datatypes::Decimal::division( - op1, op2, result); + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::division( + op1, op2, result); + } + else + { + datatypes::Decimal::division( + op1, op2, result); + } } - else if (resultCscType.colWidth == 8) + else if (fOperationType.colWidth == utils::MAXLEGACYWIDTH) { if (op2.value == 0) { @@ -368,8 +426,16 @@ inline void ArithmeticOperator::execute(IDB_Decimal& result, IDB_Decimal op1, ID break; } - datatypes::Decimal::division( - op1, op2, result); + if (LIKELY(!fDecimalOverflowCheck)) + { + datatypes::Decimal::division( + op1, op2, result); + } + else + { + datatypes::Decimal::division( + op1, op2, result); + } } else { diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 5a3260b65..20a2158bc 100755 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -3105,17 +3105,13 @@ CalpontSystemCatalog::ColType colType_MysqlToIDB (const Item* item) case DECIMAL_RESULT: { Item_decimal* idp = (Item_decimal*)item; + ct.colDataType = CalpontSystemCatalog::DECIMAL; - ct.colWidth = (idp->max_length >= datatypes::INT64MAXPRECISION) - ? datatypes::MAXDECIMALWIDTH : utils::MAXLEGACYWIDTH; - ct.scale = idp->decimals; - // WIP MCOL-641 - if (ct.scale == 0) - ct.precision = (idp->max_length > datatypes::INT128MAXPRECISION) - ? datatypes::INT128MAXPRECISION : idp->max_length - 1; - else - ct.precision = (idp->max_length > datatypes::INT128MAXPRECISION ) - ? datatypes::INT128MAXPRECISION : idp->max_length - idp->decimals; + + unsigned int precision = idp->decimal_precision(); + unsigned int scale = idp->decimal_scale(); + + datatypes::Decimal::setDecimalScalePrecision(ct, precision, scale); break; } @@ -3141,7 +3137,7 @@ ReturnedColumn* buildReturnedColumn( { ReturnedColumn* rc = NULL; - if ( gwi.thd) + if (gwi.thd) { //if ( ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE ) || ((gwi.thd->lex)->sql_command == SQLCOM_UPDATE_MULTI )) { @@ -3444,6 +3440,7 @@ ArithmeticColumn* buildArithmeticColumn( Item** sfitempp = item->arguments(); ArithmeticOperator* aop = new ArithmeticOperator(item->func_name()); aop->timeZone(gwi.thd->variables.time_zone->get_name()->ptr()); + aop->setOverflowCheck(get_decimal_overflow_check(gwi.thd)); ParseTree* pt = new ParseTree(aop); //ReturnedColumn *lhs = 0, *rhs = 0; ParseTree* lhs = 0, *rhs = 0; @@ -3606,6 +3603,60 @@ ArithmeticColumn* buildArithmeticColumn( // decimal arithmetic operation gives double result when the session variable is set. CalpontSystemCatalog::ColType mysql_type = colType_MysqlToIDB(item); + if (mysql_type.colDataType == CalpontSystemCatalog::DECIMAL || + mysql_type.colDataType == CalpontSystemCatalog::UDECIMAL) + { + int32_t leftColWidth = pt->left()->data()->resultType().colWidth; + int32_t rightColWidth = pt->right()->data()->resultType().colWidth; + + // Revert back to legacy values of scale and precision + // if the 2 columns involved in the expression are not wide + if (leftColWidth <= utils::MAXLEGACYWIDTH && + rightColWidth <= utils::MAXLEGACYWIDTH) + { + Item_decimal* idp = (Item_decimal*)item; + + mysql_type.colWidth = 8; + + unsigned int precision = idp->max_length; + unsigned int scale = idp->decimals; + + datatypes::Decimal::setDecimalScalePrecisionLegacy(mysql_type, precision, scale); + } + else + { + + if (leftColWidth == datatypes::MAXDECIMALWIDTH || + rightColWidth == datatypes::MAXDECIMALWIDTH) + mysql_type.colWidth = datatypes::MAXDECIMALWIDTH; + + if (mysql_type.colWidth == datatypes::MAXDECIMALWIDTH) + { + string funcName = item->func_name(); + + int32_t scale1 = pt->left()->data()->resultType().scale; + int32_t scale2 = pt->right()->data()->resultType().scale; + + if (funcName == "/" && + (mysql_type.scale - (scale1 - scale2)) > datatypes::INT128MAXPRECISION) + { + Item_decimal* idp = (Item_decimal*)item; + + unsigned int precision = idp->decimal_precision(); + unsigned int scale = idp->decimal_scale(); + + datatypes::Decimal::setDecimalScalePrecisionHeuristic(mysql_type, precision, scale); + + if (mysql_type.scale < scale1) + mysql_type.scale = scale1; + + if (mysql_type.precision < mysql_type.scale) + mysql_type.precision = mysql_type.scale; + } + } + } + } + if (get_double_for_decimal_math(current_thd) == true) aop->adjustResultType(mysql_type); else diff --git a/dbcon/mysql/ha_mcs_sysvars.cpp b/dbcon/mysql/ha_mcs_sysvars.cpp index 02f89ab50..fc037005b 100644 --- a/dbcon/mysql/ha_mcs_sysvars.cpp +++ b/dbcon/mysql/ha_mcs_sysvars.cpp @@ -159,6 +159,15 @@ static MYSQL_THDVAR_BOOL( 0 ); +static MYSQL_THDVAR_BOOL( + decimal_overflow_check, + PLUGIN_VAR_NOCMDARG, + "Enable/disable for ColumnStore to check for overflow in arithmetic operation.", + NULL, + NULL, + 0 +); + static MYSQL_THDVAR_BOOL( ordered_only, PLUGIN_VAR_NOCMDARG, @@ -353,6 +362,7 @@ st_mysql_sys_var* mcs_system_variables[] = MYSQL_SYSVAR(diskjoin_bucketsize), MYSQL_SYSVAR(um_mem_limit), MYSQL_SYSVAR(double_for_decimal_math), + MYSQL_SYSVAR(decimal_overflow_check), MYSQL_SYSVAR(local_query), MYSQL_SYSVAR(use_import_for_batchinsert), MYSQL_SYSVAR(import_for_batchinsert_delimiter), @@ -557,6 +567,15 @@ void set_double_for_decimal_math(THD* thd, bool value) THDVAR(thd, double_for_decimal_math) = value; } +bool get_decimal_overflow_check(THD* thd) +{ + return ( thd == NULL ) ? false : THDVAR(thd, decimal_overflow_check); +} +void set_decimal_overflow_check(THD* thd, bool value) +{ + THDVAR(thd, decimal_overflow_check) = value; +} + ulong get_local_query(THD* thd) { return ( thd == NULL ) ? 0 : THDVAR(thd, local_query); diff --git a/dbcon/mysql/ha_mcs_sysvars.h b/dbcon/mysql/ha_mcs_sysvars.h index 1bb01307f..b8e3884b7 100644 --- a/dbcon/mysql/ha_mcs_sysvars.h +++ b/dbcon/mysql/ha_mcs_sysvars.h @@ -101,6 +101,9 @@ void set_varbin_always_hex(THD* thd, bool value); bool get_double_for_decimal_math(THD* thd); void set_double_for_decimal_math(THD* thd, bool value); +bool get_decimal_overflow_check(THD* thd); +void set_decimal_overflow_check(THD* thd, bool value); + ulong get_local_query(THD* thd); void set_local_query(THD* thd, ulong value); diff --git a/tests/mcs_decimal-tests.cpp b/tests/mcs_decimal-tests.cpp index b3a041867..53a40c533 100644 --- a/tests/mcs_decimal-tests.cpp +++ b/tests/mcs_decimal-tests.cpp @@ -15,11 +15,15 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include + #include "gtest/gtest.h" #include "treenode.h" #include "mcs_decimal.h" #include "widedecimalutils.h" +#include "dataconvert.h" +#include "calpontsystemcatalog.h" TEST(Decimal, compareCheck) { @@ -82,6 +86,9 @@ TEST(Decimal, compareCheck) TEST(Decimal, additionNoOverflowCheck) { + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; // Addition w/o overflow check execplan::IDB_Decimal l, r, result; // same precision, same scale, both positive values @@ -98,61 +105,33 @@ TEST(Decimal, additionNoOverflowCheck) result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(462, result.s128Value); + // same precision, same scale, both negative values - l.scale = 38; - l.precision = 38; l.s128Value = -42; - - r.scale = 38; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(-462, result.s128Value); + // same precision, same scale, +- values - l.scale = 38; - l.precision = 38; l.s128Value = 42; - - r.scale = 38; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(-378, result.s128Value); - // same precision, same scale, both 0 - l.scale = 38; - l.precision = 38; - l.s128Value = 0; - r.scale = 38; - r.precision = 38; + // same precision, same scale, both 0 + l.s128Value = 0; r.s128Value = 0; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(0, result.s128Value); - // diff scale + + // different scale // same precision, L scale > R scale, both positive values l.scale = 38; l.precision = 38; @@ -167,65 +146,35 @@ TEST(Decimal, additionNoOverflowCheck) result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - int128_t s128ScaleMultiplier1 = - static_cast(10000000000000)*10000000000; - int128_t s128Result = r.s128Value*s128ScaleMultiplier1+l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000042000000000000000000000042", std::string(buf)); + // same precision, L scale > R scale, both negative values - l.scale = 38; - l.precision = 38; l.s128Value = -42; - - r.scale = 15; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1+l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000042000000000000000000000042", std::string(buf)); + // same precision, L scale > R scale, +- values - l.scale = 38; - l.precision = 38; l.s128Value = 42; - - r.scale = 15; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1+l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000041999999999999999999999958", std::string(buf)); + // same precision, L scale > R scale, both 0 - l.scale = 38; - l.precision = 38; l.s128Value = 0; - - r.scale = 15; - r.precision = 38; r.s128Value = 0; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(0, result.s128Value); + // same precision, L scale < R scale, both positive values l.scale = 15; l.precision = 38; @@ -240,72 +189,45 @@ TEST(Decimal, additionNoOverflowCheck) result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1+r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000004200000000000000000000420", std::string(buf)); + // same precision, L scale < R scale, both negative values - l.scale = 15; - l.precision = 38; l.s128Value = -42; - - r.scale = 38; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1+r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000004200000000000000000000420", std::string(buf)); + // same precision, L scale < R scale, +- values - l.scale = 15; - l.precision = 38; l.s128Value = 42; - - r.scale = 38; - r.precision = 38; r.s128Value = -420; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1+r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000004199999999999999999999580", std::string(buf)); + // same precision, L scale < R scale, both 0 - l.scale = 15; - l.precision = 38; l.s128Value = 0; - - r.scale = 38; - r.precision = 38; r.s128Value = 0; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::addition(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1+r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + EXPECT_EQ(0, result.s128Value); } TEST(Decimal, divisionNoOverflowCheck) { // DIVISION // same precision, same scale, both positive values - std::string decimalStr; + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; execplan::IDB_Decimal l, r, result; + l.scale = 38; l.precision = 38; l.s128Value = 43; @@ -314,244 +236,104 @@ TEST(Decimal, divisionNoOverflowCheck) r.precision = 38; r.s128Value = 420; - result.scale = r.scale; + result.scale = 10; result.precision = 38; result.s128Value = 0; datatypes::Decimal::division(r, l, result); - - EXPECT_EQ(r.scale, result.scale); - EXPECT_EQ(result.precision, result.precision); - EXPECT_EQ(r.s128Value/l.s128Value, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("9.7674418605", std::string(buf)); + // same precision, same scale, both negative values - l.scale = 38; - l.precision = 38; - l.s128Value = -42; - - r.scale = 38; - r.precision = 38; + l.s128Value = -43; r.s128Value = -420; - - result.scale = r.scale; - result.precision = r.precision; result.s128Value = 0; datatypes::Decimal::division(r, l, result); - EXPECT_EQ(r.scale, result.scale); - EXPECT_EQ(r.precision, result.precision); - EXPECT_EQ(r.s128Value/l.s128Value, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("9.7674418605", std::string(buf)); + // same precision, same scale, +- values - l.scale = 38; - l.precision = 38; - l.s128Value = 42; - - r.scale = 38; - r.precision = 38; - r.s128Value = -420; - - result.scale = 38; - result.precision = 38; + l.s128Value = 2200000; + r.s128Value = -1900; result.s128Value = 0; datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - EXPECT_EQ(l.s128Value/r.s128Value, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-1157.8947368421", std::string(buf)); + // same precision, same scale, l = 0 - l.scale = 38; - l.precision = 38; l.s128Value = 0; - - r.scale = 38; - r.precision = 38; r.s128Value = 42424242; - - result.scale = 38; - result.precision = 38; result.s128Value = 0; datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); EXPECT_EQ(0, result.s128Value); + // diff scale // same precision, L scale > R scale, both positive values l.scale = 38; - l.precision = 38; - l.s128Value = 42; + l.s128Value = 19; r.scale = 15; - r.precision = 38; - r.s128Value = 420; + r.s128Value = 22; - result.scale = 38; - result.precision = 38; + result.scale = 10; result.s128Value = 0; datatypes::Decimal::division(r, l, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - int128_t s128ScaleMultiplier1 = - static_cast(10000000000000)*10000000000; - int128_t s128Result = r.s128Value*s128ScaleMultiplier1/l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("115789473684210526315789.4736842105", std::string(buf)); + // same precision, L scale > R scale, both negative values - l.scale = 38; - l.precision = 38; - l.s128Value = -42; - - r.scale = 15; - r.precision = 38; - r.s128Value = -420; - - result.scale = 38; - result.precision = 38; + l.s128Value = -22; + r.s128Value = -19; result.s128Value = 0; datatypes::Decimal::division(r, l, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1/l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("86363636363636363636363.6363636364", std::string(buf)); + // same precision, L scale > R scale, +- values - l.scale = 38; - l.precision = 38; - l.s128Value = 42; - - r.scale = 15; - r.precision = 38; - r.s128Value = -420; - - result.scale = 38; - result.precision = 38; + l.s128Value = 19; + r.s128Value = -22; result.s128Value = 0; datatypes::Decimal::division(r, l, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1/l.s128Value; - EXPECT_EQ(s128Result, result.s128Value); - // same precision, L scale > R scale, L = 0 - l.scale = 38; - l.precision = 38; - l.s128Value = 0; + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-115789473684210526315789.4736842105", std::string(buf)); - r.scale = 15; - r.precision = 38; - r.s128Value = 424242; - - result.scale = 38; - result.precision = 38; + // same precision, L scale > R scale, R = 0 + l.s128Value = 424242; + r.s128Value = 0; result.s128Value = 0; - datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); + datatypes::Decimal::division(r, l, result); EXPECT_EQ(0, result.s128Value); + // same precision, L scale > R scale, both MAX positive values - // WIP Investigate the next two - l.scale = 38; - l.precision = 38; - l.s128Value = 0; utils::int128Max(l.s128Value); - - r.scale = 15; - r.precision = 38; - r.s128Value = 0; utils::int128Max(r.s128Value); - - result.scale = 38; - result.precision = 38; - result.s128Value = 0; - - datatypes::Decimal::division(l, r, result); - // Use as an examplar + utils::int128Max(l.s128Value); utils::int128Max(r.s128Value); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1/l.s128Value; - // WIP - //EXPECT_EQ(s128Result, result.s128Value); - // same precision, L scale > R scale, both MIN negative values - l.scale = 38; - l.precision = 38; - l.s128Value = 0; utils::int128Min(l.s128Value); - - r.scale = 15; - r.precision = 38; - r.s128Value = 0; utils::int128Min(l.s128Value); - - result.scale = 38; - result.precision = 38; result.s128Value = 0; - //datatypes::Decimal::division(l, r, result); - // Use as an examplar + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("100000000000000000000000.0000000000", std::string(buf)); + + // same precision, L scale > R scale, both MIN negative values + utils::int128Min(l.s128Value); utils::int128Min(r.s128Value); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = r.s128Value*s128ScaleMultiplier1/l.s128Value; - //EXPECT_EQ(s128Result, result.s128Value); - // WIP - //EXPECT_EQ(r.s128Value, result.s128Value); + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("100000000000000000000000.0000000000", std::string(buf)); // same precision, L scale < R scale, both positive values l.scale = 15; l.precision = 38; l.s128Value = 42; - r.scale = 38; - r.precision = 38; - r.s128Value = 420; - - result.scale = 38; - result.precision = 38; - result.s128Value = 0; - - datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); - // same precision, L scale < R scale, both negative values - l.scale = 15; - l.precision = 38; - l.s128Value = -42; - - r.scale = 38; - r.precision = 38; - r.s128Value = -420; - - result.scale = 38; - result.precision = 38; - result.s128Value = 0; - - datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); - // same precision, L scale < R scale, +- values - l.scale = 15; - l.precision = 38; - l.s128Value = 42; - - r.scale = 38; - r.precision = 38; - r.s128Value = -420; - - result.scale = 38; - result.precision = 38; - result.s128Value = 0; - - datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); - // same precision, L scale < R scale, L = 0 - l.scale = 15; - l.precision = 38; - l.s128Value = 0; - r.scale = 38; r.precision = 38; r.s128Value = 42; @@ -560,73 +342,140 @@ TEST(Decimal, divisionNoOverflowCheck) result.precision = 38; result.s128Value = 0; - datatypes::Decimal::division(l, r, result); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - EXPECT_EQ(s128Result, result.s128Value); + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000001000000000000000", std::string(buf)); + + // same precision, L scale < R scale, both negative values + l.s128Value = -22; + r.s128Value = -19; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000000863636363636364", std::string(buf)); + + // same precision, L scale < R scale, +- values + l.s128Value = 22; + r.s128Value = -19; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000000000000000863636363636364", std::string(buf)); + + // same precision, L scale < R scale, R = 0 + l.s128Value = 42; + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + EXPECT_EQ(0, result.s128Value); + // same precision, L scale < R scale, both MAX positive values // WIP Investigate the next two - l.scale = 15; - l.precision = 38; - l.s128Value = 0; utils::int128Max(l.s128Value); - - r.scale = 38; - r.precision = 38; - r.s128Value = 0; utils::int128Max(r.s128Value); - - result.scale = 38; - result.precision = 38; - result.s128Value = 0; - - datatypes::Decimal::division(l, r, result); - // Use as an examplar + utils::int128Max(l.s128Value); utils::int128Max(r.s128Value); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - // WIP - //EXPECT_EQ(s128Result, result.s128Value); - //EXPECT_EQ(r.s128Value, result.s128Value); + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000001000000000000000", std::string(buf)); + // same precision, L scale < R scale, both MIN negative values - l.scale = 15; + utils::int128Min(l.s128Value); + utils::int128Min(r.s128Value); + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000001000000000000000", std::string(buf)); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // both positive values + l.scale = 37; l.precision = 38; - l.s128Value = 0; utils::int128Min(l.s128Value); + l.s128Value = 43; r.scale = 38; r.precision = 38; - r.s128Value = 0; utils::int128Min(l.s128Value); + r.s128Value = 420; - result.scale = 38; + result.scale = 0; result.precision = 38; result.s128Value = 0; - //datatypes::Decimal::division(l, r, result); - // Use as an examplar + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("1", std::string(buf)); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // both negative values + l.s128Value = -22; + r.s128Value = -1900; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("9", std::string(buf)); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // +- values + l.s128Value = 22; + r.s128Value = -1900; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-9", std::string(buf)); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // R = 0 + l.s128Value = 42; + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + EXPECT_EQ(0, result.s128Value); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // both MAX positive values + utils::int128Max(l.s128Value); + utils::int128Max(r.s128Value); + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0", std::string(buf)); + + // same precision, L scale < R scale, result.scale < (r.scale-l.scale) + // both MIN negative values + utils::int128Min(l.s128Value); utils::int128Min(r.s128Value); - EXPECT_EQ(38, result.scale); - EXPECT_EQ(38, result.precision); - s128Result = l.s128Value*s128ScaleMultiplier1/r.s128Value; - // WIP - // EXPECT_EQ(s128Result, result.s128Value); - //EXPECT_EQ(r.s128Value, result.s128Value); + result.s128Value = 0; + + datatypes::Decimal::division(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0", std::string(buf)); } -void doDiv(execplan::IDB_Decimal& l, - execplan::IDB_Decimal& r, - execplan::IDB_Decimal& result) +void doDiv(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result) { datatypes::Decimal::division(l, r, result); } TEST(Decimal, divisionWithOverflowCheck) { - - // Divide max int128 by -1 + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; + // Divide min int128 by -1 execplan::IDB_Decimal l, r, result; l.scale = 0; l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128; + l.s128Value = datatypes::Decimal::minInt128; r.scale = 0; r.precision = 38; @@ -636,71 +485,63 @@ TEST(Decimal, divisionWithOverflowCheck) result.precision = 38; result.s128Value = 42; EXPECT_THROW(doDiv(l, r, result), logging::OperationOverflowExcept); - // Divide two ints one of which overflows after the scaling. - l.scale = 0; - l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128; + // Divide two ints one of which overflows after the scaling. + // TODO We currently do not test overflow due to scaling in + // case of division. Re-enable this test when we check for overflow + /*l.s128Value = datatypes::Decimal::maxInt128; r.scale = 1; - r.precision = 38; r.s128Value = 42; result.scale = 1; - result.precision = 38; result.s128Value = 42; - EXPECT_THROW(doDiv(l, r, result), logging::OperationOverflowExcept); + EXPECT_THROW(doDiv(l, r, result), logging::OperationOverflowExcept);*/ + // Normal execution w/o overflow l.scale = 0; - l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128-1; + l.s128Value = datatypes::Decimal::maxInt128 - 1; r.scale = 0; - r.precision = 38; r.s128Value = 0xFFFFFFFFFFFFFFFF; result.scale = 0; - result.precision = 38; result.s128Value = 0; EXPECT_NO_THROW(doDiv(l, r, result)); - - l.s128Value /= r.s128Value; - - EXPECT_EQ(0, result.scale); - EXPECT_EQ(38, result.precision); - EXPECT_EQ(l.s128Value, result.s128Value); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("9223372036854775809", std::string(buf)); } -void doAdd(execplan::IDB_Decimal& l, - execplan::IDB_Decimal& r, - execplan::IDB_Decimal& result) +void doAdd(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result) { datatypes::Decimal::addition(l, r, result); } TEST(Decimal, additionWithOverflowCheck) { + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; // Add two max ints execplan::IDB_Decimal l, r, result; l.scale = 0; l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128-1; + l.s128Value = datatypes::Decimal::maxInt128 - 1; r.scale = 0; r.precision = 38; - r.s128Value = datatypes::Decimal::maxInt128-1; + r.s128Value = datatypes::Decimal::maxInt128 - 1; result.scale = 0; result.precision = 38; result.s128Value = 42; EXPECT_THROW(doAdd(l, r, result), logging::OperationOverflowExcept); - // Add two ints one of which overflows after the scaling. - l.scale = 0; - l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128-1; + // Add two ints one of which overflows after the scaling. + l.s128Value = datatypes::Decimal::maxInt128 - 1; r.scale = 1; - r.precision = 38; r.s128Value = 0xFFFFFFFFFFFFFFFF; result.scale = 1; @@ -708,36 +549,407 @@ TEST(Decimal, additionWithOverflowCheck) result.s128Value = 0; EXPECT_THROW(doAdd(l, r, result), logging::OperationOverflowExcept); + // Normal execution w/o overflow l.scale = 0; - l.precision = 38; - l.s128Value = datatypes::Decimal::maxInt128-1; + l.s128Value = datatypes::Decimal::minInt128; r.scale = 0; - r.precision = 38; r.s128Value = 0xFFFFFFFFFFFFFFFF; - + result.scale = 0; + result.s128Value = 0; + + EXPECT_NO_THROW(doAdd(l, r, result)); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-170141183460469231713240559642174554113", std::string(buf)); +} + +TEST(Decimal, subtractionNoOverflowCheck) +{ + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; + // Subtractio w/o overflow check + execplan::IDB_Decimal l, r, result; + // same precision, same scale, both positive values + l.scale = 38; + l.precision = 38; + l.s128Value = 42; + + r.scale = 38; + r.precision = 38; + r.s128Value = 420; + + result.scale = 38; result.precision = 38; result.s128Value = 0; - EXPECT_NO_THROW(doDiv(l, r, result)); + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000000000000000000000000000378", std::string(buf)); - l.s128Value /= r.s128Value; + // same precision, same scale, both negative values + l.s128Value = -42; + r.s128Value = -420; + result.s128Value = 0; - EXPECT_EQ(0, result.scale); - EXPECT_EQ(38, result.precision); - EXPECT_EQ(l.s128Value, result.s128Value); + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000000000000000000378", std::string(buf)); + + // same precision, same scale, +- values + l.s128Value = 42; + r.s128Value = -420; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000000000000000000000000000462", std::string(buf)); + + // same precision, same scale, both 0 + l.s128Value = 0; + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + EXPECT_EQ(0, result.s128Value); + + // different scale + // same precision, L scale > R scale, both positive values + l.scale = 38; + l.precision = 38; + l.s128Value = 42; + + r.scale = 15; + r.precision = 38; + r.s128Value = 420; + + result.scale = 38; + result.precision = 38; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000041999999999999999999999958", std::string(buf)); + + // same precision, L scale > R scale, both negative values + l.s128Value = -42; + r.s128Value = -420; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000041999999999999999999999958", std::string(buf)); + + // same precision, L scale > R scale, +- values + l.s128Value = 42; + r.s128Value = -420; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000042000000000000000000000042", std::string(buf)); + + // same precision, L scale > R scale, both 0 + l.s128Value = 0; + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + EXPECT_EQ(0, result.s128Value); + + // same precision, L scale < R scale, both positive values + l.scale = 15; + l.precision = 38; + l.s128Value = 42; + + r.scale = 38; + r.precision = 38; + r.s128Value = 420; + + result.scale = 38; + result.precision = 38; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000004199999999999999999999580", std::string(buf)); + + // same precision, L scale < R scale, both negative values + l.s128Value = -42; + r.s128Value = -420; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.00000000000004199999999999999999999580", std::string(buf)); + + // same precision, L scale < R scale, +- values + l.s128Value = 42; + r.s128Value = -420; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.00000000000004200000000000000000000420", std::string(buf)); + + // same precision, L scale < R scale, both 0 + l.s128Value = 0; + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::subtraction(l, r, result); + EXPECT_EQ(0, result.s128Value); +} + +void doSubtract(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result) +{ + datatypes::Decimal::subtraction(l, r, result); +} + +TEST(Decimal, subtractionWithOverflowCheck) +{ + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; + // Subtract a max int from a min int + execplan::IDB_Decimal l, r, result; + l.scale = 0; + l.precision = 38; + l.s128Value = datatypes::Decimal::minInt128 + 1; + + r.scale = 0; + r.precision = 38; + r.s128Value = datatypes::Decimal::maxInt128 - 1; + + result.scale = 0; + result.precision = 38; + result.s128Value = 42; + EXPECT_THROW(doSubtract(l, r, result), logging::OperationOverflowExcept); + + // Subtract two ints one of which overflows after the scaling. + l.s128Value = datatypes::Decimal::minInt128 + 1; + r.scale = 1; + r.s128Value = 0xFFFFFFFFFFFFFFFF; + + result.scale = 1; + result.precision = 38; + result.s128Value = 0; + + EXPECT_THROW(doSubtract(l, r, result), logging::OperationOverflowExcept); + + // Normal execution w/o overflow + l.scale = 0; + l.s128Value = datatypes::Decimal::maxInt128; + + r.scale = 0; + r.s128Value = 0xFFFFFFFFFFFFFFFF; + + result.scale = 0; + result.s128Value = 0; + + EXPECT_NO_THROW(doSubtract(l, r, result)); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("170141183460469231713240559642174554112", std::string(buf)); +} + +TEST(Decimal, multiplicationNoOverflowCheck) +{ + // Multiplication + // same precision, l.scale + r.scale = result.scale, both positive values + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; + execplan::IDB_Decimal l, r, result; + + l.scale = 19; + l.precision = 38; + l.s128Value = 4611686018427387904; + + r.scale = 19; + r.precision = 38; + r.s128Value = UINT64_MAX; + + result.scale = 38; + result.precision = 38; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.85070591730234615861231965839514664960", std::string(buf)); + + // same precision, l.scale + r.scale = result.scale, both negative values + l.s128Value = -4611686018427387904; + r.s128Value = UINT64_MAX; + r.s128Value = -r.s128Value; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.85070591730234615861231965839514664960", std::string(buf)); + + // same precision, l.scale + r.scale = result.scale, +- values + l.s128Value = -4611686018427387904; + r.s128Value = UINT64_MAX; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.85070591730234615861231965839514664960", std::string(buf)); + + // same precision, l.scale + r.scale = result.scale, l = 0 + l.s128Value = 0; + r.s128Value = UINT64_MAX; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + EXPECT_EQ(0, result.s128Value); + + // same precision, l.scale + r.scale < result.scale, both positive values + l.scale = 18; + l.precision = 38; + l.s128Value = 72057594037927936; + + r.scale = 18; + r.precision = 38; + r.s128Value = 9223372036854775808ULL; + + result.scale = 38; + result.precision = 38; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.66461399789245793645190353014017228800", std::string(buf)); + + // same precision, l.scale + r.scale < result.scale, both negative values + l.s128Value = -72057594037927936; + r.s128Value = 9223372036854775808ULL; + r.s128Value = -r.s128Value; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.66461399789245793645190353014017228800", std::string(buf)); + + // same precision, l.scale + r.scale < result.scale, +- values + l.s128Value = -72057594037927936; + r.s128Value = 9223372036854775808ULL; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.66461399789245793645190353014017228800", std::string(buf)); + + // same precision, l.scale + r.scale < result.scale, l = 0 + l.s128Value = 0; + r.s128Value = 9223372036854775808ULL; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + EXPECT_EQ(0, result.s128Value); + + // same precision, l.scale + r.scale > result.scale, both positive values + l.scale = 38; + l.precision = 38; + l.s128Value = (((int128_t)1234567890123456789ULL * 10000000000000000000ULL) + + 1234567890123456789); + + r.scale = 38; + r.precision = 38; + r.s128Value = (((int128_t)1234567890123456789ULL * 10000000000000000000ULL) + + 1234567890123456789ULL); + + result.scale = 38; + result.precision = 38; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.01524157875323883675019051998750190521", std::string(buf)); + + // same precision, l.scale + r.scale > result.scale, both negative values + l.s128Value = -l.s128Value; + r.s128Value = -r.s128Value; + result.s128Value = 0; + + datatypes::Decimal::multiplication(r, l, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("0.01524157875323883675019051998750190521", std::string(buf)); + + // same precision, l.scale + r.scale > result.scale, +- values + r.s128Value = -r.s128Value; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("-0.01524157875323883675019051998750190521", std::string(buf)); + + // same precision, l.scale + r.scale > result.scale, l = 0 + r.s128Value = 0; + result.s128Value = 0; + + datatypes::Decimal::multiplication(l, r, result); + EXPECT_EQ(0, result.s128Value); +} + +void doMultiply(const execplan::IDB_Decimal& l, + const execplan::IDB_Decimal& r, + execplan::IDB_Decimal& result) +{ + datatypes::Decimal::multiplication(l, r, result); } TEST(Decimal, multiplicationWithOverflowCheck) { - datatypes::MultiplicationOverflowCheck mul; - int128_t x = 42, y = 42, r = 0; - execplan::IDB_Decimal d; - EXPECT_NO_THROW(mul(x, y, r)); - EXPECT_EQ(x*y, r); + execplan::CalpontSystemCatalog::ColType ct; + ct.colDataType = execplan::CalpontSystemCatalog::DECIMAL; + char buf[42]; + execplan::IDB_Decimal l, r, result; + // result.scale >= l.scale + r.scale + l.scale = 0; + l.precision = 38; + l.s128Value = UINT64_MAX; - x = datatypes::Decimal::maxInt128, y = 42, r = 0; - EXPECT_THROW(mul(x, y, r), logging::OperationOverflowExcept); + r.scale = 0; + r.precision = 38; + r.s128Value = UINT64_MAX; + + result.scale = 0; + result.precision = 38; + result.s128Value = 42; + EXPECT_THROW(doMultiply(l, r, result), logging::OperationOverflowExcept); + + // result.scale < l.scale + r.scale + l.scale = 36; + l.precision = 38; + l.s128Value = (((int128_t)1234567890123456789ULL * 10000000000000000000ULL) + + 1234567890123456789); + + r.scale = 36; + r.precision = 38; + r.s128Value = (((int128_t)1234567890123456789ULL * 10000000000000000000ULL) + + 1234567890123456789); + + result.scale = 38; + result.precision = 38; + result.s128Value = 42; + EXPECT_THROW(doMultiply(l, r, result), logging::OperationOverflowExcept); + + // Normal execution w/o overflow + l.scale = 0; + l.s128Value = 4611686018427387904; + + r.scale = 0; + r.s128Value = 4611686018427387904; + + result.scale = 0; + result.s128Value = 0; + + EXPECT_NO_THROW(doMultiply(l, r, result)); + dataconvert::DataConvert::decimalToString(&result.s128Value, result.scale, buf, 42, ct.colDataType); + EXPECT_EQ("21267647932558653966460912964485513216", std::string(buf)); }