diff --git a/contrib/test_decoding/expected/toast.out b/contrib/test_decoding/expected/toast.out index 0a850b7acdb..b7bae65ee82 100644 --- a/contrib/test_decoding/expected/toast.out +++ b/contrib/test_decoding/expected/toast.out @@ -285,6 +285,64 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', COMMIT (232 rows) +-- test we can decode "old" tuples bigger than the max heap tuple size correctly +DROP TABLE IF EXISTS toasted_several; +NOTICE: table "toasted_several" does not exist, skipping +CREATE TABLE toasted_several ( + id serial unique not null, + toasted_key text primary key, + toasted_col1 text, + toasted_col2 text +); +ALTER TABLE toasted_several REPLICA IDENTITY FULL; +ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL; +INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000)); +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null + COMMIT +(3 rows) + +-- test update of a toasted key without changing it +UPDATE toasted_several SET toasted_col1 = toasted_key; +UPDATE toasted_several SET toasted_col2 = toasted_col1; +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + 
regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null + COMMIT + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null + COMMIT + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210' + COMMIT +(9 rows) + +/* + * update with large tuplebuf, in a transaction large enough to force to spool to disk + */ +BEGIN; +INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234); +UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1; +DELETE FROM toasted_several WHERE id = 1; +COMMIT; +DROP TABLE toasted_several; +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') +WHERE data NOT LIKE '%INSERT: %'; + regexp_replace +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + BEGIN + table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum + table public.toasted_several: DELETE: id[integer]:1 
toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210' + COMMIT +(4 rows) + SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot -------------------------- diff --git a/contrib/test_decoding/sql/toast.sql b/contrib/test_decoding/sql/toast.sql index 09293865df9..a333d99abce 100644 --- a/contrib/test_decoding/sql/toast.sql +++ b/contrib/test_decoding/sql/toast.sql @@ -260,4 +260,41 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL; 203 untoasted200 \. SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test we can decode "old" tuples bigger than the max heap tuple size correctly +DROP TABLE IF EXISTS toasted_several; +CREATE TABLE toasted_several ( + id serial unique not null, + toasted_key text primary key, + toasted_col1 text, + toasted_col2 text +); +ALTER TABLE toasted_several REPLICA IDENTITY FULL; +ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL; +ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL; + +INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000)); + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- test update of a toasted key without changing it +UPDATE toasted_several SET toasted_col1 = toasted_key; +UPDATE toasted_several SET toasted_col2 = toasted_col1; + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +/* + * update with large tuplebuf, in a transaction large enough to force to spool to disk + */ +BEGIN; +INSERT INTO 
toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234); +UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1; +DELETE FROM toasted_several WHERE id = 1; +COMMIT; + +DROP TABLE toasted_several; + +SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') +WHERE data NOT LIKE '%INSERT: %'; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9610b6ce773..0f7ff1501be 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { + Size tuplelen = r->xl_len - SizeOfHeapInsert; + Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, - r->xl_len - SizeOfHeapInsert, - change->data.tp.newtuple); + tuplelen, change->data.tp.newtuple); } change->data.tp.clear_toast_afterwards = true; @@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { XLogRecord *r = &buf->record; xl_heap_update *xlrec; - xl_heap_header_len xlhdr; ReorderBufferChange *change; char *data; @@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { + Size tuplelen; + xl_heap_header_len xlhdr; + Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); memcpy(&xlhdr, data, sizeof(xlhdr)); data += offsetof(xl_heap_header_len, header); - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); + tuplelen = xlhdr.t_len + SizeOfHeapHeader; - DecodeXLogTuple(data, - 
xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.newtuple); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple); /* skip over the rest of the tuple header */ data += SizeOfHeapHeader; /* skip over the tuple data */ @@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { + Size tuplelen; + xl_heap_header_len xlhdr; + memcpy(&xlhdr, data, sizeof(xlhdr)); data += offsetof(xl_heap_header_len, header); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + tuplelen = xlhdr.t_len + SizeOfHeapHeader; - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.oldtuple); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + + DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple); #ifdef NOT_USED data += SizeOfHeapHeader; data += xlhdr.t_len; @@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { + Size len = r->xl_len - SizeOfHeapDelete; + Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(ctx->reorder, len); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - r->xl_len - SizeOfHeapDelete, - change->data.tp.oldtuple); + len, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; @@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - - tuple = change->data.tp.newtuple; - - /* not a disk based tuple */ - ItemPointerSetInvalid(&tuple->tuple.t_self); + HeapTupleHeader header; xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); data = 
((char *) xlhdr) + SizeOfMultiInsertTuple; datalen = xlhdr->datalen; + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(ctx->reorder, datalen); + + tuple = change->data.tp.newtuple; + header = tuple->tuple.t_data; + + /* not a disk based tuple */ + ItemPointerSetInvalid(&tuple->tuple.t_self); + /* * We can only figure this out after reassembling the * transactions. */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->header; + tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits); - memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); + memset(header, 0, offsetof(HeapTupleHeaderData, t_bits)); - memcpy((char *) &tuple->header - + offsetof(HeapTupleHeaderData, t_bits), + memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits), (char *) data, datalen); data += datalen; - tuple->header.t_infomask = xlhdr->t_infomask; - tuple->header.t_infomask2 = xlhdr->t_infomask2; - tuple->header.t_hoff = xlhdr->t_hoff; + header->t_infomask = xlhdr->t_infomask; + header->t_infomask2 = xlhdr->t_infomask2; + header->t_hoff = xlhdr->t_hoff; } /* @@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) { xl_heap_header xlhdr; int datalen = len - SizeOfHeapHeader; + HeapTupleHeader header; Assert(datalen >= 0); - Assert(datalen <= MaxHeapTupleSize); tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits); + header = tuple->tuple.t_data; /* not a disk based tuple */ ItemPointerSetInvalid(&tuple->tuple.t_self); /* we can only figure this out after reassembling the transactions */ tuple->tuple.t_tableOid = InvalidOid; - tuple->tuple.t_data = &tuple->header; /* data is not stored aligned, copy to aligned storage */ memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); + memset(header, 0, offsetof(HeapTupleHeaderData, t_bits)); - memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits), + memcpy(((char *) 
tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits), data + SizeOfHeapHeader, datalen); - tuple->header.t_infomask = xlhdr.t_infomask; - tuple->header.t_infomask2 = xlhdr.t_infomask2; - tuple->header.t_hoff = xlhdr.t_hoff; + header->t_infomask = xlhdr.t_infomask; + header->t_infomask2 = xlhdr.t_infomask2; + header->t_hoff = xlhdr.t_hoff; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 22b83ab6ad0..a1479fe949f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -442,27 +442,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) /* - * Get a unused, possibly preallocated, ReorderBufferTupleBuf + * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at + * least a tuple of size tuple_len (excluding header overhead). */ ReorderBufferTupleBuf * -ReorderBufferGetTupleBuf(ReorderBuffer *rb) +ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) { ReorderBufferTupleBuf *tuple; + Size alloc_len; - /* check the slab cache */ - if (rb->nr_cached_tuplebufs) + alloc_len = tuple_len + offsetof(HeapTupleHeaderData, t_bits); + + /* + * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for + * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples + * generated for oldtuples can be bigger, as they don't have + * out-of-line toast columns. 
+ */ + if (alloc_len < MaxHeapTupleSize) + alloc_len = MaxHeapTupleSize; + + + /* if small enough, check the slab cache */ + if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs) { rb->nr_cached_tuplebufs--; tuple = slist_container(ReorderBufferTupleBuf, node, slist_pop_head_node(&rb->cached_tuplebufs)); #ifdef USE_ASSERT_CHECKING - memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf)); + memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData)); +#endif + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); +#ifdef USE_ASSERT_CHECKING + memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size); #endif } else { tuple = (ReorderBufferTupleBuf *) - MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf)); + MemoryContextAlloc(rb->context, + sizeof(ReorderBufferTupleBuf) + MAXIMUM_ALIGNOF + alloc_len); /* slack for the MAXALIGN applied by ReorderBufferTupleBufData() */ + tuple->alloc_tuple_size = alloc_len; + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); } return tuple; @@ -477,13 +498,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb) void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) { - /* check whether to put into the slab cache */ - if (rb->nr_cached_tuplebufs < max_cached_tuplebufs) + /* check whether to put into the slab cache, oversized tuples never are */ + if (tuple->alloc_tuple_size == MaxHeapTupleSize && + rb->nr_cached_tuplebufs < max_cached_tuplebufs) { rb->nr_cached_tuplebufs++; slist_push_head(&rb->cached_tuplebufs, &tuple->node); + VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size); VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf)); VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node)); + VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size)); } else { @@ -2020,17 +2044,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, newtup = change->data.tp.newtuple; if (oldtup) - oldlen = offsetof(ReorderBufferTupleBuf, data) - +oldtup->tuple.t_len - - offsetof(HeapTupleHeaderData, t_bits); + { + sz 
+= sizeof(HeapTupleData); + oldlen = oldtup->tuple.t_len; + sz += oldlen; + } if (newtup) - newlen = offsetof(ReorderBufferTupleBuf, data) - +newtup->tuple.t_len - - offsetof(HeapTupleHeaderData, t_bits); - - sz += oldlen; - sz += newlen; + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } /* make sure we have enough space */ ReorderBufferSerializeReserve(rb, sz); @@ -2041,13 +2066,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, oldtup, oldlen); + memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, oldtup->tuple.t_data, oldlen); data += oldlen; } if (newlen) { - memcpy(data, newtup, newlen); + memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, newtup->tuple.t_data, newlen); data += newlen; } break; @@ -2268,29 +2299,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { - Size len = offsetof(ReorderBufferTupleBuf, data) - +((ReorderBufferTupleBuf *) data)->tuple.t_len - - offsetof(HeapTupleHeaderData, t_bits); + Size tuplelen = ((HeapTuple) data)->t_len; - change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.oldtuple, data, len); + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits)); + + /* restore ->tuple */ + memcpy(&change->data.tp.oldtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.oldtuple->tuple.t_data = - &change->data.tp.oldtuple->header; - data += len; + ReorderBufferTupleBufData(change->data.tp.oldtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } if (change->data.tp.newtuple) { - Size len = 
offsetof(ReorderBufferTupleBuf, data) - +((ReorderBufferTupleBuf *) data)->tuple.t_len - - offsetof(HeapTupleHeaderData, t_bits); + Size tuplelen = ((HeapTuple) data)->t_len; - change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->data.tp.newtuple, data, len); + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits)); + + /* restore ->tuple */ + memcpy(&change->data.tp.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ change->data.tp.newtuple->tuple.t_data = - &change->data.tp.newtuple->header; - data += len; + ReorderBufferTupleBufData(change->data.tp.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { @@ -2667,7 +2715,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, */ tmphtup = heap_form_tuple(desc, attrs, isnull); Assert(newtup->tuple.t_len <= MaxHeapTupleSize); - Assert(&newtup->header == newtup->tuple.t_data); + Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); newtup->tuple.t_len = tmphtup->t_len; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 00183fa16d6..9092c9b4ff6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -26,12 +26,19 @@ typedef struct ReorderBufferTupleBuf /* position in preallocated list */ slist_node node; - /* tuple, stored sequentially */ + /* tuple header, the interesting bit for users of logical decoding */ HeapTupleData tuple; - HeapTupleHeaderData header; - char data[MaxHeapTupleSize]; + + /* pre-allocated size of tuple buffer, different from tuple size */ + Size alloc_tuple_size; + + /* actual tuple data follows */ } 
ReorderBufferTupleBuf; +/* pointer to the data stored in a TupleBuf */ +#define ReorderBufferTupleBufData(p) \ + ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf))) + /* * Types of the change passed to a 'change' callback. * @@ -327,7 +334,7 @@ struct ReorderBuffer ReorderBuffer *ReorderBufferAllocate(void); void ReorderBufferFree(ReorderBuffer *); -ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); +ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);