mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	Add ALTER SUBSCRIPTION ... SKIP.
This feature allows skipping the transaction on subscriber nodes. If incoming change violates any constraint, logical replication stops until it's resolved. Currently, users need to either manually resolve the conflict by updating a subscriber-side database or by using function pg_replication_origin_advance() to skip the conflicting transaction. This commit introduces a simpler way to skip the conflicting transactions. The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX), which allows the apply worker to skip the transaction finished at specified LSN. The apply worker skips all data modification changes within the transaction. Author: Masahiko Sawada Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
This commit is contained in:
		@@ -45,6 +45,7 @@
 | 
			
		||||
#include "utils/guc.h"
 | 
			
		||||
#include "utils/lsyscache.h"
 | 
			
		||||
#include "utils/memutils.h"
 | 
			
		||||
#include "utils/pg_lsn.h"
 | 
			
		||||
#include "utils/syscache.h"
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
@@ -62,6 +63,7 @@
 | 
			
		||||
#define SUBOPT_STREAMING			0x00000100
 | 
			
		||||
#define SUBOPT_TWOPHASE_COMMIT		0x00000200
 | 
			
		||||
#define SUBOPT_DISABLE_ON_ERR		0x00000400
 | 
			
		||||
#define SUBOPT_LSN					0x00000800
 | 
			
		||||
 | 
			
		||||
/* check if the 'val' has 'bits' set */
 | 
			
		||||
#define IsSet(val, bits)  (((val) & (bits)) == (bits))
 | 
			
		||||
@@ -84,6 +86,7 @@ typedef struct SubOpts
 | 
			
		||||
	bool		streaming;
 | 
			
		||||
	bool		twophase;
 | 
			
		||||
	bool		disableonerr;
 | 
			
		||||
	XLogRecPtr	lsn;
 | 
			
		||||
} SubOpts;
 | 
			
		||||
 | 
			
		||||
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 | 
			
		||||
@@ -262,6 +265,33 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 | 
			
		||||
			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
 | 
			
		||||
			opts->disableonerr = defGetBoolean(defel);
 | 
			
		||||
		}
 | 
			
		||||
		else if (IsSet(supported_opts, SUBOPT_LSN) &&
 | 
			
		||||
				 strcmp(defel->defname, "lsn") == 0)
 | 
			
		||||
		{
 | 
			
		||||
			char	   *lsn_str = defGetString(defel);
 | 
			
		||||
			XLogRecPtr	lsn;
 | 
			
		||||
 | 
			
		||||
			if (IsSet(opts->specified_opts, SUBOPT_LSN))
 | 
			
		||||
				errorConflictingDefElem(defel, pstate);
 | 
			
		||||
 | 
			
		||||
			/* Setting lsn = NONE is treated as resetting LSN */
 | 
			
		||||
			if (strcmp(lsn_str, "none") == 0)
 | 
			
		||||
				lsn = InvalidXLogRecPtr;
 | 
			
		||||
			else
 | 
			
		||||
			{
 | 
			
		||||
				/* Parse the argument as LSN */
 | 
			
		||||
				lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
 | 
			
		||||
													  CStringGetDatum(lsn_str)));
 | 
			
		||||
 | 
			
		||||
				if (XLogRecPtrIsInvalid(lsn))
 | 
			
		||||
					ereport(ERROR,
 | 
			
		||||
							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
			
		||||
							 errmsg("invalid WAL location (LSN): %s", lsn_str)));
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			opts->specified_opts |= SUBOPT_LSN;
 | 
			
		||||
			opts->lsn = lsn;
 | 
			
		||||
		}
 | 
			
		||||
		else
 | 
			
		||||
			ereport(ERROR,
 | 
			
		||||
					(errcode(ERRCODE_SYNTAX_ERROR),
 | 
			
		||||
@@ -479,6 +509,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 | 
			
		||||
					 LOGICALREP_TWOPHASE_STATE_PENDING :
 | 
			
		||||
					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 | 
			
		||||
	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 | 
			
		||||
	values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
 | 
			
		||||
	values[Anum_pg_subscription_subconninfo - 1] =
 | 
			
		||||
		CStringGetTextDatum(conninfo);
 | 
			
		||||
	if (opts.slot_name)
 | 
			
		||||
@@ -1106,6 +1137,48 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		case ALTER_SUBSCRIPTION_SKIP:
 | 
			
		||||
			{
 | 
			
		||||
				parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
 | 
			
		||||
 | 
			
		||||
				/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
 | 
			
		||||
				Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
 | 
			
		||||
 | 
			
		||||
				if (!superuser())
 | 
			
		||||
					ereport(ERROR,
 | 
			
		||||
							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 | 
			
		||||
							 errmsg("must be superuser to skip transaction")));
 | 
			
		||||
 | 
			
		||||
				/*
 | 
			
		||||
				 * If the user sets subskiplsn, we do a sanity check to make
 | 
			
		||||
				 * sure that the specified LSN is a probable value.
 | 
			
		||||
				 */
 | 
			
		||||
				if (!XLogRecPtrIsInvalid(opts.lsn))
 | 
			
		||||
				{
 | 
			
		||||
					RepOriginId originid;
 | 
			
		||||
					char		originname[NAMEDATALEN];
 | 
			
		||||
					XLogRecPtr	remote_lsn;
 | 
			
		||||
 | 
			
		||||
					snprintf(originname, sizeof(originname), "pg_%u", subid);
 | 
			
		||||
					originid = replorigin_by_name(originname, false);
 | 
			
		||||
					remote_lsn = replorigin_get_progress(originid, false);
 | 
			
		||||
 | 
			
		||||
					/* Check the given LSN is at least a future LSN */
 | 
			
		||||
					if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
 | 
			
		||||
						ereport(ERROR,
 | 
			
		||||
								(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | 
			
		||||
								 errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
 | 
			
		||||
										LSN_FORMAT_ARGS(opts.lsn),
 | 
			
		||||
										LSN_FORMAT_ARGS(remote_lsn))));
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
 | 
			
		||||
				replaces[Anum_pg_subscription_subskiplsn - 1] = true;
 | 
			
		||||
 | 
			
		||||
				update_tuple = true;
 | 
			
		||||
				break;
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		default:
 | 
			
		||||
			elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
 | 
			
		||||
				 stmt->kind);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user