1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-4409 Moved static Decimal conversion methods into VDecimal class

MCOL-4409 This patch combines VDecimal and Decimal and makes
IDB_Decimal an alias for the result class

MCOL-4409 More boilerplate reduction in Func_mod

Removed couple TSInt128::toType() methods
This commit is contained in:
Roman Nozdrin
2020-11-24 15:28:38 +00:00
parent 2003417a89
commit 494bde61e1
27 changed files with 760 additions and 929 deletions

View File

@ -630,7 +630,7 @@ int TypeHandlerSLongDouble::storeValueToField(rowgroup::Row &row, int pos,
int TypeHandlerXDecimal::storeValueToField64(rowgroup::Row &row, int pos, int TypeHandlerXDecimal::storeValueToField64(rowgroup::Row &row, int pos,
StoreField *f) const StoreField *f) const
{ {
return f->store_decimal64(datatypes::VDecimal(row.getIntField(pos), return f->store_decimal64(datatypes::Decimal(row.getIntField(pos),
f->scale(), f->scale(),
f->precision())); f->precision()));
} }
@ -640,7 +640,7 @@ int TypeHandlerXDecimal::storeValueToField128(rowgroup::Row &row, int pos,
StoreField *f) const StoreField *f) const
{ {
int128_t* decPtr = row.getBinaryField<int128_t>(pos); int128_t* decPtr = row.getBinaryField<int128_t>(pos);
return f->store_decimal128(datatypes::VDecimal(0, return f->store_decimal128(datatypes::Decimal(0,
f->scale(), f->scale(),
f->precision(), f->precision(),
decPtr)); decPtr));
@ -782,7 +782,7 @@ TypeHandlerXDecimal::format128(const SimpleValue &v,
const const
{ {
idbassert(isValidXDecimal128(attr)); idbassert(isValidXDecimal128(attr));
datatypes::VDecimal dec(0, attr.scale, attr.precision, v.toSInt128()); datatypes::Decimal dec(0, attr.scale, attr.precision, v.toSInt128());
return dec.toString(true); return dec.toString(true);
} }

View File

@ -727,8 +727,8 @@ public:
virtual int store_float(float val) = 0; virtual int store_float(float val) = 0;
virtual int store_double(double val) = 0; virtual int store_double(double val) = 0;
virtual int store_long_double(long double val) = 0; virtual int store_long_double(long double val) = 0;
virtual int store_decimal64(const datatypes::VDecimal& dec) = 0; virtual int store_decimal64(const datatypes::Decimal& dec) = 0;
virtual int store_decimal128(const datatypes::VDecimal& dec) = 0; virtual int store_decimal128(const datatypes::Decimal& dec) = 0;
virtual int store_lob(const char *str, size_t length) = 0; virtual int store_lob(const char *str, size_t length) = 0;
}; };

View File

@ -49,9 +49,9 @@ namespace datatypes
template<typename BinaryOperation, template<typename BinaryOperation,
typename OpOverflowCheck, typename OpOverflowCheck,
typename MultiplicationOverflowCheck> typename MultiplicationOverflowCheck>
void addSubtractExecute(const VDecimal& l, void addSubtractExecute(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result, Decimal& result,
BinaryOperation op, BinaryOperation op,
OpOverflowCheck opOverflowCheck, OpOverflowCheck opOverflowCheck,
MultiplicationOverflowCheck mulOverflowCheck) MultiplicationOverflowCheck mulOverflowCheck)
@ -109,9 +109,9 @@ namespace datatypes
template<typename OpOverflowCheck, template<typename OpOverflowCheck,
typename MultiplicationOverflowCheck> typename MultiplicationOverflowCheck>
void divisionExecute(const VDecimal& l, void divisionExecute(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result, Decimal& result,
OpOverflowCheck opOverflowCheck, OpOverflowCheck opOverflowCheck,
MultiplicationOverflowCheck mulOverflowCheck) MultiplicationOverflowCheck mulOverflowCheck)
{ {
@ -148,9 +148,9 @@ namespace datatypes
template<typename OpOverflowCheck, template<typename OpOverflowCheck,
typename MultiplicationOverflowCheck> typename MultiplicationOverflowCheck>
void multiplicationExecute(const VDecimal& l, void multiplicationExecute(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result, Decimal& result,
OpOverflowCheck opOverflowCheck, OpOverflowCheck opOverflowCheck,
MultiplicationOverflowCheck mulOverflowCheck) MultiplicationOverflowCheck mulOverflowCheck)
{ {
@ -195,7 +195,7 @@ namespace datatypes
} }
} }
int Decimal::compare(const VDecimal& l, const VDecimal& r) int Decimal::compare(const Decimal& l, const Decimal& r)
{ {
int128_t divisorL, divisorR; int128_t divisorL, divisorR;
getScaleDivisor(divisorL, l.scale); getScaleDivisor(divisorL, l.scale);
@ -242,8 +242,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::addition<int128_t, false>(const VDecimal& l, void Decimal::addition<int128_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
std::plus<int128_t> add; std::plus<int128_t> add;
NoOverflowCheck noOverflowCheck; NoOverflowCheck noOverflowCheck;
@ -252,8 +252,8 @@ namespace datatypes
// with overflow check // with overflow check
template<> template<>
void Decimal::addition<int128_t, true>(const VDecimal& l, void Decimal::addition<int128_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
std::plus<int128_t> add; std::plus<int128_t> add;
AdditionOverflowCheck overflowCheck; AdditionOverflowCheck overflowCheck;
@ -263,8 +263,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::addition<int64_t, false>(const VDecimal& l, void Decimal::addition<int64_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
if (result.scale == l.scale && result.scale == r.scale) if (result.scale == l.scale && result.scale == r.scale)
{ {
@ -293,8 +293,8 @@ namespace datatypes
// with overflow check // with overflow check
template<> template<>
void Decimal::addition<int64_t, true>(const VDecimal& l, void Decimal::addition<int64_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
AdditionOverflowCheck additionOverflowCheck; AdditionOverflowCheck additionOverflowCheck;
MultiplicationOverflowCheck mulOverflowCheck; MultiplicationOverflowCheck mulOverflowCheck;
@ -328,8 +328,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::subtraction<int128_t, false>(const VDecimal& l, void Decimal::subtraction<int128_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
std::minus<int128_t> subtract; std::minus<int128_t> subtract;
NoOverflowCheck noOverflowCheck; NoOverflowCheck noOverflowCheck;
@ -338,8 +338,8 @@ namespace datatypes
// with overflow check // with overflow check
template<> template<>
void Decimal::subtraction<int128_t, true>(const VDecimal& l, void Decimal::subtraction<int128_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
std::minus<int128_t> subtract; std::minus<int128_t> subtract;
SubtractionOverflowCheck overflowCheck; SubtractionOverflowCheck overflowCheck;
@ -349,8 +349,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::subtraction<int64_t, false>(const VDecimal& l, void Decimal::subtraction<int64_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
if (result.scale == l.scale && result.scale == r.scale) if (result.scale == l.scale && result.scale == r.scale)
{ {
@ -379,8 +379,8 @@ namespace datatypes
// with overflow check // with overflow check
template<> template<>
void Decimal::subtraction<int64_t, true>(const VDecimal& l, void Decimal::subtraction<int64_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
SubtractionOverflowCheck subtractionOverflowCheck; SubtractionOverflowCheck subtractionOverflowCheck;
MultiplicationOverflowCheck mulOverflowCheck; MultiplicationOverflowCheck mulOverflowCheck;
@ -414,8 +414,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::division<int128_t, false>(const VDecimal& l, void Decimal::division<int128_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
NoOverflowCheck noOverflowCheck; NoOverflowCheck noOverflowCheck;
divisionExecute(l, r, result, noOverflowCheck, noOverflowCheck); divisionExecute(l, r, result, noOverflowCheck, noOverflowCheck);
@ -423,8 +423,8 @@ namespace datatypes
// With overflow check // With overflow check
template<> template<>
void Decimal::division<int128_t, true>(const VDecimal& l, void Decimal::division<int128_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
DivisionOverflowCheck overflowCheck; DivisionOverflowCheck overflowCheck;
MultiplicationOverflowCheck mulOverflowCheck; MultiplicationOverflowCheck mulOverflowCheck;
@ -434,8 +434,8 @@ namespace datatypes
// no overflow check // no overflow check
// We rely on the zero check from ArithmeticOperator::execute // We rely on the zero check from ArithmeticOperator::execute
template<> template<>
void Decimal::division<int64_t, false>(const VDecimal& l, void Decimal::division<int64_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
if (result.scale >= l.scale - r.scale) if (result.scale >= l.scale - r.scale)
result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ? result.value = (int64_t)(( (l.value > 0 && r.value > 0) || (l.value < 0 && r.value < 0) ?
@ -449,8 +449,8 @@ namespace datatypes
// With overflow check // With overflow check
template<> template<>
void Decimal::division<int64_t, true>(const VDecimal& l, void Decimal::division<int64_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
DivisionOverflowCheck divisionOverflowCheck; DivisionOverflowCheck divisionOverflowCheck;
@ -469,8 +469,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::multiplication<int128_t, false>(const VDecimal& l, void Decimal::multiplication<int128_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
MultiplicationNoOverflowCheck noOverflowCheck; MultiplicationNoOverflowCheck noOverflowCheck;
multiplicationExecute(l, r, result, noOverflowCheck, noOverflowCheck); multiplicationExecute(l, r, result, noOverflowCheck, noOverflowCheck);
@ -478,8 +478,8 @@ namespace datatypes
// With overflow check // With overflow check
template<> template<>
void Decimal::multiplication<int128_t, true>(const VDecimal& l, void Decimal::multiplication<int128_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
MultiplicationOverflowCheck mulOverflowCheck; MultiplicationOverflowCheck mulOverflowCheck;
multiplicationExecute(l, r, result, mulOverflowCheck, mulOverflowCheck); multiplicationExecute(l, r, result, mulOverflowCheck, mulOverflowCheck);
@ -487,8 +487,8 @@ namespace datatypes
// no overflow check // no overflow check
template<> template<>
void Decimal::multiplication<int64_t, false>(const VDecimal& l, void Decimal::multiplication<int64_t, false>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
if (result.scale >= l.scale + r.scale) if (result.scale >= l.scale + r.scale)
result.value = l.value * r.value * mcs_pow_10[result.scale - (l.scale + r.scale)]; result.value = l.value * r.value * mcs_pow_10[result.scale - (l.scale + r.scale)];
@ -500,8 +500,8 @@ namespace datatypes
// With overflow check // With overflow check
template<> template<>
void Decimal::multiplication<int64_t, true>(const VDecimal& l, void Decimal::multiplication<int64_t, true>(const Decimal& l,
const VDecimal& r, VDecimal& result) const Decimal& r, Decimal& result)
{ {
MultiplicationOverflowCheck mulOverflowCheck; MultiplicationOverflowCheck mulOverflowCheck;
@ -521,7 +521,7 @@ namespace datatypes
} }
// Writes integer part of a Decimal using int128 argument provided // Writes integer part of a Decimal using int128 argument provided
uint8_t VDecimal::writeIntPart(const int128_t& x, uint8_t Decimal::writeIntPart(const int128_t& x,
char* buf, char* buf,
const uint8_t buflen) const const uint8_t buflen) const
{ {
@ -552,7 +552,7 @@ namespace datatypes
high = intPart / maxUint64divisor; high = intPart / maxUint64divisor;
break; break;
default: default:
throw logging::QueryDataExcept("VDecimal::writeIntPart() bad scale", throw logging::QueryDataExcept("Decimal::writeIntPart() bad scale",
logging::formatErr); logging::formatErr);
} }
@ -560,14 +560,14 @@ namespace datatypes
uint8_t written = p - buf; uint8_t written = p - buf;
if (buflen <= written) if (buflen <= written)
{ {
throw logging::QueryDataExcept("VDecimal::writeIntPart() char buffer overflow.", throw logging::QueryDataExcept("Decimal::writeIntPart() char buffer overflow.",
logging::formatErr); logging::formatErr);
} }
return written; return written;
} }
uint8_t VDecimal::writeFractionalPart(const int128_t& x, uint8_t Decimal::writeFractionalPart(const int128_t& x,
char* buf, char* buf,
const uint8_t buflen) const const uint8_t buflen) const
{ {
@ -605,7 +605,7 @@ namespace datatypes
// The method writes Decimal based on TSInt128 with scale provided. // The method writes Decimal based on TSInt128 with scale provided.
// It first writes sign, then extracts integer part // It first writes sign, then extracts integer part
// prints delimiter and then decimal part. // prints delimiter and then decimal part.
std::string VDecimal::toStringTSInt128WithScale() const std::string Decimal::toStringTSInt128WithScale() const
{ {
char buf[Decimal::MAXLENGTH16BYTES]; char buf[Decimal::MAXLENGTH16BYTES];
uint8_t left = sizeof(buf); uint8_t left = sizeof(buf);
@ -633,13 +633,13 @@ namespace datatypes
uint8_t written = p - buf; uint8_t written = p - buf;
if (sizeof(buf) <= written) if (sizeof(buf) <= written)
{ {
throw logging::QueryDataExcept("VDecimal::toString() char buffer overflow.", throw logging::QueryDataExcept("Decimal::toString() char buffer overflow.",
logging::formatErr); logging::formatErr);
} }
return std::string(buf); return std::string(buf);
} }
std::string VDecimal::toStringTSInt64() const std::string Decimal::toStringTSInt64() const
{ {
char buf[Decimal::MAXLENGTH8BYTES]; char buf[Decimal::MAXLENGTH8BYTES];
// Need 19 digits maxium to hold a sum result of 18 digits decimal column. // Need 19 digits maxium to hold a sum result of 18 digits decimal column.
@ -706,7 +706,7 @@ namespace datatypes
} }
// Dispatcher method for toString() implementations // Dispatcher method for toString() implementations
std::string VDecimal::toString(bool hasTSInt128) const std::string Decimal::toString(bool hasTSInt128) const
{ {
// There must be no empty at this point though // There must be no empty at this point though
if (isNull()) if (isNull())
@ -730,7 +730,7 @@ namespace datatypes
return std::to_string(value); return std::to_string(value);
} }
std::ostream& operator<<(std::ostream& os, const VDecimal& dec) std::ostream& operator<<(std::ostream& os, const Decimal& dec)
{ {
os << dec.toString(); os << dec.toString();
return os; return os;

View File

@ -26,11 +26,13 @@
#include "widedecimalutils.h" #include "widedecimalutils.h"
#include "mcs_int128.h" #include "mcs_int128.h"
#include "mcs_float128.h" #include "mcs_float128.h"
#include "checks.h"
#include "branchpred.h"
namespace datatypes namespace datatypes
{ {
class VDecimal; class Decimal;
} }
// A class by Fabio Fernandes pulled off of stackoverflow // A class by Fabio Fernandes pulled off of stackoverflow
@ -158,21 +160,16 @@ inline void getScaleDivisor(T& divisor, const int8_t scale)
} }
} }
// @brief The class for Decimal related operations
/** // The class contains Decimal related operations are scale and
@brief Contains subset of decimal related operations. // precision aware.
// This class will inherit from:
Most of the methods are static to allow to call them from // DecimalMeta class that stores scale and precision
the existing code. // Storage classes TSInt128 and int64
The main purpose of the class is to collect methods for a // !!! There are some static classes that will exists during transition period.
base Datatype class. class Decimal: public TSInt128
*/
class Decimal
{ {
public: public:
Decimal() { };
~Decimal() { };
static constexpr uint8_t MAXLENGTH16BYTES = TSInt128::maxLength(); static constexpr uint8_t MAXLENGTH16BYTES = TSInt128::maxLength();
static constexpr uint8_t MAXLENGTH8BYTES = 23; static constexpr uint8_t MAXLENGTH8BYTES = 23;
@ -200,44 +197,44 @@ class Decimal
static constexpr int128_t maxInt128 = (int128_t(0x7FFFFFFFFFFFFFFFLL) << 64) + 0xFFFFFFFFFFFFFFFFLL; static constexpr int128_t maxInt128 = (int128_t(0x7FFFFFFFFFFFFFFFLL) << 64) + 0xFFFFFFFFFFFFFFFFLL;
/** /**
@brief Compares two VDecimal taking scale into account. @brief Compares two Decimal taking scale into account.
*/ */
static int compare(const VDecimal& l, const VDecimal& r); static int compare(const Decimal& l, const Decimal& r);
/** /**
@brief Addition template that supports overflow check and @brief Addition template that supports overflow check and
two internal representations of decimal. two internal representations of decimal.
*/ */
template<typename T, bool overflow> template<typename T, bool overflow>
static void addition(const VDecimal& l, static void addition(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result); Decimal& result);
/** /**
@brief Subtraction template that supports overflow check and @brief Subtraction template that supports overflow check and
two internal representations of decimal. two internal representations of decimal.
*/ */
template<typename T, bool overflow> template<typename T, bool overflow>
static void subtraction(const VDecimal& l, static void subtraction(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result); Decimal& result);
/** /**
@brief Division template that supports overflow check and @brief Division template that supports overflow check and
two internal representations of decimal. two internal representations of decimal.
*/ */
template<typename T, bool overflow> template<typename T, bool overflow>
static void division(const VDecimal& l, static void division(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result); Decimal& result);
/** /**
@brief Multiplication template that supports overflow check and @brief Multiplication template that supports overflow check and
two internal representations of decimal. two internal representations of decimal.
*/ */
template<typename T, bool overflow> template<typename T, bool overflow>
static void multiplication(const VDecimal& l, static void multiplication(const Decimal& l,
const VDecimal& r, const Decimal& r,
VDecimal& result); Decimal& result);
/** /**
@brief The method detects whether decimal type is wide @brief The method detects whether decimal type is wide
@ -249,85 +246,6 @@ class Decimal
&& precision <= INT128MAXPRECISION; && precision <= INT128MAXPRECISION;
} }
/**
@brief The method converts a __float128 value to an int64_t.
*/
static inline int64_t getInt64FromFloat128(const __float128& value)
{
if (value > static_cast<__float128>(INT64_MAX))
return INT64_MAX;
else if (value < static_cast<__float128>(INT64_MIN))
return INT64_MIN;
return static_cast<int64_t>(value);
}
/**
@brief The method converts a __float128 value to an uint64_t.
*/
static inline uint64_t getUInt64FromFloat128(const __float128& value)
{
if (value > static_cast<__float128>(UINT64_MAX))
return UINT64_MAX;
else if (value < 0)
return 0;
return static_cast<uint64_t>(value);
}
/**
@brief The method converts a wide decimal value to an uint32_t.
*/
static inline uint32_t getUInt32FromWideDecimal(const int128_t& value)
{
if (value > static_cast<int128_t>(UINT32_MAX))
return UINT32_MAX;
else if (value < 0)
return 0;
return static_cast<uint32_t>(value);
}
/**
@brief The method converts a wide decimal value to an int32_t.
*/
static inline int32_t getInt32FromWideDecimal(const int128_t& value)
{
if (value > static_cast<int128_t>(INT32_MAX))
return INT32_MAX;
else if (value < static_cast<int128_t>(INT32_MIN))
return INT32_MIN;
return static_cast<int32_t>(value);
}
/**
@brief The method converts a wide decimal value to an uint64_t.
*/
static inline uint64_t getUInt64FromWideDecimal(const int128_t& value)
{
if (value > static_cast<int128_t>(UINT64_MAX))
return UINT64_MAX;
else if (value < 0)
return 0;
return static_cast<uint64_t>(value);
}
/**
@brief The method converts a wide decimal value to an int64_t,
saturating the value if necessary.
*/
static inline int64_t getInt64FromWideDecimal(const int128_t& value)
{
if (value > static_cast<int128_t>(INT64_MAX))
return INT64_MAX;
else if (value < static_cast<int128_t>(INT64_MIN))
return INT64_MIN;
return static_cast<int64_t>(value);
}
/** /**
@brief MDB increases scale by up to 4 digits calculating avg() @brief MDB increases scale by up to 4 digits calculating avg()
*/ */
@ -340,7 +258,461 @@ class Decimal
scale += (scaleAvailable >= MAXSCALEINC4AVG) ? MAXSCALEINC4AVG : scaleAvailable; scale += (scaleAvailable >= MAXSCALEINC4AVG) ? MAXSCALEINC4AVG : scaleAvailable;
precision += (precisionAvailable >= MAXSCALEINC4AVG) ? MAXSCALEINC4AVG : precisionAvailable; precision += (precisionAvailable >= MAXSCALEINC4AVG) ? MAXSCALEINC4AVG : precisionAvailable;
} }
};
Decimal(): value(0), scale(0), precision(0)
{
}
Decimal(int64_t val, int8_t s, uint8_t p, const int128_t &val128 = 0) :
TSInt128(val128),
value(val),
scale(s),
precision(p)
{ }
Decimal(int64_t unused, int8_t s, uint8_t p, const int128_t* val128Ptr) :
TSInt128(val128Ptr),
value(unused),
scale(s),
precision(p)
{ }
Decimal(const TSInt128& val128, int8_t s, uint8_t p) :
TSInt128(val128),
value(0),
scale(s),
precision(p)
{ }
int decimalComp(const Decimal& d) const
{
lldiv_t d1 = lldiv(value, static_cast<int64_t>(mcs_pow_10[scale]));
lldiv_t d2 = lldiv(d.value, static_cast<int64_t>(mcs_pow_10[d.scale]));
int ret = 0;
if (d1.quot > d2.quot)
{
ret = 1;
}
else if (d1.quot < d2.quot)
{
ret = -1;
}
else
{
// rem carries the value's sign, but needs to be normalized.
int64_t s = scale - d.scale;
if (s < 0)
{
if ((d1.rem * static_cast<int64_t>(mcs_pow_10[-s])) > d2.rem)
ret = 1;
else if ((d1.rem * static_cast<int64_t>(mcs_pow_10[-s])) < d2.rem)
ret = -1;
}
else
{
if (d1.rem > (d2.rem * static_cast<int64_t>(mcs_pow_10[s])))
ret = 1;
else if (d1.rem < (d2.rem * static_cast<int64_t>(mcs_pow_10[s])))
ret = -1;
}
}
return ret;
}
inline TSInt128 toTSInt128() const
{
return TSInt128(s128Value);
}
inline TFloat128 toTFloat128() const
{
return TFloat128(s128Value);
}
inline double toDouble() const
{
int128_t scaleDivisor;
getScaleDivisor(scaleDivisor, scale);
datatypes::TFloat128 tmpval((__float128) s128Value / scaleDivisor);
return static_cast<double>(tmpval);
}
inline operator double() const
{
return toDouble();
}
inline long double toLongDouble() const
{
datatypes::TFloat128 y(s128Value);
return static_cast<long double>(y);
}
// This method returns integral part as a TSInt128 and
// fractional part as a TFloat128
inline std::pair<TSInt128, TFloat128> getIntegralAndDividedFractional() const
{
int128_t scaleDivisor;
getScaleDivisor(scaleDivisor, scale);
return std::make_pair(TSInt128(s128Value / scaleDivisor),
TFloat128((__float128)(s128Value % scaleDivisor) / scaleDivisor));
}
// This method returns integral part as a TSInt128 and
// fractional part as a TFloat128
inline std::pair<TSInt128, TSInt128> getIntegralAndFractional() const
{
int128_t scaleDivisor;
getScaleDivisor(scaleDivisor, scale);
return std::make_pair(TSInt128(s128Value / scaleDivisor),
TSInt128(s128Value % scaleDivisor));
}
inline std::tuple<TSInt128, TSInt128, TSInt128> getIntegralFractionalAndDivisor() const
{
int128_t scaleDivisor;
getScaleDivisor(scaleDivisor, scale);
return std::make_tuple(TSInt128(s128Value / scaleDivisor),
TSInt128(s128Value % scaleDivisor),
TSInt128(scaleDivisor));
}
inline TSInt128 getIntegralPart() const
{
int128_t scaleDivisor = 0;
if(LIKELY(utils::is_nonnegative(scale)))
{
return TSInt128(getIntegralPartNonNegativeScale(scaleDivisor));
}
return TSInt128(getIntegralPartNegativeScale(scaleDivisor));
}
// !!! This is a very hevyweight rouding style
// Argument determines if it is a ceil
// rounding or not. 0 - ceil rounding
inline TSInt128 getRoundedIntegralPart(const uint8_t roundingFactor = 0) const
{
int128_t flooredScaleDivisor = 0;
int128_t roundedValue = getIntegralPartNonNegativeScale(flooredScaleDivisor);
int128_t ceiledScaleDivisor = (flooredScaleDivisor <= 10) ? 1 : (flooredScaleDivisor / 10);
int128_t leftO = (s128Value - roundedValue * flooredScaleDivisor) / ceiledScaleDivisor;
if (leftO > roundingFactor)
{
roundedValue++;
}
return TSInt128(roundedValue);
}
inline TSInt128 getPosNegRoundedIntegralPart(const uint8_t roundingFactor = 0) const
{
int128_t flooredScaleDivisor = 0;
int128_t roundedValue = getIntegralPartNonNegativeScale(flooredScaleDivisor);
int128_t ceiledScaleDivisor = (flooredScaleDivisor <= 10) ? 1 : (flooredScaleDivisor / 10);
int128_t leftO = (s128Value - roundedValue * flooredScaleDivisor) / ceiledScaleDivisor;
if (utils::is_nonnegative(roundedValue) && leftO > roundingFactor)
{
roundedValue++;
}
if (utils::is_negative(roundedValue) && leftO < -roundingFactor)
{
roundedValue--;
}
return TSInt128(roundedValue);
}
// MOD operator for an integer divisor to be used
// for integer rhs
inline TSInt128 operator%(const TSInt128& div) const
{
if (!isScaled())
{
return s128Value % div.getValue();
}
// Scale the value and calculate
// (LHS.value % RHS.value) * LHS.scaleMultiplier + LHS.scale_div_remainder
auto integralFractionalDivisor = getIntegralFractionalAndDivisor();
return (std::get<0>(integralFractionalDivisor) % div.getValue()) * std::get<2>(integralFractionalDivisor) + std::get<1>(integralFractionalDivisor);
}
bool operator==(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value == rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) == 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
const_cast<Decimal&>(rhs).s128Value = rhs.value;
if (scale == rhs.scale)
return s128Value == rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) == 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value == rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) == 0);
}
else
{
if (scale == rhs.scale)
return value == rhs.value;
else
return (decimalComp(rhs) == 0);
}
}
bool operator>(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value > rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) > 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
Decimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value > rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) > 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value > rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) > 0);
}
else
{
if (scale == rhs.scale)
return value > rhs.value;
else
return (decimalComp(rhs) > 0);
}
}
bool operator<(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value < rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) < 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
Decimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value < rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) < 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value < rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) < 0);
}
else
{
if (scale == rhs.scale)
return value < rhs.value;
else
return (decimalComp(rhs) < 0);
}
}
bool operator>=(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value >= rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) >= 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
Decimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value >= rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) >= 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value >= rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) >= 0);
}
else
{
if (scale == rhs.scale)
return value >= rhs.value;
else
return (decimalComp(rhs) >= 0);
}
}
bool operator<=(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value <= rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) <= 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
Decimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value <= rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) <= 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value <= rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) <= 0);
}
else
{
if (scale == rhs.scale)
return value <= rhs.value;
else
return (decimalComp(rhs) <= 0);
}
}
bool operator!=(const Decimal& rhs) const
{
if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return s128Value != rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) != 0);
}
else if (precision > datatypes::INT64MAXPRECISION &&
rhs.precision <= datatypes::INT64MAXPRECISION)
{
Decimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value != rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) != 0);
}
else if (precision <= datatypes::INT64MAXPRECISION &&
rhs.precision > datatypes::INT64MAXPRECISION)
{
if (scale == rhs.scale)
return (int128_t) value != rhs.s128Value;
else
return (datatypes::Decimal::compare(Decimal(0, scale, precision, (int128_t) value), rhs) != 0);
}
else
{
if (scale == rhs.scale)
return value != rhs.value;
else
return (decimalComp(rhs) != 0);
}
}
inline bool isTSInt128ByPrecision() const
{
return precision > INT64MAXPRECISION
&& precision <= INT128MAXPRECISION;
}
inline bool isScaled() const
{
return scale != 0;
}
// hasTSInt128 explicitly tells to print int128 out in cases
// where precision can't detect decimal type properly, e.g.
// DECIMAL(10)/DECIMAL(38)
std::string toString(bool hasTSInt128 = false) const;
friend std::ostream& operator<<(std::ostream& os, const Decimal& dec);
int64_t value;
int8_t scale; // 0~38
uint8_t precision; // 1~38
// STRICTLY for unit tests!!!
void setTSInt64Value(const int64_t x) { value = x; }
void setTSInt128Value(const int128_t& x) { s128Value = x; }
void setScale(const uint8_t x) { scale = x; }
private:
uint8_t writeIntPart(const int128_t& x,
char* buf,
const uint8_t buflen) const;
uint8_t writeFractionalPart(const int128_t& x,
char* buf,
const uint8_t buflen) const;
std::string toStringTSInt128WithScale() const;
std::string toStringTSInt64() const;
inline int128_t getIntegralPartNonNegativeScale(int128_t& scaleDivisor) const
{
getScaleDivisor(scaleDivisor, scale);
return s128Value / scaleDivisor;
}
inline int128_t getIntegralPartNegativeScale(int128_t& scaleDivisor) const
{
getScaleDivisor(scaleDivisor, -scale);
// Calls for overflow check
return s128Value * scaleDivisor;
}
}; //end of Decimal
/** /**
@brief The structure contains an overflow check for int128 @brief The structure contains an overflow check for int128
@ -480,330 +852,5 @@ struct NoOverflowCheck {
} }
}; };
/**
* @brief VDecimal type
*
*/
class VDecimal: public TSInt128
{
public:
VDecimal(): value(0), scale(0), precision(0)
{
}
VDecimal(int64_t val, int8_t s, uint8_t p, const int128_t &val128 = 0) :
TSInt128(val128),
value(val),
scale(s),
precision(p)
{ }
VDecimal(int64_t unused, int8_t s, uint8_t p, const int128_t* val128Ptr) :
TSInt128(val128Ptr),
value(unused),
scale(s),
precision(p)
{ }
int decimalComp(const VDecimal& d) const
{
lldiv_t d1 = lldiv(value, static_cast<int64_t>(mcs_pow_10[scale]));
lldiv_t d2 = lldiv(d.value, static_cast<int64_t>(mcs_pow_10[d.scale]));
int ret = 0;
if (d1.quot > d2.quot)
{
ret = 1;
}
else if (d1.quot < d2.quot)
{
ret = -1;
}
else
{
// rem carries the value's sign, but needs to be normalized.
int64_t s = scale - d.scale;
if (s < 0)
{
if ((d1.rem * static_cast<int64_t>(mcs_pow_10[-s])) > d2.rem)
ret = 1;
else if ((d1.rem * static_cast<int64_t>(mcs_pow_10[-s])) < d2.rem)
ret = -1;
}
else
{
if (d1.rem > (d2.rem * static_cast<int64_t>(mcs_pow_10[s])))
ret = 1;
else if (d1.rem < (d2.rem * static_cast<int64_t>(mcs_pow_10[s])))
ret = -1;
}
}
return ret;
}
inline TSInt128 toTSInt128() const
{
return TSInt128(s128Value);
}
inline TFloat128 toTFloat128() const
{
return TFloat128(s128Value);
}
inline double toDouble() const
{
int128_t scaleDivisor;
getScaleDivisor(scaleDivisor, scale);
datatypes::TFloat128 tmpval((__float128) s128Value / scaleDivisor);
return static_cast<double>(tmpval);
}
inline operator double() const
{
return toDouble();
}
bool operator==(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value == rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) == 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
const_cast<VDecimal&>(rhs).s128Value = rhs.value;
if (scale == rhs.scale)
return s128Value == rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) == 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value == rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) == 0);
}
else
{
if (scale == rhs.scale)
return value == rhs.value;
else
return (decimalComp(rhs) == 0);
}
}
bool operator>(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value > rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) > 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
VDecimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value > rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) > 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value > rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) > 0);
}
else
{
if (scale == rhs.scale)
return value > rhs.value;
else
return (decimalComp(rhs) > 0);
}
}
bool operator<(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value < rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) < 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
VDecimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value < rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) < 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value < rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) < 0);
}
else
{
if (scale == rhs.scale)
return value < rhs.value;
else
return (decimalComp(rhs) < 0);
}
}
bool operator>=(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value >= rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) >= 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
VDecimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value >= rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) >= 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value >= rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) >= 0);
}
else
{
if (scale == rhs.scale)
return value >= rhs.value;
else
return (decimalComp(rhs) >= 0);
}
}
bool operator<=(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value <= rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) <= 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
VDecimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value <= rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) <= 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value <= rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) <= 0);
}
else
{
if (scale == rhs.scale)
return value <= rhs.value;
else
return (decimalComp(rhs) <= 0);
}
}
bool operator!=(const VDecimal& rhs) const
{
if (isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return s128Value != rhs.s128Value;
else
return (datatypes::Decimal::compare(*this, rhs) != 0);
}
else if (isTSInt128ByPrecision() && !rhs.isTSInt128ByPrecision())
{
VDecimal rhstmp(0, rhs.scale, rhs.precision, (int128_t) rhs.value);
if (scale == rhstmp.scale)
return s128Value != rhstmp.s128Value;
else
return (datatypes::Decimal::compare(*this, rhstmp) != 0);
}
else if (!isTSInt128ByPrecision() && rhs.isTSInt128ByPrecision())
{
if (scale == rhs.scale)
return (int128_t) value != rhs.s128Value;
else
return (datatypes::Decimal::compare(VDecimal(0, scale, precision, (int128_t) value), rhs) != 0);
}
else
{
if (scale == rhs.scale)
return value != rhs.value;
else
return (decimalComp(rhs) != 0);
}
}
inline bool isTSInt128ByPrecision() const
{
return precision > INT64MAXPRECISION
&& precision <= INT128MAXPRECISION;
}
// hasTSInt128 explicitly tells to print int128 out in cases
// where precision can't detect decimal type properly, e.g.
// DECIMAL(10)/DECIMAL(38)
std::string toString(bool hasTSInt128 = false) const;
friend std::ostream& operator<<(std::ostream& os, const VDecimal& dec);
int64_t value;
int8_t scale; // 0~38
uint8_t precision; // 1~38
// STRICTLY for unit tests!!!
void setTSInt64Value(const int64_t x) { value = x; }
void setTSInt128Value(const int128_t& x) { s128Value = x; }
void setScale(const uint8_t x) { scale = x; }
private:
uint8_t writeIntPart(const int128_t& x,
char* buf,
const uint8_t buflen) const;
uint8_t writeFractionalPart(const int128_t& x,
char* buf,
const uint8_t buflen) const;
std::string toStringTSInt128WithScale() const;
std::string toStringTSInt64() const;
};
} //end of namespace } //end of namespace
#endif #endif

