From 208c5d65bbd60e33e272964578cb74182ac726a8 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Tue, 22 Mar 2022 07:11:19 +0530 Subject: [PATCH] Add ALTER SUBSCRIPTION ... SKIP. This feature allows skipping the transaction on subscriber nodes. If incoming change violates any constraint, logical replication stops until it's resolved. Currently, users need to either manually resolve the conflict by updating a subscriber-side database or by using function pg_replication_origin_advance() to skip the conflicting transaction. This commit introduces a simpler way to skip the conflicting transactions. The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX), which allows the apply worker to skip the transaction finished at specified LSN. The apply worker skips all data modification changes within the transaction. Author: Masahiko Sawada Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 10 + doc/src/sgml/logical-replication.sgml | 27 +- doc/src/sgml/ref/alter_subscription.sgml | 42 ++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 73 ++++++ src/backend/parser/gram.y | 9 + src/backend/replication/logical/worker.c | 233 +++++++++++++++++- src/bin/pg_dump/pg_dump.c | 4 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 5 +- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_subscription.h | 5 + src/include/nodes/parsenodes.h | 3 +- src/test/regress/expected/subscription.out | 126 +++++----- src/test/regress/sql/subscription.sql | 11 + .../subscription/t/029_disable_on_error.pl | 94 ------- src/test/subscription/t/029_on_error.pl | 183 ++++++++++++++ 18 files changed, 665 insertions(+), 173 deletions(-) delete mode 100644 src/test/subscription/t/029_disable_on_error.pl create mode 100644 src/test/subscription/t/029_on_error.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 4dc5b34d21c..2a8cd026649 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7797,6 +7797,16 @@ SCRAM-SHA-256$<iteration count>:&l + + + subskiplsn pg_lsn + + + Finish LSN of the transaction whose changes are to be skipped, if a valid + LSN; otherwise 0/0. + + + subconninfo text diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 6431d4796db..555fbd749cc 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -362,19 +362,24 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER The LSN of the transaction that contains the change violating the constraint and the replication origin name can be found from the server log (LSN 0/14C0378 and - replication origin pg_16395 in the above case). To skip the - transaction, the subscription needs to be disabled temporarily by - ALTER SUBSCRIPTION ... DISABLE first or alternatively, the + replication origin pg_16395 in the above case). The + transaction that produces conflict can be skipped by using + ALTER SUBSCRIPTION ... SKIP with the finish LSN + (i.e., LSN 0/14C0378). The finish LSN could be an LSN at which the transaction + is committed or prepared on the publisher. Alternatively, the transaction can + also be skipped by calling the + pg_replication_origin_advance() function + transaction. Before using this function, the subscription needs to be disabled + temporarily either by ALTER SUBSCRIPTION ... DISABLE or, the subscription can be used with the disable_on_error option. - Then, the transaction can be skipped by calling the - - pg_replication_origin_advance() function with - the node_name (i.e., pg_16395) and the - next LSN of the transaction's LSN (i.e., LSN 0/14C0379). After that the replication - can be resumed by ALTER SUBSCRIPTION ... ENABLE. The current - position of origins can be seen in the - + Then, you can use pg_replication_origin_advance() function + with the node_name (i.e., pg_16395) + and the next LSN of the finish LSN (i.e., 0/14C0379). The current position of + origins can be seen in the pg_replication_origin_status system view. + Please note that skipping the whole transaction include skipping changes that + might not violate any constraint. This can easily make the subscriber + inconsistent. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 58b78a94eab..ac2db249cbb 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -29,6 +29,7 @@ ALTER SUBSCRIPTION name REFRESH PUB ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE ALTER SUBSCRIPTION name SET ( subscription_parameter [= value] [, ... ] ) +ALTER SUBSCRIPTION name SKIP ( skip_option = value ) ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION name RENAME TO new_name @@ -210,6 +211,47 @@ ALTER SUBSCRIPTION name RENAME TO < + + SKIP ( skip_option = value ) + + + Skips applying all changes of the remote transaction. If incoming data + violates any constraints, logical replication will stop until it is + resolved. By using ALTER SUBSCRIPTION ... SKIP command, + the logical replication worker skips all data modification changes within + the transaction. This option has no effect on the transactions that are + already prepared by enabling two_phase on + subscriber. + After logical replication worker successfully skips the transaction or + finishes a transaction, LSN (stored in + pg_subscription.subskiplsn) + is cleared. See for + the details of logical replication conflicts. Using this command requires + superuser privilege. + + + + skip_option specifies options for this operation. + The supported option is: + + + + lsn (pg_lsn) + + + Specifies the finish LSN of the remote transaction whose changes + are to be skipped by the logical replication worker. The finish LSN + is the LSN at which the transaction is either committed or prepared. + Skipping individual subtransaction is not supported. Setting + NONE resets the LSN. + + + + + + + + new_owner diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a6304f5f81a..0ff0982f7b2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->skiplsn = subform->subskiplsn; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index bb1ac30cd19..bd48ee7bd25 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,7 +1261,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, - substream, subtwophasestate, subdisableonerr, subslotname, + substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3922658bbca..e16f04626de 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -45,6 +45,7 @@ #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/syscache.h" /* @@ -62,6 +63,7 @@ #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 +#define SUBOPT_LSN 0x00000800 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +86,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + XLogRecPtr lsn; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_LSN) && + strcmp(defel->defname, "lsn") == 0) + { + char *lsn_str = defGetString(defel); + XLogRecPtr lsn; + + if (IsSet(opts->specified_opts, SUBOPT_LSN)) + errorConflictingDefElem(defel, pstate); + + /* Setting lsn = NONE is treated as resetting LSN */ + if (strcmp(lsn_str, "none") == 0) + lsn = InvalidXLogRecPtr; + else + { + /* Parse the argument as LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(lsn_str))); + + if (XLogRecPtrIsInvalid(lsn)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid WAL location (LSN): %s", lsn_str))); + } + + opts->specified_opts |= SUBOPT_LSN; + opts->lsn = lsn; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SKIP: + { + parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts); + + /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */ + Assert(IsSet(opts.specified_opts, SUBOPT_LSN)); + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to skip transaction"))); + + /* + * If the user sets subskiplsn, we do a sanity check to make + * sure that the specified LSN is a probable value. + */ + if (!XLogRecPtrIsInvalid(opts.lsn)) + { + RepOriginId originid; + char originname[NAMEDATALEN]; + XLogRecPtr remote_lsn; + + snprintf(originname, sizeof(originname), "pg_%u", subid); + originid = replorigin_by_name(originname, false); + remote_lsn = replorigin_get_progress(originid, false); + + /* Check the given LSN is at least a future LSN */ + if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X", + LSN_FORMAT_ARGS(opts.lsn), + LSN_FORMAT_ARGS(remote_lsn)))); + } + + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + update_tuple = true; + break; + } + default: elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", stmt->kind); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index a03b33b53bd..0036c2f9e2d 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9983,6 +9983,15 @@ AlterSubscriptionStmt: (Node *)makeBoolean(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name SKIP definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SKIP; + n->subname = $3; + n->options = $5; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 03e069c7cdd..82dcffc2db8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -136,6 +136,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" @@ -189,6 +190,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/rel.h" #include "utils/rls.h" #include "utils/syscache.h" @@ -259,6 +261,21 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; +/* + * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for + * the subscription if the remote transaction's finish LSN matches the subskiplsn. + * Once we start skipping changes, we don't stop it until we skip all changes of + * the transaction even if pg_subscription is updated and MySubscription->skiplsn + * gets changed or reset during that. Also, in streaming transaction cases, we + * don't skip receiving and spooling the changes since we decide whether or not + * to skip applying the changes when starting to apply changes. The subskiplsn is + * cleared after successfully skipping the transaction or applying non-empty + * transaction. The latter prevents the mistakenly specified subskiplsn from + * being left. + */ +static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; +#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -336,6 +353,11 @@ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); +static void stop_skipping_changes(void); +static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + /* Functions for apply error callback */ static void apply_error_callback(void *arg); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); @@ -795,6 +817,8 @@ apply_handle_begin(StringInfo s) remote_final_lsn = begin_data.final_lsn; + maybe_start_skipping_changes(begin_data.final_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -847,6 +871,8 @@ apply_handle_begin_prepare(StringInfo s) remote_final_lsn = begin_data.prepare_lsn; + maybe_start_skipping_changes(begin_data.prepare_lsn); + in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); @@ -905,9 +931,9 @@ apply_handle_prepare(StringInfo s) /* * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction. It is done this way because at - * commit prepared time, we won't know whether we have skipped preparing a - * transaction because of no change. + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. * * XXX, We can optimize such that at commit prepared time, we first check * whether we have prepared the transaction or not but that doesn't seem @@ -928,6 +954,15 @@ apply_handle_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -969,6 +1004,8 @@ apply_handle_commit_prepared(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + clear_subscription_skip_lsn(prepare_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); } @@ -1010,6 +1047,8 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + + clear_subscription_skip_lsn(rollback_data.rollback_end_lsn); } pgstat_report_stat(false); @@ -1072,6 +1111,13 @@ apply_handle_stream_prepare(StringInfo s) /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); + /* + * Similar to prepare case, the subskiplsn could be left in a case of + * server crash but it's okay. See the comments in apply_handle_prepare(). + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1311,6 +1357,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) MemoryContext oldcxt; BufFile *fd; + maybe_start_skipping_changes(lsn); + /* Make sure we have an open transaction */ begin_replication_step(); @@ -1455,8 +1503,26 @@ apply_handle_stream_commit(StringInfo s) static void apply_handle_commit_internal(LogicalRepCommitData *commit_data) { + if (is_skipping_changes()) + { + stop_skipping_changes(); + + /* + * Start a new transaction to clear the subskiplsn, if not started + * yet. + */ + if (!IsTransactionState()) + StartTransactionCommand(); + } + if (IsTransactionState()) { + /* + * The transaction is either non-empty or skipped, so we clear the + * subskiplsn. + */ + clear_subscription_skip_lsn(commit_data->commit_lsn); + /* * Update origin state so we can restart streaming from correct * position in case of crash. @@ -1583,7 +1649,12 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -1710,7 +1781,12 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -1874,7 +1950,12 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; - if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -2261,7 +2342,12 @@ apply_handle_truncate(StringInfo s) ListCell *lc; LOCKMODE lockmode = AccessExclusiveLock; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + /* + * Quick return if we are skipping data modification changes or handling + * streamed transactions. + */ + if (is_skipping_changes() || + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3738,6 +3824,139 @@ IsLogicalWorker(void) return MyLogicalRepWorker != NULL; } +/* + * Start skipping changes of the transaction if the given LSN matches the + * LSN specified by subscription's skiplsn. + */ +static void +maybe_start_skipping_changes(XLogRecPtr finish_lsn) +{ + Assert(!is_skipping_changes()); + Assert(!in_remote_transaction); + Assert(!in_streamed_transaction); + + /* + * Quick return if it's not requested to skip this transaction. This + * function is called for every remote transaction and we assume that + * skipping the transaction is not used often. + */ + if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) || + MySubscription->skiplsn != finish_lsn)) + return; + + /* Start skipping all changes of this transaction */ + skip_xact_finish_lsn = finish_lsn; + + ereport(LOG, + errmsg("start skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn))); +} + +/* + * Stop skipping changes by resetting skip_xact_finish_lsn if enabled. + */ +static void +stop_skipping_changes(void) +{ + if (!is_skipping_changes()) + return; + + ereport(LOG, + (errmsg("done skipping logical replication transaction finished at %X/%X", + LSN_FORMAT_ARGS(skip_xact_finish_lsn)))); + + /* Stop skipping changes */ + skip_xact_finish_lsn = InvalidXLogRecPtr; +} + +/* + * Clear subskiplsn of pg_subscription catalog. + * + * finish_lsn is the transaction's finish LSN that is used to check if the + * subskiplsn matches it. If not matched, we raise a warning when clearing the + * subskiplsn in order to inform users for cases e.g., where the user mistakenly + * specified the wrong subskiplsn. + */ +static void +clear_subscription_skip_lsn(XLogRecPtr finish_lsn) +{ + Relation rel; + Form_pg_subscription subform; + HeapTuple tup; + XLogRecPtr myskiplsn = MySubscription->skiplsn; + bool started_tx = false; + + if (likely(XLogRecPtrIsInvalid(myskiplsn))) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* + * Protect subskiplsn of pg_subscription from being concurrently updated + * while clearing it. + */ + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + + /* Fetch the existing tuple. */ + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* + * Clear the subskiplsn. If the user has already changed subskiplsn before + * clearing it we don't update the catalog and the replication origin + * state won't get advanced. So in the worst case, if the server crashes + * before sending an acknowledgment of the flush position the transaction + * will be sent again and the user needs to set subskiplsn again. We can + * reduce the possibility by logging a replication origin WAL record to + * advance the origin LSN instead but there is no way to advance the + * origin timestamp and it doesn't seem to be worth doing anything about + * it since it's a very rare case. + */ + if (subform->subskiplsn == myskiplsn) + { + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* reset subskiplsn */ + values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + replaces[Anum_pg_subscription_subskiplsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + CatalogTupleUpdate(rel, &tup->t_self, tup); + + if (myskiplsn != finish_lsn) + ereport(WARNING, + errmsg("skip-LSN of logical replication subscription \"%s\" cleared", MySubscription->name), + errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X", + LSN_FORMAT_ARGS(finish_lsn), + LSN_FORMAT_ARGS(myskiplsn))); + } + + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} + /* Error callback to give more context info about the change being applied */ static void apply_error_callback(void *arg) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 725cd2e4ebc..e5816c4ccea 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4385,6 +4385,10 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); + /* + * Get subscription fields. We don't include subskiplsn in the dump as + * after restoring the dump this value may no longer be relevant. + */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); i_subname = PQfnumber(res, "subname"); diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 991bfc1546b..714097cad1b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6105,7 +6105,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false}; + false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6152,6 +6152,12 @@ describeSubscriptions(const char *pattern, bool verbose) ", subconninfo AS \"%s\"\n", gettext_noop("Synchronous commit"), gettext_noop("Conninfo")); + + /* Skip LSN is only supported in v15 and higher */ + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ", subskiplsn AS \"%s\"\n", + gettext_noop("Skip LSN")); } /* Only display subscriptions in current database. */ diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 183abcc2753..5c064595a97 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1852,7 +1852,7 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", - "RENAME TO", "REFRESH PUBLICATION", "SET", + "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && @@ -1868,6 +1868,9 @@ psql_completion(const char *text, int start, int end) /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + /* ALTER SUBSCRIPTION SKIP ( */ + else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) + COMPLETE_WITH("lsn"); /* ALTER SUBSCRIPTION SET PUBLICATION */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION")) { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 1383761c1f8..db9963db727 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202203211 +#define CATALOG_VERSION_NO 202203221 #endif diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e2befaf3512..69969a0617e 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -70,6 +70,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + XLogRecPtr subskiplsn; /* All changes finished at this LSN are + * skipped */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -109,6 +112,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + XLogRecPtr skiplsn; /* All changes finished at this LSN are + * skipped */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 1617702d9d6..6f83a79a96c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3726,7 +3726,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_SKIP } AlterSubscriptionType; typedef struct AlterSubscriptionStmt diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index ad8003fae12..7fcfad15916 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2 ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" +-- ok +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 +(1 row) + +-- ok - with lsn = NONE +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); +ERROR: invalid WAL location (LSN): 0/0 +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -165,19 +179,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -188,19 +202,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -233,10 +247,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -270,10 +284,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -282,10 +296,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -309,18 +323,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index a7c15b1dafc..74c38ead5d6 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = ''); ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2'; ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); +-- ok +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); + +\dRs+ + +-- ok - with lsn = NONE +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); + +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); + \dRs+ BEGIN; diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl deleted file mode 100644 index 5eca8044460..00000000000 --- a/src/test/subscription/t/029_disable_on_error.pl +++ /dev/null @@ -1,94 +0,0 @@ - -# Copyright (c) 2021-2022, PostgreSQL Global Development Group - -# Test of logical replication subscription self-disabling feature. -use strict; -use warnings; -use PostgreSQL::Test::Cluster; -use PostgreSQL::Test::Utils; -use Test::More; - -# create publisher node -my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); -$node_publisher->init(allows_streaming => 'logical'); -$node_publisher->start; - -# create subscriber node -my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init; -$node_subscriber->start; - -# Create identical table on both nodes. -$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)"); -$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)"); - -# Insert duplicate values on the publisher. -$node_publisher->safe_psql('postgres', - "INSERT INTO tbl (i) VALUES (1), (1), (1)"); - -# Create an additional unique index on the subscriber. -$node_subscriber->safe_psql('postgres', - "CREATE UNIQUE INDEX tbl_unique ON tbl (i)"); - -# Create a pub/sub to set up logical replication. This tests that the -# uniqueness violation will cause the subscription to fail during initial -# synchronization and make it disabled. -my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -$node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub FOR TABLE tbl"); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)" -); - -# Initial synchronization failure causes the subscription to be disabled. -$node_subscriber->poll_query_until('postgres', - "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" -) or die "Timed out while waiting for subscriber to be disabled"; - -# Drop the unique index on the subscriber which caused the subscription to be -# disabled. -$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique"); - -# Re-enable the subscription "sub". -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); - -# Wait for the data to replicate. -$node_publisher->wait_for_catchup('sub'); -$node_subscriber->poll_query_until('postgres', - "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass" -); - -# Confirm that we have finished the table sync. -my $result = - $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl"); -is($result, qq(1|3), "subscription sub replicated data"); - -# Delete the data from the subscriber and recreate the unique index. -$node_subscriber->safe_psql('postgres', "DELETE FROM tbl"); -$node_subscriber->safe_psql('postgres', - "CREATE UNIQUE INDEX tbl_unique ON tbl (i)"); - -# Add more non-unique data to the publisher. -$node_publisher->safe_psql('postgres', - "INSERT INTO tbl (i) VALUES (3), (3), (3)"); - -# Apply failure causes the subscription to be disabled. -$node_subscriber->poll_query_until('postgres', - "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" -) or die "Timed out while waiting for subscription sub to be disabled"; - -# Drop the unique index on the subscriber and re-enabled the subscription. Then -# confirm that the previously failing insert was applied OK. -$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique"); -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); - -$node_publisher->wait_for_catchup('sub'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT COUNT(*) FROM tbl WHERE i = 3"); -is($result, qq(3), 'check the result of apply'); - -$node_subscriber->stop; -$node_publisher->stop; - -done_testing(); diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl new file mode 100644 index 00000000000..e8b904b7452 --- /dev/null +++ b/src/test/subscription/t/029_on_error.pl @@ -0,0 +1,183 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Tests for disable_on_error and SKIP transaction features. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Test skipping the transaction. This function must be called after the caller +# has inserted data that conflicts with the subscriber. The finish LSN of the +# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is +# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we +# check if logical replication can continue working by inserting $nonconflict_data +# on the publisher. +sub test_skip_lsn +{ + my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg) + = @_; + + # Wait until a conflict occurs on the subscriber. + $node_subscriber->poll_query_until('postgres', + "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'" + ); + + # Get the finish LSN of the error transaction. + my $contents = slurp_file($node_subscriber->logfile, $offset); + $contents =~ + qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.tbl" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/ + or die "could not get error-LSN"; + my $lsn = $1; + + # Set skip lsn. + $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')"); + + # Re-enable the subscription. + $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); + + # Wait for the failed transaction to be skipped + $node_subscriber->poll_query_until('postgres', + "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'" + ); + + # Check the log to ensure that the transaction is skipped, and advance the + # offset of the log file for the next test. + $offset = $node_subscriber->wait_for_log( + qr/LOG: done skipping logical replication transaction finished at $lsn/, + $offset); + + # Insert non-conflict data + $node_publisher->safe_psql('postgres', + "INSERT INTO tbl VALUES $nonconflict_data"); + + $node_publisher->wait_for_catchup('sub'); + + # Check replicated data + my $res = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl"); + is($res, $expected, $msg); +} + +# Create publisher node. Set a low value of logical_decoding_work_mem to test +# streaming cases. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf( + 'postgresql.conf', + qq[ +logical_decoding_work_mem = 64kB +max_prepared_transactions = 10 +]); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf( + 'postgresql.conf', + qq[ +max_prepared_transactions = 10 +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On the subscriber, we +# create the same tables but with a primary key. Also, insert some data that +# will conflict with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + qq[ +CREATE TABLE tbl (i INT, t TEXT); +INSERT INTO tbl VALUES (1, NULL); +]); +$node_subscriber->safe_psql( + 'postgres', + qq[ +CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT); +INSERT INTO tbl VALUES (1, NULL); +]); + +# Create a pub/sub to set up logical replication. This tests that the +# uniqueness violation will cause the subscription to fail during initial +# synchronization and make it disabled. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR TABLE tbl"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" +); + +# Initial synchronization failure causes the subscription to be disabled. +$node_subscriber->poll_query_until('postgres', + "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" +) or die "Timed out while waiting for subscriber to be disabled"; + +# Truncate the table on the subscriber which caused the subscription to be +# disabled. +$node_subscriber->safe_psql('postgres', "TRUNCATE tbl"); + +# Re-enable the subscription "sub". +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); + +# Wait for the data to replicate. +$node_publisher->wait_for_catchup('sub'); +$node_subscriber->poll_query_until('postgres', + "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass" +); + +# Confirm that we have finished the table sync. +my $result = + $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl"); +is($result, qq(1), "subscription sub replicated data"); + +# Insert data to tbl, raising an error on the subscriber due to violation +# of the unique constraint on tbl. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl VALUES (1, NULL); +COMMIT; +]); +test_skip_lsn($node_publisher, $node_subscriber, + "(2, NULL)", "2", "test skipping transaction"); + +# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and +# PREPARE the transaction, raising an error. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl VALUES (1, NULL); +PREPARE TRANSACTION 'gtx'; +COMMIT PREPARED 'gtx'; +]); +test_skip_lsn($node_publisher, $node_subscriber, + "(3, NULL)", "3", "test skipping prepare and commit prepared "); + +# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB +# limit, also raising an error on the subscriber during applying spooled +# changes for the same reason. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i); +COMMIT; +]); +test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))", + "4", "test skipping stream-commit"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_prepared_xacts"); +is($result, "0", + "check all prepared transactions are resolved on the subscriber"); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing();