1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-06 18:42:54 +03:00

Logical decoding of TRUNCATE

Add a new WAL record type for TRUNCATE, which is only used when
wal_level >= logical.  (For physical replication, TRUNCATE is already
replicated via SMGR records.)  Add new callback for logical decoding
output plugins to receive TRUNCATE actions.

Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
This commit is contained in:
Peter Eisentraut
2018-04-07 11:17:56 -04:00
parent b508a56f2f
commit 5dfd1e5a66
15 changed files with 414 additions and 13 deletions

View File

@@ -9260,6 +9260,13 @@ heap_redo(XLogReaderState *record)
case XLOG_HEAP_UPDATE:
heap_xlog_update(record, false);
break;
case XLOG_HEAP_TRUNCATE:
/*
* TRUNCATE is a no-op because the actions are already logged as
* SMGR WAL records. TRUNCATE WAL record only exists for logical
* decoding.
*/
break;
case XLOG_HEAP_HOT_UPDATE:
heap_xlog_update(record, true);
break;

View File

@@ -75,6 +75,19 @@ heap_desc(StringInfo buf, XLogReaderState *record)
xlrec->new_offnum,
xlrec->new_xmax);
}
else if (info == XLOG_HEAP_TRUNCATE)
{
xl_heap_truncate *xlrec = (xl_heap_truncate *) rec;
int i;
if (xlrec->flags & XLH_TRUNCATE_CASCADE)
appendStringInfo(buf, "cascade ");
if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
appendStringInfo(buf, "restart_seqs ");
appendStringInfo(buf, "nrelids %u relids", xlrec->nrelids);
for (i = 0; i < xlrec->nrelids; i++)
appendStringInfo(buf, " %u", xlrec->relids[i]);
}
else if (info == XLOG_HEAP_CONFIRM)
{
xl_heap_confirm *xlrec = (xl_heap_confirm *) rec;
@@ -186,6 +199,9 @@ heap_identify(uint8 info)
case XLOG_HEAP_HOT_UPDATE | XLOG_HEAP_INIT_PAGE:
id = "HOT_UPDATE+INIT";
break;
case XLOG_HEAP_TRUNCATE:
id = "TRUNCATE";
break;
case XLOG_HEAP_CONFIRM:
id = "HEAP_CONFIRM";
break;

View File

@@ -16,6 +16,7 @@
#include "access/genam.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/multixact.h"
#include "access/reloptions.h"
#include "access/relscan.h"
@@ -1322,11 +1323,7 @@ ExecuteTruncate(TruncateStmt *stmt)
{
List *rels = NIL;
List *relids = NIL;
List *seq_relids = NIL;
EState *estate;
ResultRelInfo *resultRelInfos;
ResultRelInfo *resultRelInfo;
SubTransactionId mySubid;
List *relids_logged = NIL;
ListCell *cell;
/*
@@ -1350,6 +1347,9 @@ ExecuteTruncate(TruncateStmt *stmt)
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, myrelid);
/* Log this relation only if needed for logical decoding */
if (RelationIsLogicallyLogged(rel))
relids_logged = lappend_oid(relids_logged, myrelid);
if (recurse)
{
@@ -1370,6 +1370,9 @@ ExecuteTruncate(TruncateStmt *stmt)
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, childrelid);
/* Log this relation only if needed for logical decoding */
if (RelationIsLogicallyLogged(rel))
relids_logged = lappend_oid(relids_logged, childrelid);
}
}
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -1379,7 +1382,47 @@ ExecuteTruncate(TruncateStmt *stmt)
errhint("Do not specify the ONLY keyword, or use TRUNCATE ONLY on the partitions directly.")));
}
ExecuteTruncateGuts(rels, relids, relids_logged,
stmt->behavior, stmt->restart_seqs);
/* And close the rels */
foreach(cell, rels)
{
Relation rel = (Relation) lfirst(cell);
heap_close(rel, NoLock);
}
}
/*
* ExecuteTruncateGuts
*
* Internal implementation of TRUNCATE. This is called by the actual TRUNCATE
* command (see above) as well as replication subscribers that execute a
* replicated TRUNCATE action.
*
* explicit_rels is the list of Relations to truncate that the command
* specified. relids is the list of Oids corresponding to explicit_rels.
* relids_logged is the list of Oids (a subset of relids) that require
* WAL-logging. This is all a bit redundant, but the existing callers have
* this information handy in this form.
*/
void
ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
DropBehavior behavior, bool restart_seqs)
{
List *rels;
List *seq_relids = NIL;
EState *estate;
ResultRelInfo *resultRelInfos;
ResultRelInfo *resultRelInfo;
SubTransactionId mySubid;
ListCell *cell;
Oid *logrelids;
/*
* Open, exclusive-lock, and check all the explicitly-specified relations
*
* In CASCADE mode, suck in all referencing relations as well. This
* requires multiple iterations to find indirectly-dependent relations. At
* each phase, we need to exclusive-lock new rels before looking for their
@@ -1387,7 +1430,8 @@ ExecuteTruncate(TruncateStmt *stmt)
* soon as we open it, to avoid a faux pas such as holding lock for a long
* time on a rel we have no permissions for.
*/
if (stmt->behavior == DROP_CASCADE)
rels = list_copy(explicit_rels);
if (behavior == DROP_CASCADE)
{
for (;;)
{
@@ -1409,6 +1453,9 @@ ExecuteTruncate(TruncateStmt *stmt)
truncate_check_rel(rel);
rels = lappend(rels, rel);
relids = lappend_oid(relids, relid);
/* Log this relation only if needed for logical decoding */
if (RelationIsLogicallyLogged(rel))
relids_logged = lappend_oid(relids_logged, relid);
}
}
}
@@ -1421,7 +1468,7 @@ ExecuteTruncate(TruncateStmt *stmt)
#ifdef USE_ASSERT_CHECKING
heap_truncate_check_FKs(rels, false);
#else
if (stmt->behavior == DROP_RESTRICT)
if (behavior == DROP_RESTRICT)
heap_truncate_check_FKs(rels, false);
#endif
@@ -1431,7 +1478,7 @@ ExecuteTruncate(TruncateStmt *stmt)
* We want to do this early since it's pointless to do all the truncation
* work only to fail on sequence permissions.
*/
if (stmt->restart_seqs)
if (restart_seqs)
{
foreach(cell, rels)
{
@@ -1586,6 +1633,41 @@ ExecuteTruncate(TruncateStmt *stmt)
ResetSequence(seq_relid);
}
/*
* Write a WAL record to allow this set of actions to be logically decoded.
*
* Assemble an array of relids so we can write a single WAL record for the
* whole action.
*/
if (list_length(relids_logged) > 0)
{
xl_heap_truncate xlrec;
int i = 0;
/* should only get here if wal_level >= logical */
Assert(XLogLogicalInfoActive());
logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
foreach (cell, relids_logged)
logrelids[i++] = lfirst_oid(cell);
xlrec.dbId = MyDatabaseId;
xlrec.nrelids = list_length(relids_logged);
xlrec.flags = 0;
if (behavior == DROP_CASCADE)
xlrec.flags |= XLH_TRUNCATE_CASCADE;
if (restart_seqs)
xlrec.flags |= XLH_TRUNCATE_RESTART_SEQS;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapTruncate);
XLogRegisterData((char *) logrelids, list_length(relids_logged) * sizeof(Oid));
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
(void) XLogInsert(RM_HEAP_ID, XLOG_HEAP_TRUNCATE);
}
/*
* Process all AFTER STATEMENT TRUNCATE triggers.
*/
@@ -1603,7 +1685,11 @@ ExecuteTruncate(TruncateStmt *stmt)
/* We can clean up the EState now */
FreeExecutorState(estate);
/* And close the rels (can't do this while EState still holds refs) */
/*
* Close any rels opened by CASCADE (can't do this while EState still
* holds refs)
*/
rels = list_difference_ptr(rels, explicit_rels);
foreach(cell, rels)
{
Relation rel = (Relation) lfirst(cell);

View File

@@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeTruncate(ctx, buf);
break;
case XLOG_HEAP_INPLACE:
/*
@@ -826,6 +832,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
* Parse XLOG_HEAP_TRUNCATE from wal
*/
static void
DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
XLogReaderState *r = buf->record;
xl_heap_truncate *xlrec;
ReorderBufferChange *change;
xlrec = (xl_heap_truncate *) XLogRecGetData(r);
/* only interested in our database */
if (xlrec->dbId != ctx->slot->data.database)
return;
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
change->origin_id = XLogRecGetOrigin(r);
if (xlrec->flags & XLH_TRUNCATE_CASCADE)
change->data.truncate.cascade = true;
if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
change->data.truncate.restart_seqs = true;
change->data.truncate.nrelids = xlrec->nrelids;
change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
memcpy(change->data.truncate.relids, xlrec->relids,
xlrec->nrelids * sizeof(Oid));
ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change);
}
/*
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
*

View File

@@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
@@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options,
/* wrap output plugin callbacks, so we can add error context information */
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
@@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
static void
truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
Assert(!ctx->fast_forward);
if (!ctx->callbacks.truncate_cb)
return;
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "truncate";
state.report_location = change->lsn;
errcallback.callback = output_plugin_error_callback;
errcallback.arg = (void *) &state;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/* set output state */
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
/*
* report this change's lsn so replies from clients can give an up2date
* answer. This won't ever be enough (and shouldn't be!) to confirm
* receipt of this transaction, but it might allow another transaction's
* commit to be confirmed with one message.
*/
ctx->write_location = change->lsn;
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
}
bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{

View File

@@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
case REORDER_BUFFER_CHANGE_TRUNCATE:
break;
}
@@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
specinsert = change;
break;
case REORDER_BUFFER_CHANGE_TRUNCATE:
{
int i;
int nrelids = change->data.truncate.nrelids;
int nrelations = 0;
Relation *relations;
relations = palloc0(nrelids * sizeof(Relation));
for (i = 0; i < nrelids; i++)
{
Oid relid = change->data.truncate.relids[i];
Relation relation;
relation = RelationIdGetRelation(relid);
if (relation == NULL)
elog(ERROR, "could not open relation with OID %u", relid);
if (!RelationIsLogicallyLogged(relation))
continue;
relations[nrelations++] = relation;
}
rb->apply_truncate(rb, txn, nrelations, relations, change);
for (i = 0; i < nrelations; i++)
RelationClose(relations[i]);
break;
}
case REORDER_BUFFER_CHANGE_MESSAGE:
rb->message(rb, txn, change->lsn, true,
change->data.msg.prefix,
@@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
}
case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
@@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
/* the base struct contains all the data, easy peasy */
case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:

