diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 1c23538d3c5..cdfe8e376f0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3046,6 +3046,17 @@ include_dir 'conf.d' many UPDATE and DELETE statements are executed. + + It is important to note that when wal_level is set to + replica, the effective WAL level can automatically change + based on the presence of + logical replication slots. The system automatically increases the + effective WAL level to logical when creating the first + logical replication slot, and decreases it back to replica + when dropping or invalidating the last logical replication slot. The current + effective WAL level can be monitored through + parameter. + In releases prior to 9.6, this parameter also allowed the values archive and hot_standby. @@ -11851,6 +11862,38 @@ dynamic_library_path = '/usr/local/lib/postgresql:$libdir' + + effective_wal_level (enum) + + effective_wal_level configuration parameter + + + + + Reports the actual WAL logging level currently in effect in the + system. This parameter shares the same set of values as + , but reflects the operational WAL + level rather than the configured setting. For descriptions of + possible values, refer to the wal_level + parameter documentation. + + + The effective WAL level can differ from the configured + wal_level in certain situations. For example, + when wal_level is set to replica + and the system has one or more logical replication slots, + effective_wal_level will show logical + to indicate that the system is maintaining WAL records at + logical level equivalent. + + + On standby servers, effective_wal_level matches + the value of effective_wal_level from the most + upstream server in the replication chain. + + + + huge_pages_status (enum) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index b3faaa675ef..f47b7378397 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2629,7 +2629,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER wal_level must be - set to logical. + set to replica or logical. @@ -2751,7 +2751,7 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER The new cluster must have wal_level as - logical. + replica or logical. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 6368e46ce93..f36bf9462fa 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -47,7 +47,7 @@ Before you can use logical decoding, you must set - to logical and + to replica or higher and to at least 1. Then, you should connect to the target database (in the example below, postgres) as a superuser. @@ -257,6 +257,47 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. + + + Logical decoding becomes available in two conditions: + + + + + When is set to logical. + + + + + When is set to replica + and at least one valid logical replication slot exists on the system. + + + + + If either condition is met, the operational WAL level becomes equivalent + to logical, which can be monitored through the + parameter. + + + When wal_level is set to replica, + logical decoding is automatically activated upon creation of the first + logical replication slot. This activation process involves several steps + and requires synchronization among processes, ensuring system-wide + consistency. Conversely, if wal_level is set to + replica and the last logical replication slot is dropped + or invalidated, logical decoding is automatically disabled. Note that the + deactivation of logical decoding might take some time as it is performed + asynchronously by the checkpointer process. + + + + + When wal_level is set to replica, + dropping or invalidating the last logical slot disables logical decoding + on the primary, resulting in slots on standbys being invalidated. + + @@ -328,7 +369,7 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU that could be needed by the logical decoding on the standby (as it does not know about the catalog_xmin on the standby). Existing logical slots on standby also get invalidated if - wal_level on the primary is reduced to less than + effective_wal_level on the primary is reduced to less than logical. This is done as soon as the standby detects such a change in the WAL stream. It means that, for walsenders that are lagging (if any), some WAL records up diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 5a62187b189..e450c6a5b37 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -387,12 +387,12 @@ PostgreSQL documentation The source server must accept connections from the target server. The source server must not be in recovery. The source server must have as logical. The source server - must have configured to a value - greater than or equal to the number of specified databases plus existing - replication slots. The source server must have configured to a value greater than or equal - to the number of specified databases and existing WAL sender processes. + linkend="guc-wal-level"/> as replica or logical. + The source server must have + configured to a value greater than or equal to the number of specified + databases plus existing replication slots. The source server must have + configured to a value greater than or + equal to the number of specified databases and existing WAL sender processes. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 162c76b729a..7ff7ca4f719 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -3062,8 +3062,9 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx wal_level_insufficient means that the - primary doesn't have a sufficient to - perform logical decoding. It is set only for logical slots. + primary doesn't have an + sufficient to perform logical decoding. It is set only for logical + slots. diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 98759e3bf70..469397e7344 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8925,8 +8925,8 @@ log_heap_update(Relation reln, Buffer oldbuf, * * Skip this if we're taking a full-page image of the new page, as we * don't include the new tuple in the WAL record in that case. Also - * disable if wal_level='logical', as logical decoding needs to be able to - * read the new tuple in whole from the WAL record alone. + * disable if effective_wal_level='logical', as logical decoding needs to + * be able to read the new tuple in whole from the WAL record alone. */ if (oldbuf == newbuf && !need_tuple_data && !XLogCheckBufferNeedsBackup(newbuf)) @@ -9098,8 +9098,8 @@ log_heap_update(Relation reln, Buffer oldbuf, /* * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record * - * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog - * tuples. + * This is only used when effective_wal_level is logical, and only for + * catalog tuples. */ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup) diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 441034f5929..cd8edf5cc49 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -66,7 +66,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) CheckPoint *checkpoint = (CheckPoint *) rec; appendStringInfo(buf, "redo %X/%08X; " - "tli %u; prev tli %u; fpw %s; wal_level %s; xid %u:%u; oid %u; multi %u; offset %" PRIu64 "; " + "tli %u; prev tli %u; fpw %s; wal_level %s; logical decoding %s; xid %u:%u; oid %u; multi %u; offset %" PRIu64 "; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " "oldest running xid %u; %s", @@ -75,6 +75,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->PrevTimeLineID, checkpoint->fullPageWrites ? "true" : "false", get_wal_level_string(checkpoint->wal_level), + checkpoint->logicalDecodingEnabled ? "true" : "false", EpochFromFullTransactionId(checkpoint->nextXid), XidFromFullTransactionId(checkpoint->nextXid), checkpoint->nextOid, @@ -167,6 +168,13 @@ xlog_desc(StringInfo buf, XLogReaderState *record) memcpy(&wal_level, rec, sizeof(int)); appendStringInfo(buf, "wal_level %s", get_wal_level_string(wal_level)); } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool enabled; + + memcpy(&enabled, rec, sizeof(bool)); + appendStringInfoString(buf, enabled ? "true" : "false"); + } } const char * @@ -218,6 +226,9 @@ xlog_identify(uint8 info) case XLOG_CHECKPOINT_REDO: id = "CHECKPOINT_REDO"; break; + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: + id = "LOGICAL_DECODING_STATUS_CHANGE"; + break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b69d452551a..1b5c1f6b763 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -552,9 +552,9 @@ MarkCurrentTransactionIdLoggedIfAny(void) * operation in a subtransaction. We require that for logical decoding, see * LogicalDecodingProcessRecord. * - * This returns true if wal_level >= logical and we are inside a valid - * subtransaction, for which the assignment was not yet written to any WAL - * record. + * This returns true if effective_wal_level is logical and we are inside + * a valid subtransaction, for which the assignment was not yet written to + * any WAL record. */ bool IsSubxactTopXidLogPending(void) @@ -563,7 +563,7 @@ IsSubxactTopXidLogPending(void) if (CurrentTransactionState->topXidLogged) return false; - /* wal_level has to be logical */ + /* effective_wal_level has to be logical */ if (!XLogLogicalInfoActive()) return false; @@ -682,14 +682,14 @@ AssignTransactionId(TransactionState s) } /* - * When wal_level=logical, guarantee that a subtransaction's xid can only - * be seen in the WAL stream if its toplevel xid has been logged before. - * If necessary we log an xact_assignment record with fewer than - * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set - * for a transaction even though it appears in a WAL record, we just might - * superfluously log something. That can happen when an xid is included - * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in - * xl_standby_locks. + * When effective_wal_level is logical, guarantee that a subtransaction's + * xid can only be seen in the WAL stream if its toplevel xid has been + * logged before. If necessary we log an xact_assignment record with fewer + * than PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't + * set for a transaction even though it appears in a WAL record, we just + * might superfluously log something. That can happen when an xid is + * included somewhere inside a wal record, but not in XLogRecord->xl_xid, + * like in xl_standby_locks. */ if (isSubXact && XLogLogicalInfoActive() && !TopTransactionStateData.didLogXid) @@ -2489,6 +2489,7 @@ CommitTransaction(void) AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); AtEOXact_LogicalRepWorkers(true); + AtEOXact_LogicalCtl(); pgstat_report_xact_timestamp(0); ResourceOwnerDelete(TopTransactionResourceOwner); @@ -2784,6 +2785,7 @@ PrepareTransaction(void) /* we treat PREPARE as ROLLBACK so far as waking workers goes */ AtEOXact_ApplyLauncher(false); AtEOXact_LogicalRepWorkers(false); + AtEOXact_LogicalCtl(); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -3011,6 +3013,7 @@ AbortTransaction(void) AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); AtEOXact_LogicalRepWorkers(false); + AtEOXact_LogicalCtl(); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 430a38b1a21..1b7ef589fc0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -80,6 +80,7 @@ #include "postmaster/walwriter.h" #include "replication/origin.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -4888,6 +4889,25 @@ show_in_hot_standby(void) return RecoveryInProgress() ? "on" : "off"; } +/* + * GUC show_hook for effective_wal_level + */ +const char * +show_effective_wal_level(void) +{ + if (wal_level == WAL_LEVEL_MINIMAL) + return "minimal"; + + /* + * During recovery, effective_wal_level reflects the primary's + * configuration rather than the local wal_level value. + */ + if (RecoveryInProgress()) + return IsXLogLogicalInfoEnabled() ? "logical" : "replica"; + + return XLogLogicalInfoActive() ? "logical" : "replica"; +} + /* * Read the control file, set respective GUCs. * @@ -5134,6 +5154,7 @@ BootStrapXLOG(uint32 data_checksum_version) checkPoint.ThisTimeLineID = BootstrapTimeLineID; checkPoint.PrevTimeLineID = BootstrapTimeLineID; checkPoint.fullPageWrites = fullPageWrites; + checkPoint.logicalDecodingEnabled = (wal_level == WAL_LEVEL_LOGICAL); checkPoint.wal_level = wal_level; checkPoint.nextXid = FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId); @@ -5658,6 +5679,12 @@ StartupXLOG(void) */ StartupReplicationSlots(); + /* + * Startup the logical decoding status with the last status stored in the + * checkpoint record. + */ + StartupLogicalDecodingStatus(checkPoint.logicalDecodingEnabled); + /* * Startup logical state, needs to be setup now so we have proper data * during crash recovery. @@ -6206,6 +6233,12 @@ StartupXLOG(void) */ CompleteCommitTsInitialization(); + /* + * Update logical decoding status in shared memory and write an + * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary. + */ + UpdateLogicalDecodingStatusEndOfRecovery(); + /* Clean up EndOfWalRecoveryInfo data to appease Valgrind leak checking */ if (endOfRecoveryInfo->lastPage) pfree(endOfRecoveryInfo->lastPage); @@ -6237,6 +6270,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up the checkpointer process as there might be a request to disable + * logical decoding by concurrent slot drop. + */ + WakeupCheckpointer(); + /* * Wake up all waiters for replay LSN. They need to report an error that * recovery was ended before reaching the target LSN. @@ -7187,6 +7226,8 @@ CreateCheckPoint(int flags) checkPoint.nextOid += TransamVariables->oidCount; LWLockRelease(OidGenLock); + checkPoint.logicalDecodingEnabled = IsLogicalDecodingEnabled(); + MultiXactGetCheckptMulti(shutdown, &checkPoint.nextMulti, &checkPoint.nextMultiOffset, @@ -8581,21 +8622,6 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); - /* - * Invalidate logical slots if we are in hot standby and the primary - * does not have a WAL level sufficient for logical decoding. No need - * to search for potentially conflicting logically slots if standby is - * running with wal_level lower than logical, because in that case, we - * would have either disallowed creation of logical slots or - * invalidated existing ones. - */ - if (InRecovery && InHotStandby && - xlrec.wal_level < WAL_LEVEL_LOGICAL && - wal_level >= WAL_LEVEL_LOGICAL) - InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, - 0, InvalidOid, - InvalidTransactionId); - LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; @@ -8663,6 +8689,55 @@ xlog_redo(XLogReaderState *record) { /* nothing to do here, just for informational purposes */ } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool status; + + memcpy(&status, XLogRecGetData(record), sizeof(bool)); + + /* + * We need to toggle the logical decoding status and update the + * XLogLogicalInfo cache of processes synchronously because + * XLogLogicalInfoActive() is used even during read-only queries + * (e.g., via RelationIsAccessibleInLogicalDecoding()). In the + * 'disable' case, it is safe to invalidate existing slots after + * disabling logical decoding because logical decoding cannot process + * subsequent WAL records, which may not contain logical information. + */ + if (status) + EnableLogicalDecoding(); + else + DisableLogicalDecoding(); + + elog(DEBUG1, "update logical decoding status to %d during recovery", + status); + + if (InRecovery && InHotStandby) + { + if (!status) + { + /* + * Invalidate logical slots if we are in hot standby and the + * primary disabled logical decoding. + */ + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, + 0, InvalidOid, + InvalidTransactionId); + } + else if (sync_replication_slots) + { + /* + * Signal the postmaster to launch the slotsync worker. + * + * XXX: For simplicity, we keep the slotsync worker running + * even after logical decoding is disabled. A future + * improvement can consider starting and stopping the worker + * based on logical decoding status change. + */ + kill(PostmasterPid, SIGUSR1); + } + } + } } /* diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index a1983508950..40a4efd7390 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -975,11 +975,16 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); - if (wal_level != WAL_LEVEL_LOGICAL) + /* + * We don't need this warning message when wal_level >= 'replica' since + * logical decoding is automatically enabled up on a logical slot + * creation. + */ + if (wal_level < WAL_LEVEL_REPLICA) ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to publish logical changes"), - errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); + errmsg("logical decoding must be enabled to publish logical changes"), + errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher."))); return myself; } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 6b1a00ed477..1d9565b09fc 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2299,7 +2299,7 @@ ExecuteTruncateGuts(List *explicit_rels, xl_heap_truncate xlrec; int i = 0; - /* should only get here if wal_level >= logical */ + /* should only get here if effective_wal_level is 'logical' */ Assert(XLogLogicalInfoActive()); logrelids = palloc(list_length(relids_logged) * sizeof(Oid)); diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 7f8cf1fa2ec..2eac8ac30d3 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -559,6 +559,12 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len) break; } + /* + * Disable logical decoding if someone requested it. See comments atop + * logicalctl.c. + */ + DisableLogicalDecodingIfNecessary(); + /* Check for archive_timeout and switch xlog files if necessary. */ CheckArchiveTimeout(); @@ -1535,3 +1541,16 @@ FirstCallSinceLastCheckpoint(void) return FirstCall; } + +/* + * Wake up the checkpointer process. + */ +void +WakeupCheckpointer(void) +{ + volatile PROC_HDR *procglobal = ProcGlobal; + ProcNumber checkpointerProc = procglobal->checkpointerProc; + + if (checkpointerProc != INVALID_PROC_NUMBER) + SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch); +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 7dd3a201b1c..cf44a677187 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -854,9 +854,9 @@ PostmasterMain(int argc, char *argv[]) if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, (errmsg("WAL cannot be summarized when \"wal_level\" is \"minimal\""))); - if (sync_replication_slots && wal_level < WAL_LEVEL_LOGICAL) + if (sync_replication_slots && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" >= \"logical\""))); + (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" to be \"replica\" or \"logical\""))); /* * Other one-time internal sanity checks can go here, if they are fast. diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index c719af1f8a9..455768a57f0 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -20,6 +20,7 @@ OBJS = \ decode.o \ launcher.o \ logical.o \ + logicalctl.o \ logicalfuncs.o \ message.o \ origin.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5e15cb1825e..a1df8e1d646 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -149,39 +149,34 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; - case XLOG_PARAMETER_CHANGE: + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: { - xl_parameter_change *xlrec = - (xl_parameter_change *) XLogRecGetData(buf->record); + bool logical_decoding; + + memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool)); /* - * If wal_level on the primary is reduced to less than - * logical, we want to prevent existing logical slots from - * being used. Existing logical slots on the standby get - * invalidated when this WAL record is replayed; and further, - * slot creation fails when wal_level is not sufficient; but - * all these operations are not synchronized, so a logical - * slot may creep in while the wal_level is being reduced. - * Hence this extra check. + * Error out as we should not decode this WAL record. + * + * Logical decoding is disabled, and existing logical slots on + * the standby are invalidated when this WAL record is + * replayed. No logical decoder can process this WAL record + * until replay completes, and by then the slots are already + * invalidated. Furthermore, no new logical slots can be + * created while logical decoding is disabled. This cannot + * occur even on primary either, since it will not restart + * with wal_level < replica if any logical slots exist. */ - if (xlrec->wal_level < WAL_LEVEL_LOGICAL) - { - /* - * This can occur only on a standby, as a primary would - * not allow to restart after changing wal_level < logical - * if there is pre-existing logical slot. - */ - Assert(RecoveryInProgress()); - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); - } + elog(ERROR, "unexpected logical decoding status change %d", + logical_decoding); + break; } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: + case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1b11ed63dc6..c8858e06616 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -117,31 +117,20 @@ CheckLogicalDecodingRequirements(void) * needs the same check. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding requires \"wal_level\" >= \"logical\""))); - if (MyDatabaseId == InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - if (RecoveryInProgress()) - { - /* - * This check may have race conditions, but whenever - * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we - * verify that there are no existing logical replication slots. And to - * avoid races around creating a new slot, - * CheckLogicalDecodingRequirements() is called once before creating - * the slot, and once when logical decoding is initially starting up. - */ - if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); - } + /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */ + Assert(wal_level >= WAL_LEVEL_REPLICA); + + /* Check if logical decoding is available on standby */ + if (RecoveryInProgress() && !IsLogicalDecodingEnabled()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."))); } /* diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c new file mode 100644 index 00000000000..5a0ddf37b8b --- /dev/null +++ b/src/backend/replication/logical/logicalctl.c @@ -0,0 +1,639 @@ +/*------------------------------------------------------------------------- + * logicalctl.c + * Functionality to control logical decoding status online. + * + * This module enables dynamic control of logical decoding availability. + * Logical decoding becomes active under two conditions: when the wal_level + * parameter is set to 'logical', or when at least one valid logical replication + * slot exists with wal_level set to 'replica'. The system disables logical + * decoding when neither condition is met. Therefore, the dynamic control + * of logical decoding availability is required only when wal_level is set + * to 'replica'. Logical decoding is always enabled when wal_level='logical' + * and always disabled when wal_level='minimal'. + * + * The core concept of dynamically enabling and disabling logical decoding + * is to separately control two aspects: writing information required for + * logical decoding to WAL records, and using logical decoding itself. During + * activation, we first enable logical WAL writing while keeping logical + * decoding disabled. This change is reflected in the read-only + * effective_wal_level GUC parameter. Once we ensure that all processes have + * updated to the latest effective_wal_level value, we then enable logical + * decoding. Deactivation follows a similar careful, multi-step process + * in reverse order. + * + * While activation occurs synchronously right after creating the first + * logical slot, deactivation happens asynchronously through the checkpointer + * process. This design avoids a race condition at the end of recovery; see + * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details. + * Asynchronous deactivation also avoids excessive toggling of the logical + * decoding status in workloads that repeatedly create and drop a single + * logical slot. On the other hand, this lazy approach can delay changes + * to effective_wal_level and the disabling logical decoding, especially + * when the checkpointer is busy with other tasks. We chose this lazy approach + * in all deactivation paths to keep the implementation simple, even though + * laziness is strictly required only for end-of-recovery cases. Future work + * might address this limitation either by using a dedicated worker instead + * of the checkpointer, or by implementing synchronous waiting during slot + * drops if workloads are significantly affected by the lazy deactivation + * of logical decoding. + * + * Standby servers use the primary server's effective_wal_level and logical + * decoding status. Unlike normal activation and deactivation, these + * are updated simultaneously without status change coordination, solely by + * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level + * setting has no effect during this time. Upon promotion, we update the + * logical decoding status based on local conditions: the wal_level value and + * the presence of logical slots. + * + * In the future, we could extend support to include automatic transitions + * of effective_wal_level between 'minimal' and 'logical' WAL levels. However, + * this enhancement would require additional coordination mechanisms and + * careful implementation of operations such as terminating walsenders and + * archiver processes while carefully considering the sequence of operations + * to ensure system stability during these transitions. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/replication/logical/logicalctl.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "miscadmin.h" +#include "replication/slot.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "utils/injection_point.h" + +/* + * Struct for controlling the logical decoding status. + * + * This struct is protected by LogicalDecodingControlLock. + */ +typedef struct LogicalDecodingCtlData +{ + /* + * This is the authoritative value used by all processes to determine + * whether to write additional information required by logical decoding to + * WAL. Since this information could be checked frequently, each process + * caches this value in XLogLogicalInfo for better performance. + */ + bool xlog_logical_info; + + /* True if logical decoding is available in the system */ + bool logical_decoding_enabled; + + /* True if logical decoding might need to be disabled */ + bool pending_disable; +} LogicalDecodingCtlData; + +static LogicalDecodingCtlData *LogicalDecodingCtl = NULL; + +/* + * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is + * initialized at process startup, and updated when processing the process + * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process + * is in an XID-assigned transaction, the cache update is delayed until the + * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details. + */ +bool XLogLogicalInfo = false; + +/* + * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if + * an XID is assigned to the current transaction, the process sets this flag and + * delays the XLogLogicalInfo update until the transaction ends. This ensures + * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive) + * remains consistent throughout the transaction. + */ +static bool XLogLogicalInfoUpdatePending = false; + +static void update_xlog_logical_info(void); +static void abort_logical_decoding_activation(int code, Datum arg); +static void write_logical_decoding_status_update_record(bool status); + +Size +LogicalDecodingCtlShmemSize(void) +{ + return sizeof(LogicalDecodingCtlData); +} + +void +LogicalDecodingCtlShmemInit(void) +{ + bool found; + + LogicalDecodingCtl = ShmemInitStruct("Logical decoding control", + LogicalDecodingCtlShmemSize(), + &found); + + if (!found) + MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize()); +} + +/* + * Initialize the logical decoding status in shmem at server startup. This + * must be called ONCE during postmaster or standalone-backend startup. + */ +void +StartupLogicalDecodingStatus(bool last_status) +{ + /* Logical decoding is always disabled when 'minimal' WAL level */ + if (wal_level == WAL_LEVEL_MINIMAL) + return; + + /* + * Set the initial logical decoding status based on the last status. If + * logical decoding was enabled before the last shutdown, it remains + * enabled as we might have set wal_level='logical' or have at least one + * logical slot. + */ + LogicalDecodingCtl->xlog_logical_info = last_status; + LogicalDecodingCtl->logical_decoding_enabled = last_status; +} + +/* + * Update the XLogLogicalInfo cache. + */ +static inline void +update_xlog_logical_info(void) +{ + XLogLogicalInfo = IsXLogLogicalInfoEnabled(); +} + +/* + * Initialize XLogLogicalInfo backend-private cache. This routine is called + * during process initialization. + */ +void +InitializeProcessXLogLogicalInfo(void) +{ + update_xlog_logical_info(); +} + +/* + * This routine is called when we are told to update XLogLogicalInfo + * by a ProcSignalBarrier. + */ +bool +ProcessBarrierUpdateXLogLogicalInfo(void) +{ + if (GetTopTransactionIdIfAny() != InvalidTransactionId) + { + /* Delay updating XLogLogicalInfo until the transaction end */ + XLogLogicalInfoUpdatePending = true; + } + else + update_xlog_logical_info(); + + return true; +} + +/* + * Check the shared memory state and return true if logical decoding is + * enabled on the system. + */ +bool +IsLogicalDecodingEnabled(void) +{ + bool enabled; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + enabled = LogicalDecodingCtl->logical_decoding_enabled; + LWLockRelease(LogicalDecodingControlLock); + + return enabled; +} + +/* + * Returns true if logical WAL logging is enabled based on the shared memory + * status. + */ +bool +IsXLogLogicalInfoEnabled(void) +{ + bool xlog_logical_info; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + xlog_logical_info = LogicalDecodingCtl->xlog_logical_info; + LWLockRelease(LogicalDecodingControlLock); + + return xlog_logical_info; +} + +/* + * Reset the local cache at end of the transaction. + */ +void +AtEOXact_LogicalCtl(void) +{ + /* Update the local cache if there is a pending update */ + if (XLogLogicalInfoUpdatePending) + { + update_xlog_logical_info(); + XLogLogicalInfoUpdatePending = false; + } +} + +/* + * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given + * status. + */ +static void +write_logical_decoding_status_update_record(bool status) +{ + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData(&status, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); +} + +/* + * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting + * the shared flags to revert the logical decoding activation process. + */ +static void +abort_logical_decoding_activation(int code, Datum arg) +{ + Assert(MyReplicationSlot); + Assert(!LogicalDecodingCtl->logical_decoding_enabled); + + elog(DEBUG1, "aborting logical decoding activation process"); + + /* + * Abort the change to xlog_logical_info. We don't need to check + * CheckLogicalSlotExists() as we're still holding a logical slot. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->xlog_logical_info = false; + LWLockRelease(LogicalDecodingControlLock); + + /* + * Some processes might have already started logical info WAL logging, so + * tell all running processes to update their caches. We don't need to + * wait for all processes to disable xlog_logical_info locally as it's + * always safe to write logical information to WAL records, even when not + * strictly required. + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); +} + +/* + * Enable logical decoding if disabled. + * + * If this function is called during recovery, it simply returns without + * action since the logical decoding status change is not allowed during + * this time. The logical decoding status depends on the status on the primary. + * The caller should use CheckLogicalDecodingRequirements() before calling this + * function to make sure that the logical decoding status can be modified. + * + * Note that there is no interlock between logical decoding activation + * and slot creation. To ensure enabling logical decoding, the caller + * needs to call this function after creating a logical slot before + * initializing the logical decoding context. + */ +void +EnsureLogicalDecodingEnabled(void) +{ + Assert(MyReplicationSlot); + Assert(wal_level >= WAL_LEVEL_REPLICA); + + /* Logical decoding is always enabled */ + if (wal_level >= WAL_LEVEL_LOGICAL) + return; + + if (RecoveryInProgress()) + { + /* + * CheckLogicalDecodingRequirements() must have already errored out if + * logical decoding is not enabled since we cannot enable the logical + * decoding status during recovery. + */ + Assert(IsLogicalDecodingEnabled()); + return; + } + + /* + * Ensure to abort the activation process in cases where there in an + * interruption during the wait. + */ + PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0); + { + EnableLogicalDecoding(); + } + PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0); +} + +/* + * A workhorse function to enable logical decoding. + */ +void +EnableLogicalDecoding(void) +{ + bool in_recovery; + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + /* Return if it is already enabled */ + if (LogicalDecodingCtl->logical_decoding_enabled) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + return; + } + + /* + * Set logical info WAL logging in shmem. All process starts after this + * point will include the information required by logical decoding to WAL + * records. + */ + LogicalDecodingCtl->xlog_logical_info = true; + + LWLockRelease(LogicalDecodingControlLock); + + /* + * Tell all running processes to reflect the xlog_logical_info update, and + * wait. This ensures that all running processes have enabled logical + * information WAL logging. + */ + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + INJECTION_POINT("logical-decoding-activation", NULL); + + in_recovery = RecoveryInProgress(); + + /* + * There could be some transactions that might have started with the old + * status, but we don't need to wait for these transactions to complete as + * long as they have valid XIDs. These transactions will appear in the + * xl_running_xacts record and therefore the snapshot builder will not try + * to decode the transaction during the logical decoding initialization. + * + * There is a theoretical case where a transaction decides whether to + * include logical-info to WAL records before getting an XID. In this + * case, the transaction won't appear in xl_running_xacts. + * + * For operations that do not require an XID assignment, the process + * starts including logical-info immediately upon receiving the signal + * (barrier). If such an operation checks the effective_wal_level multiple + * times within a single execution, the resulting WAL records might be + * inconsistent (i.e., logical-info is included in some records but not in + * others). However, this is harmless because logical decoding generally + * ignores WAL records that are not associated with an assigned XID. + * + * One might think we need to wait for all running transactions, including + * those without XIDs and read-only transactions, to finish before + * enabling logical decoding. However, such a requirement would force the + * slot creation to wait for a potentially very long time due to + * long-running read queries, which is practically unacceptable. + */ + + START_CRIT_SECTION(); + + /* + * We enable logical decoding first, followed by writing the WAL record. + * This sequence ensures logical decoding becomes available on the primary + * first. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + LogicalDecodingCtl->logical_decoding_enabled = true; + + if (!in_recovery) + write_logical_decoding_status_update_record(true); + + LogicalDecodingCtl->pending_disable = false; + + LWLockRelease(LogicalDecodingControlLock); + + END_CRIT_SECTION(); + + if (!in_recovery) + ereport(LOG, + errmsg("logical decoding is enabled upon creating a new logical replication slot")); +} + +/* + * Initiate a request for disabling logical decoding. + * + * Note that this function does not verify whether logical slots exist. The + * checkpointer will verify if logical decoding should actually be disabled. + */ +void +RequestDisableLogicalDecoding(void) +{ + if (wal_level != WAL_LEVEL_REPLICA) + return; + + /* + * It's possible that we might not actually need to disable logical + * decoding if someone creates a new logical slot concurrently. We set the + * flag anyway and the checkpointer will check it and disable logical + * decoding if necessary. + */ + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->pending_disable = true; + LWLockRelease(LogicalDecodingControlLock); + + WakeupCheckpointer(); + + elog(DEBUG1, "requested disabling logical decoding"); +} + +/* + * Disable logical decoding if necessary. + * + * This function disables logical decoding upon a request initiated by + * RequestDisableLogicalDecoding(). Otherwise, it performs no action. + */ +void +DisableLogicalDecodingIfNecessary(void) +{ + bool pending_disable; + + if (wal_level != WAL_LEVEL_REPLICA) + return; + + /* + * Sanity check as we cannot disable logical decoding while holding a + * logical slot. + */ + Assert(!MyReplicationSlot); + + if (RecoveryInProgress()) + return; + + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + pending_disable = LogicalDecodingCtl->pending_disable; + LWLockRelease(LogicalDecodingControlLock); + + /* Quick return if no pending disable request */ + if (!pending_disable) + return; + + DisableLogicalDecoding(); +} + +/* + * A workhorse function to disable logical decoding. + */ +void +DisableLogicalDecoding(void) +{ + bool in_recovery = RecoveryInProgress(); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + /* + * Check if we can disable logical decoding. + * + * Skip CheckLogicalSlotExists() check during recovery because the + * existing slots will be invalidated after disabling logical decoding. + */ + if (!LogicalDecodingCtl->logical_decoding_enabled || + (!in_recovery && CheckLogicalSlotExists())) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + return; + } + + START_CRIT_SECTION(); + + /* + * We need to disable logical decoding first and then disable logical + * information WAL logging in order to ensure that no logical decoding + * processes WAL records with insufficient information. + */ + LogicalDecodingCtl->logical_decoding_enabled = false; + + /* Write the WAL to disable logical decoding on standbys too */ + if (!in_recovery) + write_logical_decoding_status_update_record(false); + + /* Now disable logical information WAL logging */ + LogicalDecodingCtl->xlog_logical_info = false; + LogicalDecodingCtl->pending_disable = false; + + END_CRIT_SECTION(); + + if (!in_recovery) + ereport(LOG, + errmsg("logical decoding is disabled because there are no valid logical replication slots")); + + LWLockRelease(LogicalDecodingControlLock); + + /* + * Tell all running processes to reflect the xlog_logical_info update. + * Unlike when enabling logical decoding, we don't need to wait for all + * processes to complete it in this case. We already disabled logical + * decoding and it's always safe to write logical information to WAL + * records, even when not strictly required. Therefore, we don't need to + * wait for all running transactions to finish either. + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); +} + +/* + * Updates the logical decoding status at end of recovery, and ensures that + * all running processes have the updated XLogLogicalInfo status. This + * function must be called before accepting writes. + */ +void +UpdateLogicalDecodingStatusEndOfRecovery(void) +{ + bool new_status = false; + + Assert(RecoveryInProgress()); + + /* + * With 'minimal' WAL level, there are no logical replication slots during + * recovery. Logical decoding is always disabled, so there is no need to + * synchronize XLogLogicalInfo. + */ + if (wal_level == WAL_LEVEL_MINIMAL) + { + Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled()); + return; + } + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists()) + new_status = true; + + /* + * When recovery ends, we need to either enable or disable logical + * decoding based on the wal_level setting and the presence of logical + * slots. We need to note that concurrent slot creation and deletion could + * happen but WAL writes are still not permitted until recovery fully + * completes. Here's how we handle concurrent toggling of logical + * decoding: + * + * For 'enable' case, if there's a concurrent disable request before + * recovery fully completes, the checkpointer will handle it after + * recovery is done. This means there might be a brief period after + * recovery where logical decoding remains enabled even with no logical + * replication slots present. This temporary state is not new - it can + * already occur due to the checkpointer's asynchronous deactivation + * process. + * + * For 'disable' case, backend cannot create logical replication slots + * during recovery (see checks in CheckLogicalDecodingRequirements()), + * which prevents a race condition between disabling logical decoding and + * concurrent slot creation. + */ + if (new_status != LogicalDecodingCtl->logical_decoding_enabled) + { + /* + * Update both the logical decoding status and logical WAL logging + * status. Unlike toggling these status during non-recovery, we don't + * need to worry about the operation order as WAL writes are still not + * permitted. + */ + LogicalDecodingCtl->xlog_logical_info = new_status; + LogicalDecodingCtl->logical_decoding_enabled = new_status; + + elog(DEBUG1, + "update logical decoding status to %d at the end of recovery", + new_status); + + /* + * Now that we updated the logical decoding status, clear the pending + * disable flag. It's possible that a concurrent process drops the + * last logical slot and initiates the pending disable again. The + * checkpointer process will check it. + */ + LogicalDecodingCtl->pending_disable = false; + + LWLockRelease(LogicalDecodingControlLock); + + write_logical_decoding_status_update_record(new_status); + } + else + LWLockRelease(LogicalDecodingControlLock); + + /* + * Ensure all running processes have the updated status. We don't need to + * wait for running transactions to finish as we don't accept any writes + * yet. On the other hand, we need to wait for synchronizing + * XLogLogicalInfo even if we've not updated the status above as the + * status have been turned on and off during recovery, having running + * processes have different status on their local caches. + */ + if (IsUnderPostmaster) + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL); +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index a2268d8361e..928b503addf 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'decode.c', 'launcher.c', 'logical.c', + 'logicalctl.c', 'logicalfuncs.c', 'message.c', 'origin.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index bf50317b443..2aea776352d 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -1201,13 +1201,15 @@ bool ValidateSlotSyncParams(int elevel) { /* - * Logical slot sync/creation requires wal_level >= logical. + * Logical slot sync/creation requires logical decoding to be enabled. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) { ereport(elevel, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + errmsg("replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"), + errhint("To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\".")); + return false; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 682eccd116c..58c41d45516 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -765,16 +765,15 @@ ReplicationSlotRelease(void) { ReplicationSlot *slot = MyReplicationSlot; char *slotname = NULL; /* keep compiler quiet */ - bool is_logical = false; /* keep compiler quiet */ + bool is_logical; TimestampTz now = 0; Assert(slot != NULL && slot->active_pid != 0); + is_logical = SlotIsLogical(slot); + if (am_walsender) - { slotname = pstrdup(NameStr(slot->data.name)); - is_logical = SlotIsLogical(slot); - } if (slot->data.persistency == RS_EPHEMERAL) { @@ -784,6 +783,14 @@ ReplicationSlotRelease(void) * data. */ ReplicationSlotDropAcquired(); + + /* + * Request to disable logical decoding, even though this slot may not + * have been the last logical slot. The checkpointer will verify if + * logical decoding should actually be disabled. + */ + if (is_logical) + RequestDisableLogicalDecoding(); } /* @@ -848,15 +855,21 @@ ReplicationSlotRelease(void) * * Cleanup only synced temporary slots if 'synced_only' is true, else * cleanup all temporary slots. + * + * If it drops the last logical slot in the cluster, requests to disable + * logical decoding. */ void ReplicationSlotCleanup(bool synced_only) { int i; + bool found_valid_logicalslot; + bool dropped_logical = false; Assert(MyReplicationSlot == NULL); restart: + found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -866,6 +879,10 @@ restart: continue; SpinLockAcquire(&s->mutex); + + found_valid_logicalslot |= + (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE); + if ((s->active_pid == MyProcPid && (!synced_only || s->data.synced))) { @@ -873,6 +890,9 @@ restart: SpinLockRelease(&s->mutex); LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ + if (SlotIsLogical(s)) + dropped_logical = true; + ReplicationSlotDropPtr(s); ConditionVariableBroadcast(&s->active_cv); @@ -883,6 +903,9 @@ restart: } LWLockRelease(ReplicationSlotControlLock); + + if (dropped_logical && !found_valid_logicalslot) + RequestDisableLogicalDecoding(); } /* @@ -891,6 +914,8 @@ restart: void ReplicationSlotDrop(const char *name, bool nowait) { + bool is_logical; + Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait, false); @@ -905,7 +930,12 @@ ReplicationSlotDrop(const char *name, bool nowait) errmsg("cannot drop replication slot \"%s\"", name), errdetail("This replication slot is being synchronized from the primary server.")); + is_logical = SlotIsLogical(MyReplicationSlot); + ReplicationSlotDropAcquired(); + + if (is_logical) + RequestDisableLogicalDecoding(); } /* @@ -1436,16 +1466,22 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) * * This routine isn't as efficient as it could be - but we don't drop * databases often, especially databases with lots of slots. + * + * If it drops the last logical slot in the cluster, it requests to disable + * logical decoding. */ void ReplicationSlotsDropDBSlots(Oid dboid) { int i; + bool found_valid_logicalslot; + bool dropped = false; if (max_replication_slots <= 0) return; restart: + found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -1463,11 +1499,19 @@ restart: if (!SlotIsLogical(s)) continue; + /* + * Check logical slots on other databases too so we can disable + * logical decoding only if no slots in the cluster. + */ + SpinLockAcquire(&s->mutex); + found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE); + SpinLockRelease(&s->mutex); + /* not our database, skip */ if (s->data.database != dboid) continue; - /* NB: intentionally including invalidated slots */ + /* NB: intentionally including invalidated slots to drop */ /* acquire slot, so ReplicationSlotDropAcquired can be reused */ SpinLockAcquire(&s->mutex); @@ -1519,11 +1563,55 @@ restart: */ LWLockRelease(ReplicationSlotControlLock); ReplicationSlotDropAcquired(); + dropped = true; goto restart; } LWLockRelease(ReplicationSlotControlLock); + + if (dropped && !found_valid_logicalslot) + RequestDisableLogicalDecoding(); } +/* + * Returns true if there is at least one in-use valid logical replication slot. + */ +bool +CheckLogicalSlotExists(void) +{ + bool found = false; + + if (max_replication_slots <= 0) + return false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s; + bool invalidated; + + s = &ReplicationSlotCtl->replication_slots[i]; + + /* cannot change while ReplicationSlotCtlLock is held */ + if (!s->in_use) + continue; + + if (SlotIsPhysical(s)) + continue; + + SpinLockAcquire(&s->mutex); + invalidated = s->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&s->mutex); + + if (invalidated) + continue; + + found = true; + break; + } + LWLockRelease(ReplicationSlotControlLock); + + return found; +} /* * Check whether the server's configuration supports using replication @@ -1686,7 +1774,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, break; case RS_INVAL_WAL_LEVEL: - appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server.")); + appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\".")); break; case RS_INVAL_IDLE_TIMEOUT: @@ -1828,10 +1916,11 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, * * Acquires the given slot and mark it invalid, if necessary and possible. * - * Returns whether ReplicationSlotControlLock was released in the interim (and - * in that case we're not holding the lock at return, otherwise we are). + * Returns true if the slot was invalidated. * - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * Set *released_lock_out if ReplicationSlotControlLock was released in the + * interim (and in that case we're not holding the lock at return, otherwise + * we are). * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. @@ -1841,10 +1930,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, - bool *invalidated) + bool *released_lock_out) { int last_signaled_pid = 0; bool released_lock = false; + bool invalidated = false; TimestampTz inactive_since = 0; for (;;) @@ -1933,7 +2023,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, } /* Let caller know */ - *invalidated = true; + invalidated = true; } SpinLockRelease(&s->mutex); @@ -2041,7 +2131,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock)); - return released_lock; + *released_lock_out = released_lock; + return invalidated; } /* @@ -2054,7 +2145,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations - * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient + * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not + * logical. * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured * "idle_replication_slot_timeout" duration. * @@ -2062,6 +2154,9 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * causes in a single pass, minimizing redundant iterations. The "cause" * parameter can be a MASK representing one or more of the defined causes. * + * If it invalidates the last logical slot in the cluster, it requests to + * disable logical decoding. + * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool @@ -2071,6 +2166,8 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, { XLogRecPtr oldestLSN; bool invalidated = false; + bool invalidated_logical = false; + bool found_valid_logicalslot; Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); @@ -2082,25 +2179,58 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: + found_valid_logicalslot = false; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (int i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + bool released_lock = false; if (!s->in_use) continue; /* Prevent invalidation of logical slots during binary upgrade */ if (SlotIsLogical(s) && IsBinaryUpgrade) - continue; - - if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, - snapshotConflictHorizon, - &invalidated)) { - /* if the lock was released, start from scratch */ - goto restart; + SpinLockAcquire(&s->mutex); + found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE); + SpinLockRelease(&s->mutex); + + continue; } + + if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, + dboid, snapshotConflictHorizon, + &released_lock)) + { + Assert(released_lock); + + /* Remember we have invalidated a physical or logical slot */ + invalidated = true; + + /* + * Additionally, remember we have invalidated a logical slot as we + * can request disabling logical decoding later. + */ + if (SlotIsLogical(s)) + invalidated_logical = true; + } + else + { + /* + * We need to check if the slot is invalidated here since + * InvalidatePossiblyObsoleteSlot() returns false also if the slot + * is already invalidated. + */ + SpinLockAcquire(&s->mutex); + found_valid_logicalslot |= + (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE)); + SpinLockRelease(&s->mutex); + } + + /* if the lock was released, start from scratch */ + if (released_lock) + goto restart; } LWLockRelease(ReplicationSlotControlLock); @@ -2113,6 +2243,15 @@ restart: ReplicationSlotsComputeRequiredLSN(); } + /* + * Request the checkpointer to disable logical decoding if no valid + * logical slots remain. If called by the checkpointer during a + * checkpoint, only the request is initiated; actual deactivation is + * deferred until after the checkpoint completes. + */ + if (invalidated_logical && !found_valid_logicalslot) + RequestDisableLogicalDecoding(); + return invalidated; } @@ -2648,19 +2787,20 @@ RestoreSlotFromDisk(const char *name) */ if (cp.slotdata.database != InvalidOid) { - if (wal_level < WAL_LEVEL_LOGICAL) + if (wal_level < WAL_LEVEL_REPLICA) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"", + errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"", NameStr(cp.slotdata.name)), - errhint("Change \"wal_level\" to be \"logical\" or higher."))); + errhint("Change \"wal_level\" to be \"replica\" or higher."))); /* * In standby mode, the hot standby must be enabled. This check is * necessary to ensure logical slots are invalidated when they become * incompatible due to insufficient wal_level. Otherwise, if the - * primary reduces wal_level < logical while hot standby is disabled, - * logical slots would remain valid even after promotion. + * primary reduces effective_wal_level < logical while hot standby is + * disabled, primary disable logical decoding while hot standby is + * disabled, logical slots would remain valid even after promotion. */ if (StandbyMode && !EnableHotStandby) ereport(FATAL, diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 7647f051581..70a27f83f29 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -147,6 +147,13 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, failover, false); + /* + * Ensure the logical decoding is enabled before initializing the logical + * decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); + /* * Create logical decoding context to find start point or, if we don't * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 449632ad1aa..96cede8f45a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1297,6 +1297,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) need_full_snapshot = true; } + /* + * Ensure the logical decoding is enabled before initializing the + * logical decoding context. + */ + EnsureLogicalDecodingEnabled(); + Assert(IsLogicalDecodingEnabled()); + ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, XL_ROUTINE(.page_read = logical_read_xlog_page, diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index b23d0c19360..adebba625e6 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -140,6 +140,7 @@ CalculateShmemSize(void) size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); size = add_size(size, WaitLSNShmemSize()); + size = add_size(size, LogicalDecodingCtlShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -328,6 +329,7 @@ CreateOrAttachShmemStructs(void) InjectionPointShmemInit(); AioShmemInit(); WaitLSNShmemInit(); + LogicalDecodingCtlShmemInit(); } /* diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 087821311cc..b0b93d96091 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -576,6 +576,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SMGRRELEASE: processed = ProcessBarrierSmgrRelease(); break; + case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO: + processed = ProcessBarrierUpdateXLogLogicalInfo(); + break; } /* diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index fc45d72c79b..773832c3a36 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -499,7 +499,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * seems OK, given that this kind of conflict should not normally be * reached, e.g. due to using a physical replication slot. */ - if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + if (IsLogicalDecodingEnabled() && isCatalogRel) InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); } @@ -1285,6 +1285,7 @@ LogStandbySnapshot(void) RunningTransactions running; xl_standby_lock *locks; int nlocks; + bool logical_decoding_enabled = IsLogicalDecodingEnabled(); Assert(XLogStandbyInfoActive()); @@ -1325,13 +1326,13 @@ LogStandbySnapshot(void) * record. Fortunately this routine isn't executed frequently, and it's * only a shared lock. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!logical_decoding_enabled) LWLockRelease(ProcArrayLock); recptr = LogCurrentRunningXacts(running); /* Release lock if we kept it longer ... */ - if (wal_level >= WAL_LEVEL_LOGICAL) + if (logical_decoding_enabled) LWLockRelease(ProcArrayLock); /* GetRunningTransactionData() acquired XidGenLock, we must release it */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index c0632bf901a..dcfadbd5aae 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -359,6 +359,7 @@ InjectionPoint "Waiting to read or update information related to injection point SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." WaitLSN "Waiting to read or update shared Wait-for-LSN state." +LogicalDecodingControl "Waiting to read or update logical decoding status information." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 8f7a56d0f2c..11ed876264c 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -98,9 +98,9 @@ * likewise send the invalidation immediately, before ending the change's * critical section. This includes inplace heap updates, relmap, and smgr. * - * When wal_level=logical, write invalidations into WAL at each command end to - * support the decoding of the in-progress transactions. See - * CommandEndInvalidationMessages. + * When effective_wal_level is 'logical', write invalidations into WAL at + * each command end to support the decoding of the in-progress transactions. + * See CommandEndInvalidationMessages. * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -1419,7 +1419,7 @@ CommandEndInvalidationMessages(void) ProcessInvalidationMessages(&transInvalInfo->ii.CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); - /* WAL Log per-command invalidation messages for wal_level=logical */ + /* WAL Log per-command invalidation messages for logical decoding */ if (XLogLogicalInfoActive()) LogLogicalInvalidations(); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 4ed69ac7ba2..b7e94ca45bd 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -653,6 +653,9 @@ BaseInit(void) /* Initialize lock manager's local structs */ InitLockManagerAccess(); + /* Initialize logical info WAL logging state */ + InitializeProcessXLogLogicalInfo(); + /* * Initialize replication slots after pgstat. The exit hook might need to * drop ephemeral slots, which in turn triggers stats reporting. diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 3b9d8349078..ac0c7c36c56 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -805,6 +805,15 @@ max => 'MAX_IO_CONCURRENCY', }, +{ name => 'effective_wal_level', type => 'enum', context => 'PGC_INTERNAL', group => 'PRESET_OPTIONS', + short_desc => 'Show effective WAL level.', + flags => 'GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE', + variable => 'effective_wal_level', + boot_val => 'WAL_LEVEL_REPLICA', + options => 'wal_level_options', + show_hook => 'show_effective_wal_level', +}, + { name => 'enable_async_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD', short_desc => 'Enables the planner\'s use of async append plans.', flags => 'GUC_EXPLAIN', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f87b558c2c6..04ab0a26608 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -617,6 +617,7 @@ static int shared_memory_size_mb; static int shared_memory_size_in_huge_pages; static int wal_block_size; static int num_os_semaphores; +static int effective_wal_level = WAL_LEVEL_REPLICA; static bool data_checksums; static bool integer_datetimes; diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 41a649297c7..dab4dfb3a52 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -953,7 +953,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo) * Since these parameters are not a requirement for physical replication, * we should check it to make sure it won't fail. * - * - wal_level = logical + * - wal_level >= replica * - max_replication_slots >= current + number of dbs to be converted * - max_wal_senders >= current + number of dbs to be converted * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files) @@ -997,9 +997,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo) disconnect_database(conn, false); - if (strcmp(wal_level, "logical") != 0) + if (strcmp(wal_level, "minimal") == 0) { - pg_log_error("publisher requires \"wal_level\" >= \"logical\""); + pg_log_error("publisher requires \"wal_level\" >= \"replica\""); failed = true; } diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 9e0db6cd099..4657172c9ac 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -240,7 +240,6 @@ command_fails( # Check some unmet conditions on node P $node_p->append_conf( 'postgresql.conf', q{ -wal_level = replica max_replication_slots = 1 max_wal_senders = 1 max_worker_processes = 2 @@ -265,7 +264,6 @@ command_fails( # standby settings should not be a lower setting than on the primary. $node_p->append_conf( 'postgresql.conf', q{ -wal_level = logical max_replication_slots = 10 max_wal_senders = 10 max_worker_processes = 8 diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 1e17d64b3ec..9cdeb15bd51 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -2131,11 +2131,7 @@ check_new_cluster_replication_slots(void) wal_level = PQgetvalue(res, 0, 0); - if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0) - pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"", - wal_level); - - if (old_cluster.sub_retain_dead_tuples && + if ((nslots_on_old > 0 || old_cluster.sub_retain_dead_tuples) && strcmp(wal_level, "minimal") == 0) pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"", wal_level); diff --git a/src/bin/pg_upgrade/t/002_pg_upgrade.pl b/src/bin/pg_upgrade/t/002_pg_upgrade.pl index 823f41e754c..587b683aec1 100644 --- a/src/bin/pg_upgrade/t/002_pg_upgrade.pl +++ b/src/bin/pg_upgrade/t/002_pg_upgrade.pl @@ -225,6 +225,10 @@ $oldnode->init(%old_node_params); # Override log_statement=all set by Cluster.pm. This avoids large amounts # of log traffic that slow this test down even more when run under valgrind. $oldnode->append_conf('postgresql.conf', 'log_statement = none'); + +# Set wal_level = replica to run the regression tests in the same +# wal_level as when 'make check' runs. +$oldnode->append_conf('postgresql.conf', 'wal_level = replica'); $oldnode->start; my $result; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 605280ed8fb..9af3318ef25 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -13,6 +13,7 @@ #include "access/xlogbackup.h" #include "access/xlogdefs.h" +#include "replication/logicalctl.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" @@ -94,6 +95,7 @@ typedef enum RecoveryState } RecoveryState; extern PGDLLIMPORT int wal_level; +extern PGDLLIMPORT bool XLogLogicalInfo; /* Is WAL archiving enabled (always or only while server is running normally)? */ #define XLogArchivingActive() \ @@ -122,8 +124,17 @@ extern PGDLLIMPORT int wal_level; /* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) -/* Do we need to WAL-log information required only for logical replication? */ -#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) +/* + * Do we need to WAL-log information required only for logical replication? + * + * When XLogLogicalInfoActive() returns true, it enables logical-decoding-related + * WAL logging as if wal_level were set to 'logical', even if it's actually set + * to 'replica'. Note that XLogLogicalInfo is a process-local cache and can + * change until an XID is assigned to the transaction. In other words, it + * ensures that the same result is returned within an XID-assigned transaction. + */ +#define XLogLogicalInfoActive() \ + (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo) #ifdef WAL_DEBUG extern PGDLLIMPORT bool XLOG_DEBUG; @@ -257,6 +268,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern void WakeupCheckpointer(void); + extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli); diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 293e9e03f59..742b152b51b 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -22,7 +22,7 @@ /* Version identifier for this pg_control format */ -#define PG_CONTROL_VERSION 1900 +#define PG_CONTROL_VERSION 1901 /* Nonce key length, see below */ #define MOCK_AUTH_NONCE_LEN 32 @@ -41,6 +41,7 @@ typedef struct CheckPoint * timeline (equals ThisTimeLineID otherwise) */ bool fullPageWrites; /* current full_page_writes */ int wal_level; /* current wal_level */ + bool logicalDecodingEnabled; /* current logical decoding status */ FullTransactionId nextXid; /* next free transaction ID */ Oid nextOid; /* next free OID */ MultiXactId nextMulti; /* next free MultiXactId */ @@ -80,6 +81,7 @@ typedef struct CheckPoint /* 0xC0 is used in Postgres 9.5-11 */ #define XLOG_OVERWRITE_CONTRECORD 0xD0 #define XLOG_CHECKPOINT_REDO 0xE0 +#define XLOG_LOGICAL_DECODING_STATUS_CHANGE 0xF0 /* diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h new file mode 100644 index 00000000000..fbe4fa54976 --- /dev/null +++ b/src/include/replication/logicalctl.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * + * logicalctl.h + * Definitions for logical decoding status control facility. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/replication/logicalctl.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALCTL_H +#define LOGICALCTL_H + +extern Size LogicalDecodingCtlShmemSize(void); +extern void LogicalDecodingCtlShmemInit(void); +extern void StartupLogicalDecodingStatus(bool status_in_control_file); +extern void InitializeProcessXLogLogicalInfo(void); +extern bool ProcessBarrierUpdateXLogLogicalInfo(void); +extern bool IsLogicalDecodingEnabled(void); +extern bool IsXLogLogicalInfoEnabled(void); +extern bool CheckXLogLogicalInfo(void); +extern void AtEOXact_LogicalCtl(void); +extern void EnsureLogicalDecodingEnabled(void); +extern void EnableLogicalDecoding(void); +extern void RequestDisableLogicalDecoding(void); +extern void DisableLogicalDecodingIfNecessary(void); +extern void DisableLogicalDecoding(void); +extern void UpdateLogicalDecodingStatusEndOfRecovery(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 28251d86638..87b0ee8856d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -359,6 +359,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); +extern bool CheckLogicalSlotExists(void); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 5b0ce383408..533344509e9 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -86,6 +86,7 @@ PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) PG_LWLOCK(54, WaitLSN) +PG_LWLOCK(55, LogicalDecodingControl) /* * There also exist several built-in LWLock tranches. As with the predefined diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index afeeb1ca019..8e428f298c6 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -54,6 +54,8 @@ typedef enum typedef enum { PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */ + PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, /* ask to update + * XLogLogicalInfo */ } ProcSignalBarrierType; /* diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 82ac8646a8d..fbe0b1e2e3d 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou extern void assign_default_text_search_config(const char *newval, void *extra); extern bool check_default_with_oids(bool *newval, void **extra, GucSource source); +extern const char *show_effective_wal_level(void); extern bool check_huge_page_size(int *newval, void **extra, GucSource source); extern void assign_io_method(int newval, void *extra); extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index e93248bd66e..9cc057909e2 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -59,6 +59,7 @@ tests += { 't/048_vacuum_horizon_floor.pl', 't/049_wait_for_lsn.pl', 't/050_redo_segment_missing.pl', + 't/051_effective_wal_level.pl', ], }, } diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index ebe2fae1789..49d9ea4d096 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -878,9 +878,10 @@ check_slots_conflict_reason('wal_level_', 'wal_level_insufficient'); $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); -# We are not able to read from the slot as it requires wal_level >= logical on the primary server +# We are not able to read from the slot as it requires effective_wal_level >= logical on +# the primary server check_pg_recvlogical_stderr($handle, - "logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary" + "logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary" ); # Restore primary wal_level diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl new file mode 100644 index 00000000000..d245359f304 --- /dev/null +++ b/src/test/recovery/t/051_effective_wal_level.pl @@ -0,0 +1,404 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +# +# Test that effective_wal_level changes upon logical replication slot creation +# and deletion. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Check both wal_level and effective_wal_level values on the given node +# are expected. +sub test_wal_level +{ + my ($node, $expected, $msg) = @_; + + is( $node->safe_psql( + 'postgres', + qq[select current_setting('wal_level'), current_setting('effective_wal_level');] + ), + "$expected", + "$msg"); +} + +# Wait for the checkpointer to decrease effective_wal_level to 'replica'. +sub wait_for_logical_decoding_disabled +{ + my ($node) = @_; + + $node->poll_query_until('postgres', + qq[select current_setting('effective_wal_level') = 'replica';]); +} + +# Initialize the primary server with wal_level = 'replica'. +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 1); +$primary->append_conf('postgresql.conf', "log_min_messages = debug1"); +$primary->start(); + +# Check both initial wal_level and effective_wal_level values. +test_wal_level($primary, "replica|replica", + "wal_level and effective_wal_level start with the same value 'replica'"); + +# Create a physical slot and verify that it doesn't affect effective_wal_level. +$primary->safe_psql('postgres', + qq[select pg_create_physical_replication_slot('test_phy_slot', false, false)] +); +test_wal_level($primary, "replica|replica", + "effective_wal_level doesn't change with a new physical slot"); +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_phy_slot')]); + +# Create a temporary logical slot but exit without releasing it explicitly. +# This enables logical decoding but skips disabling it and delegates to the +# checkpointer. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_tmp_slot', 'test_decoding', true)] +); +ok( $primary->log_contains( + "logical decoding is enabled upon creating a new logical replication slot" + ), + "logical decoding has been enabled upon creating a temp slot"); + +# Wait for the checkpointer to disable logical decoding. +wait_for_logical_decoding_disabled($primary); + +# Create a new logical slot and check that effective_wal_level must be increased +# to 'logical'. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); +test_wal_level($primary, "replica|logical", + "effective_wal_level increased to 'logical' upon a logical slot creation" +); + +# Restart the server and check again. +$primary->restart(); +test_wal_level($primary, "replica|logical", + "effective_wal_level remains 'logical' even after a server restart"); + +# Create and drop another logical slot, then verify that effective_wal_level remains +# 'logical'. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]); +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot2')]); +test_wal_level($primary, "replica|logical", + "effective_wal_level stays 'logical' as one slot remains"); + +# Verify that the server cannot start with wal_level='minimal' when there is +# at least one replication slot. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'minimal'); +$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '0'); +$primary->stop; + +command_fails( + [ + 'pg_ctl', + '--pgdata' => $primary->data_dir, + '--log' => $primary->logfile, + 'start', + ], + "cannot start server with wal_level='minimal' as there is in-use logical slot"); + +my $logfile = slurp_file($primary->logfile()); +like( + $logfile, + qr/logical replication slot "test_slot" exists, but "wal_level" < "replica"/, + 'logical slots requires logical decoding enabled at server startup'); + +# Revert the modified settings. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '10'); + +# Add other settings to test if we disable logical decoding when invalidating the last +# logical slot. +$primary->append_conf( + 'postgresql.conf', + qq[ +min_wal_size = 32MB +max_wal_size = 32MB +max_slot_wal_keep_size = 16MB +]); +$primary->start; + +# Advance WAL and verify that the slot gets invalidated. +$primary->advance_wal(2); +$primary->safe_psql('postgres', qq[CHECKPOINT]); +is( $primary->safe_psql( + 'postgres', + qq[ +select invalidation_reason = 'wal_removed' from pg_replication_slots where slot_name = 'test_slot'; + ]), + 't', + 'test_slot gets invalidated due to wal_removed'); + +# Verify that logical decoding is disabled after invalidating the last logical slot. +wait_for_logical_decoding_disabled($primary); +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' after invalidating the last logical slot" +); + +# Revert the modified settings, and restart the server. +$primary->adjust_conf('postgresql.conf', 'max_slot_wal_keep_size', undef); +$primary->adjust_conf('postgresql.conf', 'min_wal_size', undef); +$primary->adjust_conf('postgresql.conf', 'max_wal_size', undef); +$primary->restart; + +# Recreate the logical slot to enable logical decoding again. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# Take backup during the effective_wal_level being 'logical'. But note that +# replication slots are not included in the backup. +$primary->backup('my_backup'); + +# Initialize standby1 node. +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby1->start; + +# Creating a logical slot on standby should succeed as the primary enables +# it. +$primary->wait_for_replay_catchup($standby1); +$standby1->create_logical_slot_on_standby($primary, 'standby1_slot', + 'postgres'); + +# Promote the standby1 node that has one logical slot. So effective_wal_level +# remains 'logical' even after the promotion. +$standby1->promote; +test_wal_level($standby1, "replica|logical", + "effective_wal_level remains 'logical' even after the promotion"); + +# Confirm if we can create a logical slot after the promotion. +$standby1->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('standby1_slot2', 'pgoutput')] +); +$standby1->stop; + +# Initialize standby2 node and start it with wal_level = 'logical'. +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby2->append_conf('postgresql.conf', qq[wal_level = 'logical']); +$standby2->start(); +$standby2->backup('my_backup3'); + +# Initialize cascade standby and start with wal_level = 'replica'. +my $cascade = PostgreSQL::Test::Cluster->new('cascade'); +$cascade->init_from_backup($standby2, 'my_backup3', has_streaming => 1); +$cascade->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$cascade->start(); + +# Regardless of their wal_level values, effective_wal_level values on the +# standby and the cascaded standby depend on the primary's value, 'logical'. +test_wal_level($standby2, "logical|logical", + "check wal_level and effective_wal_level on standby"); +test_wal_level($cascade, "replica|logical", + "check wal_level and effective_wal_level on cascaded standby"); + +# Drop the primary's last logical slot, decreasing effective_wal_level to +# 'replica' on all nodes. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +wait_for_logical_decoding_disabled($primary); + +$primary->wait_for_replay_catchup($standby2); +$standby2->wait_for_replay_catchup($cascade, $primary); + +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on primary"); +test_wal_level($standby2, "logical|replica", + "effective_wal_level got decreased to 'replica' on standby"); +test_wal_level($cascade, "replica|replica", + "effective_wal_level got decreased to 'replica' on cascaded standby"); + +# Promote standby2, increasing effective_wal_level to 'logical' as its wal_level +# is set to 'logical'. +$standby2->promote; + +# Verify that effective_wal_level is increased to 'logical' on the cascaded standby. +$standby2->wait_for_replay_catchup($cascade); +test_wal_level($cascade, "replica|logical", + "effective_wal_level got increased to 'logical' on standby as the new primary has wal_level='logical'" +); + +$standby2->stop; +$cascade->stop; + +# Initialize standby3 node and start it. +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup($primary, 'my_backup', has_streaming => 1); +$standby3->start; + +# Create logical slots on both nodes. +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); +$primary->wait_for_replay_catchup($standby3); +$standby3->create_logical_slot_on_standby($primary, 'standby3_slot', + 'postgres'); + +# Drop the logical slot from the primary, decreasing effective_wal_level to +# 'replica' on the primary, which leads to invalidating the logical slot on the +# standby due to 'wal_level_insufficient'. +$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +wait_for_logical_decoding_disabled($primary); +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on the primary to invalidate standby's slots" +); +$standby3->poll_query_until( + 'postgres', qq[ +select invalidation_reason = 'wal_level_insufficient' from pg_replication_slots where slot_name = 'standby3_slot' + ]); + +# Restart the server to verify that the slot is successfully restored during +# startup. +$standby3->restart; + +# Check that the logical decoding is not enabled on the standby3. Note that it still has +# the invalidated logical slot. +test_wal_level($standby3, "replica|replica", + "effective_wal_level got decreased to 'replica' on standby"); + +my ($result, $stdout, $stderr) = $standby3->psql('postgres', + qq[select pg_logical_slot_get_changes('standby3_slot', null, null)]); +like( + $stderr, + qr/ERROR: logical decoding on standby requires "effective_wal_level" >= "logical" on the primary/, + "cannot use logical decoding on standby as it is disabled on primary"); + +# Restart the primary with setting wal_level = 'logical' and create a new logical +# slot. +$primary->append_conf('postgresql.conf', qq[wal_level = 'logical']); +$primary->restart; +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# effective_wal_level should be 'logical' on both nodes. +$primary->wait_for_replay_catchup($standby3); +test_wal_level($primary, "logical|logical", + "check WAL levels on the primary node"); +test_wal_level($standby3, "replica|logical", + "effective_wal_level got increased to 'logical' again on standby"); + +# Set wal_level to 'replica' and restart the primary. Since one logical slot +# is still present on the primary, effective_wal_level remains 'logical' even +# if wal_level got decreased to 'replica'. +$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$primary->restart; +$primary->wait_for_replay_catchup($standby3); + +# Verify that the effective_wal_level remains 'logical' on both nodes +test_wal_level($primary, "replica|logical", + "effective_wal_level remains 'logical' on primary even after setting wal_level to 'replica'" +); +test_wal_level($standby3, "replica|logical", + "effective_wal_level remains 'logical' on standby even after setting wal_level to 'replica' on primary" +); + +# Promote the standby3 and verify that effective_wal_level got decreased to +# 'replica' after the promotion since there is no valid logical slot. +$standby3->promote; +test_wal_level($standby3, "replica|replica", + "effective_wal_level got decreased to 'replica' as there is no valid logical slot" +); + +# Cleanup the invalidated slot. +$standby3->safe_psql('postgres', + qq[select pg_drop_replication_slot('standby3_slot')]); + +$standby3->stop; + +# Test the race condition at end of the recovery between the startup and logical +# decoding status change. This test requires injection points enabled. +if ( $ENV{enable_injection_points} eq 'yes' + && $primary->check_extension('injection_points')) +{ + # Initialize standby4 and start it. + my $standby4 = PostgreSQL::Test::Cluster->new('standby4'); + $standby4->init_from_backup($primary, 'my_backup', has_streaming => 1); + $standby4->start; + + # Both servers have one logical slot. + $primary->wait_for_replay_catchup($standby4); + $standby4->create_logical_slot_on_standby($primary, 'standby4_slot', + 'postgres'); + + # Enable and attach the injection point on the standby4. + $primary->safe_psql('postgres', 'create extension injection_points'); + $primary->wait_for_replay_catchup($standby4); + $standby4->safe_psql('postgres', + qq[select injection_points_attach('startup-logical-decoding-status-change-end-of-recovery', 'wait');] + ); + + # Trigger promotion with no wait, and wait for the startup process to reach + # the injection point. + $standby4->safe_psql('postgres', qq[select pg_promote(false)]); + note('promote the standby and waiting for injection_point'); + $standby4->wait_for_event('startup', + 'startup-logical-decoding-status-change-end-of-recovery'); + note( + "injection_point 'startup-logical-decoding-status-change-end-of-recovery' is reached" + ); + + # Drop the logical slot, requesting to disable logical decoding to the checkpointer. + $standby4->safe_psql('postgres', + qq[select pg_drop_replication_slot('standby4_slot');]); + + # Resume the startup process to complete the recovery. + $standby4->safe_psql('postgres', + qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')] + ); + + # Verify that logical decoding got disabled after the recovery. + wait_for_logical_decoding_disabled($standby4); + test_wal_level($standby4, "replica|replica", + "effective_wal_level properly got decreased to 'replica'"); + $standby4->stop; + + # Test the abort process of logical decoding activation. We drop the primary's + # slot to decrease its effective_wal_level to 'replica'. + $primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); + wait_for_logical_decoding_disabled($primary); + test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' on primary"); + + # Start a psql session to test the case where the activation process is + # interrupted. + my $psql_create_slot = $primary->background_psql('postgres'); + + # Start the logical decoding activation process upon creating the logical + # slot, but it will wait due to the injection point. + $psql_create_slot->query_until( + qr/create_slot_canceled/, + q(\echo create_slot_canceled +select injection_points_set_local(); +select injection_points_attach('logical-decoding-activation', 'wait'); +select pg_create_logical_replication_slot('slot_canceled', 'pgoutput'); +\q +)); + + $primary->wait_for_event('client backend', 'logical-decoding-activation'); + note("injection_point 'logical-decoding-activation' is reached"); + + # Cancel the backend initiated by $psql_create_slot, aborting its activation + # process. + $primary->safe_psql( + 'postgres', + qq[ +select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled' and pid <> pg_backend_pid() +]); + + # Verify that the backend aborted the activation process. + $primary->wait_for_log("aborting logical decoding activation process"); + test_wal_level($primary, "replica|replica", + "the activation process aborted"); +} + +$primary->stop; + +done_testing(); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e72d1308967..7f81e61d7a7 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -288,12 +288,8 @@ CREATE PUBLICATION regress_pub_for_allsequences_alltables FOR ALL SEQUENCES, ALL SET client_min_messages = 'NOTICE'; CREATE PUBLICATION regress_pub_for_allsequences_alltables_withclause FOR ALL SEQUENCES, ALL TABLES WITH (publish = 'insert'); NOTICE: publication parameters are not applicable to sequence synchronization and will be ignored for sequences -WARNING: "wal_level" is insufficient to publish logical changes -HINT: Set "wal_level" to "logical" before creating subscriptions. CREATE PUBLICATION regress_pub_for_allsequences_withclause FOR ALL SEQUENCES WITH (publish_generated_columns = 'stored'); NOTICE: publication parameters are not applicable to sequence synchronization and will be ignored for sequences -WARNING: "wal_level" is insufficient to publish logical changes -HINT: Set "wal_level" to "logical" before creating subscriptions. RESET client_min_messages; SELECT pubname, puballtables, puballsequences FROM pg_publication WHERE pubname = 'regress_pub_for_allsequences_alltables'; pubname | puballtables | puballsequences diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 430c1246d14..ecb79e79474 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -595,7 +595,7 @@ ROLLBACK; }); like( $reterr, - qr/WARNING: "wal_level" is insufficient to publish logical changes/, + qr/WARNING: logical decoding must be enabled to publish logical changes/, 'CREATE PUBLICATION while "wal_level=minimal"'); done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 04845d5e680..5c88fa92f4e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1627,6 +1627,7 @@ LogicalDecodeStreamStopCB LogicalDecodeStreamTruncateCB LogicalDecodeTruncateCB LogicalDecodingContext +LogicalDecodingCtlData LogicalErrorCallbackState LogicalOutputPluginInit LogicalOutputPluginWriterPrepareWrite