1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-05 23:56:58 +03:00

logical decoding: Fix handling of large old tuples with replica identity full.

When decoding the old version of an UPDATE or DELETE change, and if that
tuple was bigger than MaxHeapTupleSize, we either Assert'ed out, or
failed in more subtle ways in non-assert builds.  Normally individual
tuples aren't bigger than MaxHeapTupleSize, with big datums toasted.
But that's not the case for the old version of a tuple for logical
decoding; the replica identity is logged as one piece. With the default
replica identity btree limits that to small tuples, but that's not the
case for FULL.

Change the tuple buffer infrastructure to separate allocate over-large
tuples, instead of always going through the slab cache.

This unfortunately requires changing the ReorderBufferTupleBuf
definition, we need to store the allocated size someplace. To avoid
requiring output plugins to recompile, don't store HeapTupleHeaderData
directly after HeapTupleData, but point to it via t_data; that leaves
rooms for the allocated size.  As there's no reason for an output plugin
to look at ReorderBufferTupleBuf->t_data.header, remove the field. It
was just a minor convenience having it directly accessible.

Reported-By: Adam Dratwiński
Discussion: CAKg6ypLd7773AOX4DiOGRwQk1TVOQKhNwjYiVjJnpq8Wo+i62Q@mail.gmail.com
This commit is contained in:
Andres Freund 2016-03-05 18:02:20 -08:00
parent a50f50a652
commit 3b94b3a496
5 changed files with 237 additions and 73 deletions

View File

@ -285,6 +285,64 @@ SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot',
COMMIT COMMIT
(232 rows) (232 rows)
-- test we can decode "old" tuples bigger than the max heap tuple size correctly
DROP TABLE IF EXISTS toasted_several;
NOTICE: table "toasted_several" does not exist, skipping
CREATE TABLE toasted_several (
id serial unique not null,
toasted_key text primary key,
toasted_col1 text,
toasted_col2 text
);
ALTER TABLE toasted_several REPLICA IDENTITY FULL;
ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
regexp_replace
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
COMMIT
(3 rows)
-- test update of a toasted key without changing it
UPDATE toasted_several SET toasted_col1 = toasted_key;
UPDATE toasted_several SET toasted_col2 = toasted_col1;
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
regexp_replace
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.toasted_several: INSERT: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..098765432109876543210987654321098765432109876543210' toasted_col1[text]:null toasted_col2[text]:null
COMMIT
BEGIN
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..432109876543210987654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:null
COMMIT
BEGIN
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
COMMIT
(9 rows)
/*
* update with large tuplebuf, in a transaction large enough to force to spool to disk
*/
BEGIN;
INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
DELETE FROM toasted_several WHERE id = 1;
COMMIT;
DROP TABLE toasted_several;
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
WHERE data NOT LIKE '%INSERT: %';
regexp_replace
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
BEGIN
table public.toasted_several: UPDATE: old-key: id[integer]:1 toasted_key[text]:'98765432109876543210..7654321098765432109876543210987654321098765432109876543210' toasted_col2[text]:unchanged-toast-datum
table public.toasted_several: DELETE: id[integer]:1 toasted_key[text]:'98765432109876543210987654321..876543210987654321098765432109876543210987654321098765432109876543210987654321098765432109876543210'
COMMIT
(4 rows)
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot pg_drop_replication_slot
-------------------------- --------------------------

View File

@ -260,4 +260,41 @@ ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
203 untoasted200 203 untoasted200
\. \.
SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- test we can decode "old" tuples bigger than the max heap tuple size correctly
DROP TABLE IF EXISTS toasted_several;
CREATE TABLE toasted_several (
id serial unique not null,
toasted_key text primary key,
toasted_col1 text,
toasted_col2 text
);
ALTER TABLE toasted_several REPLICA IDENTITY FULL;
ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 2000));
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- test update of a toasted key without changing it
UPDATE toasted_several SET toasted_col1 = toasted_key;
UPDATE toasted_several SET toasted_col2 = toasted_col1;
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
* update with large tuplebuf, in a transaction large enough to force to spool to disk
*/
BEGIN;
INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
DELETE FROM toasted_several WHERE id = 1;
COMMIT;
DROP TABLE toasted_several;
SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
WHERE data NOT LIKE '%INSERT: %';
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');

View File