View File

@@ -32,7 +32,7 @@
#define XLOG_HEAP_INSERT 0x00
#define XLOG_HEAP_DELETE 0x10
#define XLOG_HEAP_UPDATE 0x20
/* 0x030 is free, was XLOG_HEAP_MOVE */
#define XLOG_HEAP_TRUNCATE 0x30
#define XLOG_HEAP_HOT_UPDATE 0x40
#define XLOG_HEAP_CONFIRM 0x50
#define XLOG_HEAP_LOCK 0x60
@@ -109,6 +109,27 @@ typedef struct xl_heap_delete
#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
/*
* xl_heap_delete flag values, 8 bits are available.
*/
#define XLH_TRUNCATE_CASCADE (1<<0)
#define XLH_TRUNCATE_RESTART_SEQS (1<<1)
/*
* For truncate we list all truncated relids in an array, followed by all
* sequence relids that need to be restarted, if any.
* All rels are always within the same database, so we just list dbid once.
*/
typedef struct xl_heap_truncate
{
Oid dbId;
uint32 nrelids;
uint8 flags;
Oid relids[FLEXIBLE_ARRAY_MEMBER];
} xl_heap_truncate;
#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids))
/*
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
* or updated tuple in WAL; we can save a few bytes by reconstructing the

View File

@@ -54,6 +54,8 @@ extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
extern void CheckTableNotInUse(Relation rel, const char *stmt);
extern void ExecuteTruncate(TruncateStmt *stmt);
extern void ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
DropBehavior behavior, bool restart_seqs);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);

View File

@@ -61,6 +61,15 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
Relation relation,
ReorderBufferChange *change);
/*
* Callback for every TRUNCATE in a successful transaction.
*/
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/*
* Called for every (explicit or implicit) COMMIT of a successful transaction.
*/
@@ -98,6 +107,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;

View File

@@ -59,7 +59,8 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
REORDER_BUFFER_CHANGE_TRUNCATE
};
/*
@@ -99,6 +100,18 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
/*
* Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
* one set of relations to be truncated.
*/
struct
{
Size nrelids;
bool cascade;
bool restart_seqs;
Oid *relids;
} truncate;
/* Message with arbitrary data. */
struct
{
@@ -283,6 +296,14 @@ typedef void (*ReorderBufferApplyChangeCB) (
Relation relation,
ReorderBufferChange *change);
/* truncate callback signature */
typedef void (*ReorderBufferApplyTruncateCB) (
ReorderBuffer *rb,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/* begin callback signature */
typedef void (*ReorderBufferBeginCB) (
ReorderBuffer *rb,
@@ -328,6 +349,7 @@ struct ReorderBuffer
*/
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;