View File

@ -25,9 +25,28 @@
namespace datatypes namespace datatypes
{ {
//class TSInt128; class TSInt128;
class TFloat128;
using int128_t = __int128; using int128_t = __int128;
// Type defined integral types
// for templates
template <typename T>
struct get_integral_type {
typedef T type;
};
template<>
struct get_integral_type<TFloat128>{
typedef __float128 type;
};
template<>
struct get_integral_type<TSInt128>{
typedef int128_t type;
};
class TFloat128 class TFloat128
{ {
public: public:
@ -66,6 +85,40 @@ class TFloat128
return toLongDouble(); return toLongDouble();
} }
inline int64_t toTSInt64() const
{
if (value > static_cast<__float128>(INT64_MAX))
return INT64_MAX;
else if (value < static_cast<__float128>(INT64_MIN))
return INT64_MIN;
return static_cast<int64_t>(value);
}
inline operator int64_t() const
{
return toTSInt64();
}
inline int64_t toTUInt64() const
{
if (value > static_cast<__float128>(UINT64_MAX))
return UINT64_MAX;
else if (value < 0)
return 0;
return static_cast<uint64_t>(value);
}
inline operator uint64_t() const
{
return toTUInt64();
}
inline TFloat128 operator+(const TFloat128& rhs) const
{
return TFloat128(value + rhs.value);
}
inline long double toLongDouble() const inline long double toLongDouble() const
{ {

View File

@ -117,50 +117,5 @@ namespace datatypes
return os; return os;
} }
// The method converts a wide decimal s128Value to an int64_t,
// saturating the s128Value if necessary.
inline int64_t TSInt128::getInt64FromWideDecimal()
{
if (s128Value > static_cast<int128_t>(INT64_MAX))
return INT64_MAX;
else if (s128Value < static_cast<int128_t>(INT64_MIN))
return INT64_MIN;
return static_cast<int64_t>(s128Value);
}
// The method converts a wide decimal s128Value to an uint32_t.
inline uint32_t TSInt128::getUInt32FromWideDecimal()
{
if (s128Value > static_cast<int128_t>(UINT32_MAX))
return UINT32_MAX;
else if (s128Value < 0)
return 0;
return static_cast<uint32_t>(s128Value);
}
// The method converts a wide decimal s128Value to an uint64_t.
inline uint64_t TSInt128::getUInt64FromWideDecimal()
{
if (s128Value > static_cast<int128_t>(UINT64_MAX))
return UINT64_MAX;
else if (s128Value < 0)
return 0;
return static_cast<uint64_t>(s128Value);
}
// The method converts a wide decimal s128Value to an int32_t.
inline int32_t TSInt128::getInt32FromWideDecimal()
{
if (s128Value > static_cast<int128_t>(INT32_MAX))
return INT32_MAX;
else if (s128Value < static_cast<int128_t>(INT32_MIN))
return INT32_MIN;
return static_cast<int32_t>(s128Value);
}
} // end of namespace datatypes } // end of namespace datatypes
// vim:ts=2 sw=2: // vim:ts=2 sw=2:

View File

@ -200,16 +200,81 @@ class TSInt128
return static_cast<long double>(s128Value); return static_cast<long double>(s128Value);
} }
inline operator int32_t() const
{
if (s128Value > static_cast<int128_t>(INT32_MAX))
return INT32_MAX;
if (s128Value < static_cast<int128_t>(INT32_MIN))
return INT32_MIN;
return static_cast<int32_t>(s128Value);
}
inline operator uint32_t() const
{
if (s128Value > static_cast<int128_t>(UINT32_MAX))
return UINT32_MAX;
if (s128Value < 0)
return 0;
return static_cast<uint32_t>(s128Value);
}
inline operator int64_t() const
{
if (s128Value > static_cast<int128_t>(INT64_MAX))
return INT64_MAX;
if (s128Value < static_cast<int128_t>(INT64_MIN))
return INT64_MIN;
return static_cast<int64_t>(s128Value);
}
inline operator uint64_t() const
{
if (s128Value > static_cast<int128_t>(UINT64_MAX))
return UINT64_MAX;
if (s128Value < 0)
return 0;
return static_cast<uint64_t>(s128Value);
}
inline operator TFloat128() const inline operator TFloat128() const
{ {
return toTFloat128(); return toTFloat128();
} }
inline TSInt128 operator%(const int64_t& rhs) const
{
return TSInt128(s128Value % rhs);
}
inline TSInt128 operator%(const int128_t& rhs) const
{
return TSInt128(s128Value % rhs);
}
inline TSInt128 operator*(const TSInt128& rhs) const
{
return TSInt128(s128Value * rhs.s128Value);
}
inline TSInt128 operator+(const TSInt128& rhs) const
{
return TSInt128(s128Value + rhs.s128Value);
}
inline TFloat128 toTFloat128() const inline TFloat128 toTFloat128() const
{ {
return TFloat128(s128Value); return TFloat128(s128Value);
} }
inline const int128_t& getValue() const
{
return s128Value;
}
// print int128_t parts represented as PODs // print int128_t parts represented as PODs
uint8_t printPodParts(char* buf, uint8_t printPodParts(char* buf,
const int128_t& high, const int128_t& high,
@ -226,19 +291,6 @@ class TSInt128
friend std::ostream& operator<<(std::ostream& os, const TSInt128& x); friend std::ostream& operator<<(std::ostream& os, const TSInt128& x);
// The method converts a wide decimal s128Value to an int64_t,
// saturating the s128Value if necessary.
inline int64_t getInt64FromWideDecimal();
// The method converts a wide decimal s128Value to an uint32_t.
inline uint32_t getUInt32FromWideDecimal();
// The method converts a wide decimal s128Value to an uint64_t.
inline uint64_t getUInt64FromWideDecimal();
// The method converts a wide decimal s128Value to an int32_t.
inline int32_t getInt32FromWideDecimal();
int128_t s128Value; int128_t s128Value;
}; // end of class }; // end of class

View File

@ -153,7 +153,7 @@ void SimpleColumn_Decimal<len>::setNullVal()
template<int len> template<int len>
inline const std::string& SimpleColumn_Decimal<len>:: getStrVal(rowgroup::Row& row, bool& isNull) inline const std::string& SimpleColumn_Decimal<len>:: getStrVal(rowgroup::Row& row, bool& isNull)
{ {
datatypes::VDecimal dec((int64_t)row.getIntField<len>(fInputIndex), datatypes::Decimal dec((int64_t)row.getIntField<len>(fInputIndex),
fResultType.scale, fResultType.scale,
fResultType.precision); fResultType.precision);
fResult.strVal = dec.toString(); fResult.strVal = dec.toString();

View File

@ -55,24 +55,7 @@ namespace execplan
{ {
typedef execplan::CalpontSystemCatalog::ColType Type; typedef execplan::CalpontSystemCatalog::ColType Type;
typedef datatypes::Decimal IDB_Decimal;
class IDB_Decimal: public datatypes::VDecimal
{
public:
IDB_Decimal() = default;
IDB_Decimal(int64_t val, int8_t s, uint8_t p, const int128_t &val128 = 0) :
VDecimal(val, s, p, val128) {}
inline void operator=(const datatypes::TSInt128& rhs)
{
value = 0; scale = 0; precision = 0;
datatypes::TSInt128::operator=(rhs);
}
};
typedef IDB_Decimal CNX_Decimal;
/** /**
* @brief IDB_Regex struct * @brief IDB_Regex struct
@ -728,13 +711,7 @@ inline int64_t TreeNode::getIntVal()
{ {
if (fResultType.colWidth == datatypes::MAXDECIMALWIDTH) if (fResultType.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor; return static_cast<int64_t>(fResult.decimalVal.getIntegralPart());
datatypes::getScaleDivisor(scaleDivisor, fResult.decimalVal.scale);
int128_t tmpval = fResult.decimalVal.s128Value / scaleDivisor;
return datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {

View File

@ -460,7 +460,7 @@ void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
if (LIKELY(row.getColumnWidth(*i) == datatypes::MAXDECIMALWIDTH)) if (LIKELY(row.getColumnWidth(*i) == datatypes::MAXDECIMALWIDTH))
{ {
datatypes::VDecimal dec(0, datatypes::Decimal dec(0,
scale, scale,
row.getPrecision(*i), row.getPrecision(*i),
row.getBinaryField<int128_t>(*i)); row.getBinaryField<int128_t>(*i));

View File

@ -1188,7 +1188,7 @@ dec4: /* have to pick a scale to use for the double. using 5..
{ {
if (LIKELY(isInputWide)) if (LIKELY(isInputWide))
{ {
datatypes::VDecimal dec(0, datatypes::Decimal dec(0,
in.getScale(i), in.getScale(i),
in.getPrecision(i), in.getPrecision(i),
val128); val128);
@ -1196,7 +1196,7 @@ dec4: /* have to pick a scale to use for the double. using 5..
} }
else else
{ {
datatypes::VDecimal dec(val, datatypes::Decimal dec(val,
in.getScale(i), in.getScale(i),
in.getPrecision(i)); in.getPrecision(i));
out->setStringField(dec.toString(), i); out->setStringField(dec.toString(), i);

View File

@ -161,13 +161,13 @@ public:
return m_field->store(static_cast<double>(dl)); return m_field->store(static_cast<double>(dl));
} }
int store_decimal64(const datatypes::VDecimal& dec) override int store_decimal64(const datatypes::Decimal& dec) override
{ {
std::string decAsAStr = dec.toString(); std::string decAsAStr = dec.toString();
return m_field->store(decAsAStr.c_str(), decAsAStr.length(), m_field->charset()); return m_field->store(decAsAStr.c_str(), decAsAStr.length(), m_field->charset());
} }
int store_decimal128(const datatypes::VDecimal& dec) override int store_decimal128(const datatypes::Decimal& dec) override
{ {
std::string decAsAStr = dec.toString(true); std::string decAsAStr = dec.toString(true);
return m_field->store(decAsAStr.c_str(), decAsAStr.length(), m_field->charset()); return m_field->store(decAsAStr.c_str(), decAsAStr.length(), m_field->charset());

View File

@ -908,7 +908,7 @@ TEST(Decimal, DecimalToStringCheckScale0)
int precision = 38; int precision = 38;
int scale = 0; int scale = 0;
res = 0; res = 0;
datatypes::VDecimal dec(0, scale, precision, res); datatypes::Decimal dec(0, scale, precision, res);
// test simple values // test simple values
expected = "0"; expected = "0";
@ -957,7 +957,7 @@ TEST(Decimal, DecimalToStringCheckScale10)
int precision = 38; int precision = 38;
int scale = 10; int scale = 10;
res = 0; res = 0;
datatypes::VDecimal dec(0, scale, precision, res); datatypes::Decimal dec(0, scale, precision, res);
// test simple values // test simple values
expected = "0.0000000000"; expected = "0.0000000000";
@ -1029,7 +1029,7 @@ TEST(Decimal, DecimalToStringCheckScale38)
int precision = 38; int precision = 38;
int scale = 38; int scale = 38;
res = 0; res = 0;
datatypes::VDecimal dec(0, scale, precision, res); datatypes::Decimal dec(0, scale, precision, res);
// test simple values // test simple values
res = 0; res = 0;
@ -1089,7 +1089,7 @@ TEST(Decimal, DecimalToStringCheckScale37)
int precision = 38; int precision = 38;
int scale = 37; int scale = 37;
res = 0; res = 0;
datatypes::VDecimal dec(0, scale, precision, res); datatypes::Decimal dec(0, scale, precision, res);
// test simple values // test simple values
res = 0; res = 0;

View File

@ -188,22 +188,7 @@ int64_t Func_cast_signed::getIntVal(Row& row,
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; return static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
return datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {
@ -1206,13 +1191,7 @@ int64_t Func_cast_decimal::getIntVal(Row& row,
if (decimal.isTSInt128ByPrecision()) if (decimal.isTSInt128ByPrecision())
{ {
int128_t scaleDivisor; return static_cast<int64_t>(decimal.getIntegralPart());
datatypes::getScaleDivisor(scaleDivisor, decimal.scale);
int128_t tmpval = decimal.s128Value / scaleDivisor;
return datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
return (int64_t) decimal.value / helpers::powerOf10_c[decimal.scale]; return (int64_t) decimal.value / helpers::powerOf10_c[decimal.scale];

View File

@ -99,17 +99,7 @@ int64_t Func_ceil::getIntVal(Row& row,
if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH) if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t tmp = d.s128Value; ret = static_cast<int64_t>(d.getRoundedIntegralPart());
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
d.s128Value /= scaleDivisor;
// Add 1 if this is a positive number and there were values to the right of the
// decimal point so that we return the largest integer value not less than X.
if ((tmp - (d.s128Value * scaleDivisor)) > 0)
d.s128Value += 1;
ret = datatypes::Decimal::getInt64FromWideDecimal(d.s128Value);
} }
else else
{ {
@ -252,17 +242,7 @@ uint64_t Func_ceil::getUintVal(Row& row,
if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH) if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t tmp = d.s128Value; ret = static_cast<uint64_t>(d.getRoundedIntegralPart());
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
d.s128Value /= scaleDivisor;
// Add 1 if this is a positive number and there were values to the right of the
// decimal point so that we return the largest integer value not less than X.
if ((tmp - (d.s128Value * scaleDivisor)) > 0)
d.s128Value += 1;
ret = datatypes::Decimal::getUInt64FromWideDecimal(d.s128Value);
} }
else else
{ {
@ -548,6 +528,8 @@ IDB_Decimal Func_ceil::getDecimalVal(Row& row,
bool& isNull, bool& isNull,
CalpontSystemCatalog::ColType& op_ct) CalpontSystemCatalog::ColType& op_ct)
{ {
// Questionable approach. I believe we should use pointer here
// and call an appropriate ctor
IDB_Decimal ret; IDB_Decimal ret;
switch (op_ct.colDataType) switch (op_ct.colDataType)
@ -583,15 +565,9 @@ IDB_Decimal Func_ceil::getDecimalVal(Row& row,
if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH) if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t tmp = ret.s128Value; ret = IDB_Decimal(ret.getRoundedIntegralPart(),
int128_t scaleDivisor; ret.scale,
datatypes::getScaleDivisor(scaleDivisor, ret.scale); ret.precision);
ret.s128Value /= scaleDivisor;
// Add 1 if this is a positive number and there were values to the right of the
// decimal point so that we return the largest integer value not less than X.
if ((tmp - (ret.s128Value * scaleDivisor)) > 0)
ret.s128Value += 1;
} }
else else
{ {

View File

@ -138,33 +138,22 @@ string Func_char::getStrVal(Row& row,
{ {
IDB_Decimal d = rc->getDecimalVal(row, isNull); IDB_Decimal d = rc->getDecimalVal(row, isNull);
uint8_t roundingFactor = 4;
if (ct.colWidth == datatypes::MAXDECIMALWIDTH) if (ct.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
if (d.s128Value < 0) if (d.s128Value < 0)
return ""; return "";
// rounding by the left over
int128_t scaleDivisor, scaleDivisor2; value = static_cast<int32_t>(d.getRoundedIntegralPart(roundingFactor));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (lefto > 4)
tmpval++;
value = datatypes::Decimal::getInt32FromWideDecimal(tmpval);
} }
else else
{ {
double dscale = d.scale; double dscale = d.scale;
// get decimal and round up // get decimal and round up
value = d.value / pow(10.0, dscale); value = d.value / pow(10.0, dscale);
int lefto = (d.value - value * pow(10.0, dscale)) / pow(10.0, dscale - 1); uint8_t lefto = (d.value - value * pow(10.0, dscale)) / pow(10.0, dscale - 1);
if ( lefto > 4 ) if ( lefto > roundingFactor )
value++; value++;
} }
} }

View File

@ -77,22 +77,7 @@ string Func_elt::getStrVal(rowgroup::Row& row,
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; number = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (utils::is_nonnegative(tmpval) && lefto > 4)
tmpval++;
if (utils::is_negative(tmpval) && lefto < -4)
tmpval--;
number = datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {

View File

@ -160,7 +160,7 @@ int64_t Func_floor::getIntVal(Row& row,
if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH) if (op_ct.colWidth == datatypes::MAXDECIMALWIDTH)
{ {
ret = datatypes::Decimal::getInt64FromWideDecimal(tmp.s128Value); ret = static_cast<int64_t>(tmp.toTSInt128());
} }
else else
{ {

View File

@ -67,10 +67,9 @@ DateTime getDateTime(rowgroup::Row& row,
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor; auto integralAndFractional = dec.getIntegralAndFractional();
datatypes::getScaleDivisor(scaleDivisor, dec.scale); val = static_cast<int64_t>(integralAndFractional.first);
val = datatypes::Decimal::getInt64FromWideDecimal(dec.s128Value / scaleDivisor); msec = static_cast<uint32_t>(integralAndFractional.second);
msec = datatypes::Decimal::getUInt32FromWideDecimal(dec.s128Value % scaleDivisor);
} }
else else
{ {

View File

@ -72,22 +72,7 @@ uint64_t makedate(rowgroup::Row& row,
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; year = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
year = datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {
@ -156,29 +141,14 @@ uint64_t makedate(rowgroup::Row& row,
if (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; int64_t tmpval = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
if (tmpval < 1) if (tmpval < 1)
{ {
isNull = true; isNull = true;
return 0; return 0;
} }
int64_t tmpval64 = datatypes::Decimal::getInt64FromWideDecimal(tmpval); dayofyear = helpers::intToString(tmpval);
dayofyear = helpers::intToString(tmpval64);
} }
else else
{ {

View File

@ -78,22 +78,7 @@ string Func_maketime::getStrVal(rowgroup::Row& row,
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; hour = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
hour = datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {
@ -142,22 +127,7 @@ string Func_maketime::getStrVal(rowgroup::Row& row,
if (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; min = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
min = datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {
@ -212,22 +182,7 @@ string Func_maketime::getStrVal(rowgroup::Row& row,
if (parm[2]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) if (parm[2]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{ {
int128_t scaleDivisor, scaleDivisor2; sec = static_cast<int64_t>(d.getPosNegRoundedIntegralPart(4));
datatypes::getScaleDivisor(scaleDivisor, d.scale);
scaleDivisor2 = (scaleDivisor <= 10) ? 1 : (scaleDivisor / 10);
int128_t tmpval = d.s128Value / scaleDivisor;
int128_t lefto = (d.s128Value - tmpval * scaleDivisor) / scaleDivisor2;
if (tmpval >= 0 && lefto > 4)
tmpval++;
if (tmpval < 0 && lefto < -4)
tmpval--;
sec = datatypes::Decimal::getInt64FromWideDecimal(tmpval);
} }
else else
{ {

View File

@ -56,59 +56,40 @@ IDB_Decimal Func_mod::getDecimalVal(Row& row,
bool& isNull, bool& isNull,
CalpontSystemCatalog::ColType& operationColType) CalpontSystemCatalog::ColType& operationColType)
{ {
IDB_Decimal retValue;
retValue.value = 0;
retValue.scale = 0;
if ( parm.size() < 2 ) if ( parm.size() < 2 )
{ {
isNull = true; isNull = true;
return retValue; return IDB_Decimal();
} }
if (parm[0]->data()->resultType().isWideDecimalType() || if (parm[0]->data()->resultType().isWideDecimalType() ||
parm[1]->data()->resultType().isWideDecimalType()) parm[1]->data()->resultType().isWideDecimalType())
{ {
IDB_Decimal div = parm[1]->data()->getDecimalVal(row, isNull); IDB_Decimal div = parm[1]->data()->getDecimalVal(row, isNull);
int128_t divInt = (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) ? div.s128Value : div.value;
int128_t divInt, dividendInt;
if (parm[1]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
divInt = div.s128Value;
else
divInt = div.value;
if (divInt == 0) if (divInt == 0)
{ {
isNull = true; isNull = true;
return retValue; return IDB_Decimal();
} }
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull); IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) // two special cases: both Decimals has no scale
dividendInt = d.s128Value; // or divisor has no scale
else if (!div.isScaled())
dividendInt = d.value;
// integer division
if (d.scale == 0 && div.scale == 0)
{ {
retValue.s128Value = dividendInt % divInt; return IDB_Decimal(d % div.toTSInt128(),
} d.scale,
// special case integer division datatypes::INT128MAXPRECISION);
else if (div.scale == 0)
{
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
int128_t value = dividendInt / scaleDivisor;
int128_t lefto = dividendInt % scaleDivisor;
int128_t mod = (value % divInt) * scaleDivisor + lefto;
retValue.s128Value = mod;
} }
// float division // float division
else else
{ {
int128_t dividendInt = (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH) ? d.s128Value : d.value;
__float128 divF, dividendF; __float128 divF, dividendF;
int128_t scaleDivisor; int128_t scaleDivisor;
@ -121,32 +102,26 @@ IDB_Decimal Func_mod::getDecimalVal(Row& row,
__float128 mod = fmodq(dividendF, divF) * scaleDivisor; __float128 mod = fmodq(dividendF, divF) * scaleDivisor;
retValue.s128Value = (int128_t) mod; return IDB_Decimal(datatypes::TSInt128((int128_t) mod),
d.scale,
datatypes::INT128MAXPRECISION);
} }
retValue.scale = d.scale;
retValue.precision = datatypes::INT128MAXPRECISION;
} }
else int64_t div = parm[1]->data()->getIntVal(row, isNull);
if ( div == 0 )
{ {
int64_t div = parm[1]->data()->getIntVal(row, isNull); isNull = true;
return IDB_Decimal();
if ( div == 0 )
{
isNull = true;
return retValue;
}
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull);
int64_t value = d.value / pow(10.0, d.scale);
int lefto = d.value % (int)pow(10.0, d.scale);
int64_t mod = (value % div) * pow(10.0, d.scale) + lefto;
retValue.value = mod;
retValue.scale = d.scale;
} }
return retValue; IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull);
int64_t value = d.value / pow(10.0, d.scale);
int lefto = d.value % (int)pow(10.0, d.scale);
int64_t mod = (value % div) * pow(10.0, d.scale) + lefto;
// It is misterious but precision is set to 0!
return IDB_Decimal(mod, d.scale, 0);
} }
@ -227,29 +202,7 @@ double Func_mod::getDoubleVal(Row& row,
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull); mod = doDecimal<decltype(mod)>(parm, div, row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{
if (d.scale == 0)
{
mod = d.s128Value % div;
}
else
{
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
int128_t value = d.s128Value / scaleDivisor;
int128_t lefto = d.s128Value % scaleDivisor;
datatypes::TFloat128 tmp((__float128) (value % div) + (__float128) lefto / scaleDivisor);
mod = static_cast<double>(tmp);
}
}
else
{
int64_t value = d.value / pow(10.0, d.scale);
mod = value % div;
}
} }
break; break;
@ -350,29 +303,7 @@ long double Func_mod::getLongDoubleVal(Row& row,
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull); mod = doDecimal<decltype(mod)>(parm, div, row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{
if (d.scale == 0)
{
mod = d.s128Value % div;
}
else
{
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
int128_t value = d.s128Value / scaleDivisor;
int128_t lefto = d.s128Value % scaleDivisor;
datatypes::TFloat128 tmp((__float128) (value % div) + (__float128) lefto / scaleDivisor);
mod = static_cast<long double>(tmp);
}
}
else
{
int64_t value = d.value / pow(10.0, d.scale);
mod = value % div;
}
} }
break; break;
@ -476,29 +407,7 @@ int64_t Func_mod::getIntVal(Row& row,
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull); mod = doDecimal<decltype(mod)>(parm, div, row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{
if (d.scale == 0)
{
mod = d.s128Value % div;
}
else
{
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
int128_t value = d.s128Value / scaleDivisor;
int128_t lefto = d.s128Value % scaleDivisor;
__float128 tmp = (__float128) (value % div) + (__float128) lefto / scaleDivisor;
mod = datatypes::Decimal::getInt64FromFloat128(tmp);
}
}
else
{
int64_t value = d.value / pow(10.0, d.scale);
mod = value % div;
}
} }
break; break;
@ -593,29 +502,7 @@ uint64_t Func_mod::getUIntVal(Row& row,
case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL:
{ {
IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull); mod = doDecimal<decltype(mod)>(parm, div, row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{
if (d.scale == 0)
{
mod = d.s128Value % div;
}
else
{
int128_t scaleDivisor;
datatypes::getScaleDivisor(scaleDivisor, d.scale);
int128_t value = d.s128Value / scaleDivisor;
int128_t lefto = d.s128Value % scaleDivisor;
__float128 tmp = (__float128) (value % div) + (__float128) lefto / scaleDivisor;
mod = datatypes::Decimal::getUInt64FromFloat128(tmp);
}
}
else
{
int64_t value = d.value / pow(10.0, d.scale);
mod = value % div;
}
} }
break; break;

View File

@ -133,18 +133,7 @@ int64_t Func_round::getIntVal(Row& row,
} }
else else
{ {
if (x.scale > 0) return static_cast<int64_t>(x.getIntegralPart());
{
while (x.scale-- > 0)
x.s128Value /= 10;
}
else
{
while (x.scale++ < 0)
x.s128Value *= 10;
}
return datatypes::Decimal::getInt64FromWideDecimal(x.s128Value);
} }
} }

View File

@ -136,18 +136,7 @@ int64_t Func_truncate::getIntVal(Row& row,
} }
else else
{ {
if (x.scale > 0) return static_cast<int64_t>(x.getIntegralPart());
{
while (x.scale-- > 0)
x.s128Value /= 10;
}
else
{
while (x.scale++ < 0)
x.s128Value *= 10;
}
return datatypes::Decimal::getInt64FromWideDecimal(x.s128Value);
} }
} }

View File

@ -427,6 +427,35 @@ public:
FunctionParm& fp, FunctionParm& fp,
bool& isNull, bool& isNull,
execplan::CalpontSystemCatalog::ColType& op_ct); execplan::CalpontSystemCatalog::ColType& op_ct);
private:
template<typename ModType>
ModType doDecimal(const FunctionParm& parm,
const int64_t div,
rowgroup::Row& row,
bool isNull)
{
execplan::IDB_Decimal d = parm[0]->data()->getDecimalVal(row, isNull);
if (parm[0]->data()->resultType().colWidth == datatypes::MAXDECIMALWIDTH)
{
if (!d.isScaled())
{
return d.toTSInt128() % div;
}
else
{
auto intAndFract = d.getIntegralAndDividedFractional();
datatypes::TSInt128 integralRemainder = intAndFract.first % div;
return static_cast<ModType>(integralRemainder.toTFloat128() +
intAndFract.second);
}
}
int64_t value = d.value / pow(10.0, d.scale);
return value % div;
}
}; };

View File

@ -637,7 +637,7 @@ string Row::toString() const
case CalpontSystemCatalog::UDECIMAL: case CalpontSystemCatalog::UDECIMAL:
if (colWidths[i] == datatypes::MAXDECIMALWIDTH) if (colWidths[i] == datatypes::MAXDECIMALWIDTH)
{ {
datatypes::VDecimal dec(0, datatypes::Decimal dec(0,
scale[i], scale[i],
precision[i], precision[i],
getBinaryField<int128_t>(i)); getBinaryField<int128_t>(i));

View File

@ -2994,7 +2994,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
{ {
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH) if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{ {
datatypes::VDecimal dec(0, datatypes::Decimal dec(0,
fetchColScales[fetchColPos], fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos],
row.getBinaryField<int128_t>(fetchColPos)); row.getBinaryField<int128_t>(fetchColPos));
@ -3037,7 +3037,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
} }
else else
{ {
datatypes::VDecimal dec(intColVal, datatypes::Decimal dec(intColVal,
fetchColScales[fetchColPos], fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]); rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString(); value = dec.toString();
@ -3349,7 +3349,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
{ {
if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH) if (fetchColColwidths[fetchColPos] == datatypes::MAXDECIMALWIDTH)
{ {
datatypes::VDecimal dec(0, datatypes::Decimal dec(0,
fetchColScales[fetchColPos], fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos], rowGroups[txnId]->getPrecision()[fetchColPos],
row.getBinaryField<int128_t>(fetchColPos)); row.getBinaryField<int128_t>(fetchColPos));
@ -3393,7 +3393,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs,
} }
else else
{ {
datatypes::VDecimal dec(intColVal, datatypes::Decimal dec(intColVal,
fetchColScales[fetchColPos], fetchColScales[fetchColPos],
rowGroups[txnId]->getPrecision()[fetchColPos]); rowGroups[txnId]->getPrecision()[fetchColPos]);
value = dec.toString(); value = dec.toString();