@ -625,13 +625,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{ {
Size tuplelen = r->xl_len - SizeOfHeapInsert;
Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
r->xl_len - SizeOfHeapInsert, tuplelen, change->data.tp.newtuple);
change->data.tp.newtuple);
} }
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
@ -650,7 +652,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{ {
XLogRecord *r = &buf->record; XLogRecord *r = &buf->record;
xl_heap_update *xlrec; xl_heap_update *xlrec;
xl_heap_header_len xlhdr;
ReorderBufferChange *change; ReorderBufferChange *change;
char *data; char *data;
@ -669,16 +670,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{ {
Size tuplelen;
xl_heap_header_len xlhdr;
Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
memcpy(&xlhdr, data, sizeof(xlhdr)); memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header); data += offsetof(xl_heap_header_len, header);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); tuplelen = xlhdr.t_len + SizeOfHeapHeader;
DecodeXLogTuple(data, change->data.tp.newtuple =
xlhdr.t_len + SizeOfHeapHeader, ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
change->data.tp.newtuple);
DecodeXLogTuple(data, tuplelen, change->data.tp.newtuple);
/* skip over the rest of the tuple header */ /* skip over the rest of the tuple header */
data += SizeOfHeapHeader; data += SizeOfHeapHeader;
/* skip over the tuple data */ /* skip over the tuple data */
@ -687,14 +692,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{ {
Size tuplelen;
xl_heap_header_len xlhdr;
memcpy(&xlhdr, data, sizeof(xlhdr)); memcpy(&xlhdr, data, sizeof(xlhdr));
data += offsetof(xl_heap_header_len, header); data += offsetof(xl_heap_header_len, header);
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); tuplelen = xlhdr.t_len + SizeOfHeapHeader;
DecodeXLogTuple(data, change->data.tp.oldtuple =
xlhdr.t_len + SizeOfHeapHeader, ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
change->data.tp.oldtuple);
DecodeXLogTuple(data, tuplelen, change->data.tp.oldtuple);
#ifdef NOT_USED #ifdef NOT_USED
data += SizeOfHeapHeader; data += SizeOfHeapHeader;
data += xlhdr.t_len; data += xlhdr.t_len;
@ -732,13 +741,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */ /* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{ {
Size len = r->xl_len - SizeOfHeapDelete;
Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); change->data.tp.oldtuple =
ReorderBufferGetTupleBuf(ctx->reorder, len);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
r->xl_len - SizeOfHeapDelete, len, change->data.tp.oldtuple);
change->data.tp.oldtuple);
} }
change->data.tp.clear_toast_afterwards = true; change->data.tp.clear_toast_afterwards = true;
@ -795,37 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/ */
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{ {
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); HeapTupleHeader header;
tuple = change->data.tp.newtuple;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
data = ((char *) xlhdr) + SizeOfMultiInsertTuple; data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
datalen = xlhdr->datalen; datalen = xlhdr->datalen;
change->data.tp.newtuple =
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
header = tuple->tuple.t_data;
/* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self);
/* /*
* We can only figure this out after reassembling the * We can only figure this out after reassembling the
* transactions. * transactions.
*/ */
tuple->tuple.t_tableOid = InvalidOid; tuple->tuple.t_tableOid = InvalidOid;
tuple->tuple.t_data = &tuple->header;
tuple->tuple.t_len = datalen tuple->tuple.t_len = datalen
+ offsetof(HeapTupleHeaderData, t_bits); + offsetof(HeapTupleHeaderData, t_bits);
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
memcpy((char *) &tuple->header memcpy((char *) tuple->tuple.t_data + offsetof(HeapTupleHeaderData, t_bits),
+ offsetof(HeapTupleHeaderData, t_bits),
(char *) data, (char *) data,
datalen); datalen);
data += datalen; data += datalen;
tuple->header.t_infomask = xlhdr->t_infomask; header->t_infomask = xlhdr->t_infomask;
tuple->header.t_infomask2 = xlhdr->t_infomask2; header->t_infomask2 = xlhdr->t_infomask2;
tuple->header.t_hoff = xlhdr->t_hoff; header->t_hoff = xlhdr->t_hoff;
} }
/* /*
@ -856,31 +870,31 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
{ {
xl_heap_header xlhdr; xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader; int datalen = len - SizeOfHeapHeader;
HeapTupleHeader header;
Assert(datalen >= 0); Assert(datalen >= 0);
Assert(datalen <= MaxHeapTupleSize);
tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits); tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
header = tuple->tuple.t_data;
/* not a disk based tuple */ /* not a disk based tuple */
ItemPointerSetInvalid(&tuple->tuple.t_self); ItemPointerSetInvalid(&tuple->tuple.t_self);
/* we can only figure this out after reassembling the transactions */ /* we can only figure this out after reassembling the transactions */
tuple->tuple.t_tableOid = InvalidOid; tuple->tuple.t_tableOid = InvalidOid;
tuple->tuple.t_data = &tuple->header;
/* data is not stored aligned, copy to aligned storage */ /* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr, memcpy((char *) &xlhdr,
data, data,
SizeOfHeapHeader); SizeOfHeapHeader);
memset(&tuple->header, 0, sizeof(HeapTupleHeaderData)); memset(header, 0, offsetof(HeapTupleHeaderData, t_bits));
memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits), memcpy(((char *) tuple->tuple.t_data) + offsetof(HeapTupleHeaderData, t_bits),
data + SizeOfHeapHeader, data + SizeOfHeapHeader,
datalen); datalen);
tuple->header.t_infomask = xlhdr.t_infomask; header->t_infomask = xlhdr.t_infomask;
tuple->header.t_infomask2 = xlhdr.t_infomask2; header->t_infomask2 = xlhdr.t_infomask2;
tuple->header.t_hoff = xlhdr.t_hoff; header->t_hoff = xlhdr.t_hoff;
} }

View File

@ -442,27 +442,48 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
/* /*
* Get a unused, possibly preallocated, ReorderBufferTupleBuf * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
* least a tuple of size tuple_len (excluding header overhead).
*/ */
ReorderBufferTupleBuf * ReorderBufferTupleBuf *
ReorderBufferGetTupleBuf(ReorderBuffer *rb) ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{ {
ReorderBufferTupleBuf *tuple; ReorderBufferTupleBuf *tuple;
Size alloc_len;
/* check the slab cache */ alloc_len = tuple_len + offsetof(HeapTupleHeaderData, t_bits);
if (rb->nr_cached_tuplebufs)
/*
* Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
* those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
* tuples generated for oldtuples can be bigger, as they don't have
* out-of-line toast columns.
*/
if (alloc_len < MaxHeapTupleSize)
alloc_len = MaxHeapTupleSize;
/* if small enough, check the slab cache */
if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
{ {
rb->nr_cached_tuplebufs--; rb->nr_cached_tuplebufs--;
tuple = slist_container(ReorderBufferTupleBuf, node, tuple = slist_container(ReorderBufferTupleBuf, node,
slist_pop_head_node(&rb->cached_tuplebufs)); slist_pop_head_node(&rb->cached_tuplebufs));
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
memset(tuple, 0xa9, sizeof(ReorderBufferTupleBuf)); memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
#endif
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
#ifdef USE_ASSERT_CHECKING
memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
#endif #endif
} }
else else
{ {
tuple = (ReorderBufferTupleBuf *) tuple = (ReorderBufferTupleBuf *)
MemoryContextAlloc(rb->context, sizeof(ReorderBufferTupleBuf)); MemoryContextAlloc(rb->context,
sizeof(ReorderBufferTupleBuf) + alloc_len);
tuple->alloc_tuple_size = alloc_len;
tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
} }
return tuple; return tuple;
@ -477,13 +498,16 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb)
void void
ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
{ {
/* check whether to put into the slab cache */ /* check whether to put into the slab cache, oversized tuples never are */
if (rb->nr_cached_tuplebufs < max_cached_tuplebufs) if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
rb->nr_cached_tuplebufs < max_cached_tuplebufs)
{ {
rb->nr_cached_tuplebufs++; rb->nr_cached_tuplebufs++;
slist_push_head(&rb->cached_tuplebufs, &tuple->node); slist_push_head(&rb->cached_tuplebufs, &tuple->node);
VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size);
VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf)); VALGRIND_MAKE_MEM_UNDEFINED(tuple, sizeof(ReorderBufferTupleBuf));
VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node)); VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
VALGRIND_MAKE_MEM_DEFINED(&tuple->alloc_tuple_size, sizeof(tuple->alloc_tuple_size));
} }
else else
{ {
@ -2020,17 +2044,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
newtup = change->data.tp.newtuple; newtup = change->data.tp.newtuple;
if (oldtup) if (oldtup)
oldlen = offsetof(ReorderBufferTupleBuf, data) {
+oldtup->tuple.t_len sz += sizeof(HeapTupleData);
- offsetof(HeapTupleHeaderData, t_bits); oldlen = oldtup->tuple.t_len;
sz += oldlen;
}
if (newtup) if (newtup)
newlen = offsetof(ReorderBufferTupleBuf, data) {
+newtup->tuple.t_len sz += sizeof(HeapTupleData);
- offsetof(HeapTupleHeaderData, t_bits); newlen = newtup->tuple.t_len;
sz += newlen;
sz += oldlen; }
sz += newlen;
/* make sure we have enough space */ /* make sure we have enough space */
ReorderBufferSerializeReserve(rb, sz); ReorderBufferSerializeReserve(rb, sz);
@ -2041,13 +2066,19 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (oldlen) if (oldlen)
{ {
memcpy(data, oldtup, oldlen); memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen; data += oldlen;
} }
if (newlen) if (newlen)
{ {
memcpy(data, newtup, newlen); memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, newtup->tuple.t_data, newlen);
data += newlen; data += newlen;
} }
break; break;
@ -2268,29 +2299,46 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_DELETE:
if (change->data.tp.oldtuple) if (change->data.tp.oldtuple)
{ {
Size len = offsetof(ReorderBufferTupleBuf, data) Size tuplelen = ((HeapTuple) data)->t_len;
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); change->data.tp.oldtuple =
memcpy(change->data.tp.oldtuple, data, len); ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits));
/* restore ->tuple */
memcpy(&change->data.tp.oldtuple->tuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
change->data.tp.oldtuple->tuple.t_data = change->data.tp.oldtuple->tuple.t_data =
&change->data.tp.oldtuple->header; ReorderBufferTupleBufData(change->data.tp.oldtuple);
data += len;
/* restore tuple data itself */
memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
data += tuplelen;
} }
if (change->data.tp.newtuple) if (change->data.tp.newtuple)
{ {
Size len = offsetof(ReorderBufferTupleBuf, data) Size tuplelen = ((HeapTuple) data)->t_len;
+((ReorderBufferTupleBuf *) data)->tuple.t_len
- offsetof(HeapTupleHeaderData, t_bits);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); change->data.tp.newtuple =
memcpy(change->data.tp.newtuple, data, len); ReorderBufferGetTupleBuf(rb, tuplelen - offsetof(HeapTupleHeaderData, t_bits));
/* restore ->tuple */
memcpy(&change->data.tp.newtuple->tuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
change->data.tp.newtuple->tuple.t_data = change->data.tp.newtuple->tuple.t_data =
&change->data.tp.newtuple->header; ReorderBufferTupleBufData(change->data.tp.newtuple);
data += len;
/* restore tuple data itself */
memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
data += tuplelen;
} }
break; break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{ {
@ -2667,7 +2715,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/ */
tmphtup = heap_form_tuple(desc, attrs, isnull); tmphtup = heap_form_tuple(desc, attrs, isnull);
Assert(newtup->tuple.t_len <= MaxHeapTupleSize); Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
Assert(&newtup->header == newtup->tuple.t_data); Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
newtup->tuple.t_len = tmphtup->t_len; newtup->tuple.t_len = tmphtup->t_len;

View File

@ -26,12 +26,19 @@ typedef struct ReorderBufferTupleBuf
/* position in preallocated list */ /* position in preallocated list */
slist_node node; slist_node node;
/* tuple, stored sequentially */ /* tuple header, the interesting bit for users of logical decoding */
HeapTupleData tuple; HeapTupleData tuple;
HeapTupleHeaderData header;
char data[MaxHeapTupleSize]; /* pre-allocated size of tuple buffer, different from tuple size */
Size alloc_tuple_size;
/* actual tuple data follows */
} ReorderBufferTupleBuf; } ReorderBufferTupleBuf;
/* pointer to the data stored in a TupleBuf */
#define ReorderBufferTupleBufData(p) \
((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
/* /*
* Types of the change passed to a 'change' callback. * Types of the change passed to a 'change' callback.
* *
@ -327,7 +334,7 @@ struct ReorderBuffer
ReorderBuffer *ReorderBufferAllocate(void); ReorderBuffer *ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer *); void ReorderBufferFree(ReorderBuffer *);
ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *); ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple); void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *); ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);