diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 36e6d9a5c90..13af48524ab 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -607,13 +607,14 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { - Size tuplelen; - char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); + Size datalen; + char *tupledata = XLogRecGetBlockData(r, 0, &datalen); + Size tuplelen = datalen - SizeOfHeapHeader; change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); - DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); + DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); } change->data.tp.clear_toast_afterwards = true; @@ -634,7 +635,6 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_update *xlrec; ReorderBufferChange *change; char *data; - Size datalen; RelFileNode target_node; xlrec = (xl_heap_update *) XLogRecGetData(r); @@ -655,22 +655,31 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) { + Size datalen; + Size tuplelen; + data = XLogRecGetBlockData(r, 0, &datalen); + tuplelen = datalen - SizeOfHeapHeader; + change->data.tp.newtuple = - ReorderBufferGetTupleBuf(ctx->reorder, datalen); + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) { + Size datalen; + Size tuplelen; + /* caution, remaining data in record is not aligned */ data = XLogRecGetData(r) + SizeOfHeapUpdate; datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; + tuplelen = datalen - SizeOfHeapHeader; change->data.tp.oldtuple = - ReorderBufferGetTupleBuf(ctx->reorder, datalen); + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } @@ -720,15 +729,16 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { - Size len = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Size tuplelen = datalen - SizeOfHeapHeader; Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); change->data.tp.oldtuple = - ReorderBufferGetTupleBuf(ctx->reorder, len); + ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - len, change->data.tp.oldtuple); + datalen, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e2ad4728cdc..f2b8f4b7b84 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -471,12 +471,15 @@ ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) rb->nr_cached_tuplebufs--; tuple = slist_container(ReorderBufferTupleBuf, node, slist_pop_head_node(&rb->cached_tuplebufs)); + Assert(tuple->alloc_tuple_size == MaxHeapTupleSize); #ifdef USE_ASSERT_CHECKING memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData)); + VALGRIND_MAKE_MEM_UNDEFINED(&tuple->tuple, sizeof(HeapTupleData)); #endif tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); #ifdef USE_ASSERT_CHECKING memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size); + VALGRIND_MAKE_MEM_UNDEFINED(tuple->tuple.t_data, tuple->alloc_tuple_size); #endif } else @@ -2152,7 +2155,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, data += sizeof(HeapTupleData); memcpy(data, newtup->tuple.t_data, newlen); - data += oldlen; + data += newlen; } break; }