diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index 5818f072321..e847d0e452b 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -24,6 +24,7 @@ #include "access/hash.h" #include "access/xact.h" #include "catalog/pg_type.h" +#include "common/int128.h" #include "funcapi.h" #include "libpq/pqformat.h" #include "miscadmin.h" @@ -2562,19 +2563,47 @@ timestamptz_cmp_timestamp(PG_FUNCTION_ARGS) /* * interval_relop - is interval1 relop interval2 * - * collate invalid interval at the end + * Interval comparison is based on converting interval values to a linear + * representation expressed in the units of the time field (microseconds, + * in the case of integer timestamps) with days assumed to be always 24 hours + * and months assumed to be always 30 days. To avoid overflow, we need a + * wider-than-int64 datatype for the linear representation, so use INT128 + * with integer timestamps. + * + * In the float8 case, our problems are not with overflow but with precision; + * but it's been like that since day one, so live with it. */ -static inline TimeOffset +#ifdef HAVE_INT64_TIMESTAMP +typedef INT128 IntervalOffset; +#else +typedef TimeOffset IntervalOffset; +#endif + +static inline IntervalOffset interval_cmp_value(const Interval *interval) { - TimeOffset span; - - span = interval->time; + IntervalOffset span; #ifdef HAVE_INT64_TIMESTAMP - span += interval->month * INT64CONST(30) * USECS_PER_DAY; - span += interval->day * INT64CONST(24) * USECS_PER_HOUR; + int64 dayfraction; + int64 days; + + /* + * Separate time field into days and dayfraction, then add the month and + * day fields to the days part. We cannot overflow int64 days here. + */ + dayfraction = interval->time % USECS_PER_DAY; + days = interval->time / USECS_PER_DAY; + days += interval->month * INT64CONST(30); + days += interval->day; + + /* Widen dayfraction to 128 bits */ + span = int64_to_int128(dayfraction); + + /* Scale up days to microseconds, forming a 128-bit product */ + int128_add_int64_mul_int64(&span, days, USECS_PER_DAY); #else + span = interval->time; span += interval->month * ((double) DAYS_PER_MONTH * SECS_PER_DAY); span += interval->day * ((double) HOURS_PER_DAY * SECS_PER_HOUR); #endif @@ -2585,10 +2614,14 @@ interval_cmp_value(const Interval *interval) static int interval_cmp_internal(Interval *interval1, Interval *interval2) { - TimeOffset span1 = interval_cmp_value(interval1); - TimeOffset span2 = interval_cmp_value(interval2); + IntervalOffset span1 = interval_cmp_value(interval1); + IntervalOffset span2 = interval_cmp_value(interval2); +#ifdef HAVE_INT64_TIMESTAMP + return int128_compare(span1, span2); +#else return ((span1 < span2) ? -1 : (span1 > span2) ? 1 : 0); +#endif } Datum @@ -2665,10 +2698,20 @@ Datum interval_hash(PG_FUNCTION_ARGS) { Interval *interval = PG_GETARG_INTERVAL_P(0); - TimeOffset span = interval_cmp_value(interval); + IntervalOffset span = interval_cmp_value(interval); #ifdef HAVE_INT64_TIMESTAMP - return DirectFunctionCall1(hashint8, Int64GetDatumFast(span)); + int64 span64; + + /* + * Use only the least significant 64 bits for hashing. The upper 64 bits + * seldom add any useful information, and besides we must do it like this + * for compatibility with hashes calculated before use of INT128 was + * introduced. + */ + span64 = int128_to_int64(span); + + return DirectFunctionCall1(hashint8, Int64GetDatumFast(span64)); #else return DirectFunctionCall1(hashfloat8, Float8GetDatumFast(span)); #endif diff --git a/src/include/common/int128.h b/src/include/common/int128.h new file mode 100644 index 00000000000..4c46e26f400 --- /dev/null +++ b/src/include/common/int128.h @@ -0,0 +1,276 @@ +/*------------------------------------------------------------------------- + * + * int128.h + * Roll-our-own 128-bit integer arithmetic. + * + * We make use of the native int128 type if there is one, otherwise + * implement things the hard way based on two int64 halves. + * + * See src/tools/testint128.c for a simple test harness for this file. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/include/common/int128.h + * + *------------------------------------------------------------------------- + */ +#ifndef INT128_H +#define INT128_H + +/* + * For testing purposes, use of native int128 can be switched on/off by + * predefining USE_NATIVE_INT128. + */ +#ifndef USE_NATIVE_INT128 +#ifdef HAVE_INT128 +#define USE_NATIVE_INT128 1 +#else +#define USE_NATIVE_INT128 0 +#endif +#endif + + +#if USE_NATIVE_INT128 + +typedef int128 INT128; + +/* + * Add an unsigned int64 value into an INT128 variable. + */ +static inline void +int128_add_uint64(INT128 *i128, uint64 v) +{ + *i128 += v; +} + +/* + * Add a signed int64 value into an INT128 variable. + */ +static inline void +int128_add_int64(INT128 *i128, int64 v) +{ + *i128 += v; +} + +/* + * Add the 128-bit product of two int64 values into an INT128 variable. + * + * XXX with a stupid compiler, this could actually be less efficient than + * the other implementation; maybe we should do it by hand always? + */ +static inline void +int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) +{ + *i128 += (int128) x *(int128) y; +} + +/* + * Compare two INT128 values, return -1, 0, or +1. + */ +static inline int +int128_compare(INT128 x, INT128 y) +{ + if (x < y) + return -1; + if (x > y) + return 1; + return 0; +} + +/* + * Widen int64 to INT128. + */ +static inline INT128 +int64_to_int128(int64 v) +{ + return (INT128) v; +} + +/* + * Convert INT128 to int64 (losing any high-order bits). + * This also works fine for casting down to uint64. + */ +static inline int64 +int128_to_int64(INT128 val) +{ + return (int64) val; +} + +#else /* !USE_NATIVE_INT128 */ + +/* + * We lay out the INT128 structure with the same content and byte ordering + * that a native int128 type would (probably) have. This makes no difference + * for ordinary use of INT128, but allows union'ing INT128 with int128 for + * testing purposes. + */ +typedef struct +{ +#ifdef WORDS_BIGENDIAN + int64 hi; /* most significant 64 bits, including sign */ + uint64 lo; /* least significant 64 bits, without sign */ +#else + uint64 lo; /* least significant 64 bits, without sign */ + int64 hi; /* most significant 64 bits, including sign */ +#endif +} INT128; + +/* + * Add an unsigned int64 value into an INT128 variable. + */ +static inline void +int128_add_uint64(INT128 *i128, uint64 v) +{ + /* + * First add the value to the .lo part, then check to see if a carry needs + * to be propagated into the .hi part. A carry is needed if both inputs + * have high bits set, or if just one input has high bit set while the new + * .lo part doesn't. Remember that .lo part is unsigned; we cast to + * signed here just as a cheap way to check the high bit. + */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (((int64) v < 0 && (int64) oldlo < 0) || + (((int64) v < 0 || (int64) oldlo < 0) && (int64) i128->lo >= 0)) + i128->hi++; +} + +/* + * Add a signed int64 value into an INT128 variable. + */ +static inline void +int128_add_int64(INT128 *i128, int64 v) +{ + /* + * This is much like the above except that the carry logic differs for + * negative v. Ordinarily we'd need to subtract 1 from the .hi part + * (corresponding to adding the sign-extended bits of v to it); but if + * there is a carry out of the .lo part, that cancels and we do nothing. + */ + uint64 oldlo = i128->lo; + + i128->lo += v; + if (v >= 0) + { + if ((int64) oldlo < 0 && (int64) i128->lo >= 0) + i128->hi++; + } + else + { + if (!((int64) oldlo < 0 || (int64) i128->lo >= 0)) + i128->hi--; + } +} + +/* + * INT64_AU32 extracts the most significant 32 bits of int64 as int64, while + * INT64_AL32 extracts the least significant 32 bits as uint64. + */ +#define INT64_AU32(i64) ((i64) >> 32) +#define INT64_AL32(i64) ((i64) & UINT64CONST(0xFFFFFFFF)) + +/* + * Add the 128-bit product of two int64 values into an INT128 variable. + */ +static inline void +int128_add_int64_mul_int64(INT128 *i128, int64 x, int64 y) +{ + /* INT64_AU32 must use arithmetic right shift */ + StaticAssertStmt(((int64) -1 >> 1) == (int64) -1, + "arithmetic right shift is needed"); + + /*---------- + * Form the 128-bit product x * y using 64-bit arithmetic. + * Considering each 64-bit input as having 32-bit high and low parts, + * we can compute + * + * x * y = ((x.hi << 32) + x.lo) * (((y.hi << 32) + y.lo) + * = (x.hi * y.hi) << 64 + + * (x.hi * y.lo) << 32 + + * (x.lo * y.hi) << 32 + + * x.lo * y.lo + * + * Each individual product is of 32-bit terms so it won't overflow when + * computed in 64-bit arithmetic. Then we just have to shift it to the + * correct position while adding into the 128-bit result. We must also + * keep in mind that the "lo" parts must be treated as unsigned. + *---------- + */ + + /* No need to work hard if product must be zero */ + if (x != 0 && y != 0) + { + int64 x_u32 = INT64_AU32(x); + uint64 x_l32 = INT64_AL32(x); + int64 y_u32 = INT64_AU32(y); + uint64 y_l32 = INT64_AL32(y); + int64 tmp; + + /* the first term */ + i128->hi += x_u32 * y_u32; + + /* the second term: sign-extend it only if x is negative */ + tmp = x_u32 * y_l32; + if (x < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the third term: sign-extend it only if y is negative */ + tmp = x_l32 * y_u32; + if (y < 0) + i128->hi += INT64_AU32(tmp); + else + i128->hi += ((uint64) tmp) >> 32; + int128_add_uint64(i128, ((uint64) INT64_AL32(tmp)) << 32); + + /* the fourth term: always unsigned */ + int128_add_uint64(i128, x_l32 * y_l32); + } +} + +/* + * Compare two INT128 values, return -1, 0, or +1. + */ +static inline int +int128_compare(INT128 x, INT128 y) +{ + if (x.hi < y.hi) + return -1; + if (x.hi > y.hi) + return 1; + if (x.lo < y.lo) + return -1; + if (x.lo > y.lo) + return 1; + return 0; +} + +/* + * Widen int64 to INT128. + */ +static inline INT128 +int64_to_int128(int64 v) +{ + INT128 val; + + val.lo = (uint64) v; + val.hi = (v < 0) ? -INT64CONST(1) : INT64CONST(0); + return val; +} + +/* + * Convert INT128 to int64 (losing any high-order bits). + * This also works fine for casting down to uint64. + */ +static inline int64 +int128_to_int64(INT128 val) +{ + return (int64) val.lo; +} + +#endif /* USE_NATIVE_INT128 */ + +#endif /* INT128_H */ diff --git a/src/test/regress/expected/interval.out b/src/test/regress/expected/interval.out index 946c97ad924..f88f34550ad 100644 --- a/src/test/regress/expected/interval.out +++ b/src/test/regress/expected/interval.out @@ -207,6 +207,70 @@ SELECT '' AS fortyfive, r1.*, r2.* | 34 years | 6 years (45 rows) +-- Test intervals that are large enough to overflow 64 bits in comparisons +CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES + ('2147483647 days 2147483647 months'), + ('2147483647 days -2147483648 months'), + ('1 year'), + ('-2147483648 days 2147483647 months'), + ('-2147483648 days -2147483648 months'); +-- these should fail as out-of-range +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); +ERROR: interval field value out of range: "2147483648 days" +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days'); +ERROR: interval field value out of range: "-2147483649 days" +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days')... + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years'); +ERROR: interval out of range +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years')... + ^ +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'); +ERROR: interval out of range +LINE 1: INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'... + ^ +SELECT r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + f1 | f1 +-------------------------------------------+------------------------------------------- + -178956970 years -8 mons +2147483647 days | -178956970 years -8 mons -2147483648 days + 1 year | -178956970 years -8 mons -2147483648 days + 1 year | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons -2147483648 days | -178956970 years -8 mons -2147483648 days + 178956970 years 7 mons -2147483648 days | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons -2147483648 days | 1 year + 178956970 years 7 mons 2147483647 days | -178956970 years -8 mons -2147483648 days + 178956970 years 7 mons 2147483647 days | -178956970 years -8 mons +2147483647 days + 178956970 years 7 mons 2147483647 days | 1 year + 178956970 years 7 mons 2147483647 days | 178956970 years 7 mons -2147483648 days +(10 rows) + +CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); +SET enable_seqscan TO false; +EXPLAIN (COSTS OFF) +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; + QUERY PLAN +-------------------------------------------------------------------- + Index Only Scan using interval_tbl_of_f1_idx on interval_tbl_of r1 +(1 row) + +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; + f1 +------------------------------------------- + -178956970 years -8 mons -2147483648 days + -178956970 years -8 mons +2147483647 days + 1 year + 178956970 years 7 mons -2147483648 days + 178956970 years 7 mons 2147483647 days +(5 rows) + +RESET enable_seqscan; +DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results, -- though the code attempts to do the right thing and round up to days and diff --git a/src/test/regress/sql/interval.sql b/src/test/regress/sql/interval.sql index cff9adab321..bc5537d1b9c 100644 --- a/src/test/regress/sql/interval.sql +++ b/src/test/regress/sql/interval.sql @@ -59,6 +59,33 @@ SELECT '' AS fortyfive, r1.*, r2.* WHERE r1.f1 > r2.f1 ORDER BY r1.f1, r2.f1; +-- Test intervals that are large enough to overflow 64 bits in comparisons +CREATE TEMP TABLE INTERVAL_TBL_OF (f1 interval); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES + ('2147483647 days 2147483647 months'), + ('2147483647 days -2147483648 months'), + ('1 year'), + ('-2147483648 days 2147483647 months'), + ('-2147483648 days -2147483648 months'); +-- these should fail as out-of-range +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483648 days'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483649 days'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('2147483647 years'); +INSERT INTO INTERVAL_TBL_OF (f1) VALUES ('-2147483648 years'); + +SELECT r1.*, r2.* + FROM INTERVAL_TBL_OF r1, INTERVAL_TBL_OF r2 + WHERE r1.f1 > r2.f1 + ORDER BY r1.f1, r2.f1; + +CREATE INDEX ON INTERVAL_TBL_OF USING btree (f1); +SET enable_seqscan TO false; +EXPLAIN (COSTS OFF) +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; +SELECT f1 FROM INTERVAL_TBL_OF r1 ORDER BY f1; +RESET enable_seqscan; + +DROP TABLE INTERVAL_TBL_OF; -- Test multiplication and division with intervals. -- Floating point arithmetic rounding errors can lead to unexpected results,