diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 0450f94ba8a..b9df8c0a024 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -134,6 +134,7 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" +#include "replication/snapbuild_internal.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -143,146 +144,6 @@ #include "utils/memutils.h" #include "utils/snapmgr.h" #include "utils/snapshot.h" - -/* - * This struct contains the current state of the snapshot building - * machinery. Besides a forward declaration in the header, it is not exposed - * to the public, so we can easily change its contents. - */ -struct SnapBuild -{ - /* how far are we along building our first full snapshot */ - SnapBuildState state; - - /* private memory context used to allocate memory for this module. */ - MemoryContext context; - - /* all transactions < than this have committed/aborted */ - TransactionId xmin; - - /* all transactions >= than this are uncommitted */ - TransactionId xmax; - - /* - * Don't replay commits from an LSN < this LSN. This can be set externally - * but it will also be advanced (never retreat) from within snapbuild.c. - */ - XLogRecPtr start_decoding_at; - - /* - * LSN at which two-phase decoding was enabled or LSN at which we found a - * consistent point at the time of slot creation. - * - * The prepared transactions, that were skipped because previously - * two-phase was not enabled or are not covered by initial snapshot, need - * to be sent later along with commit prepared and they must be before - * this point. - */ - XLogRecPtr two_phase_at; - - /* - * Don't start decoding WAL until the "xl_running_xacts" information - * indicates there are no running xids with an xid smaller than this. - */ - TransactionId initial_xmin_horizon; - - /* Indicates if we are building full snapshot or just catalog one. */ - bool building_full_snapshot; - - /* - * Indicates if we are using the snapshot builder for the creation of a - * logical replication slot. If it's true, the start point for decoding - * changes is not determined yet. So we skip snapshot restores to properly - * find the start point. See SnapBuildFindSnapshot() for details. - */ - bool in_slot_creation; - - /* - * Snapshot that's valid to see the catalog state seen at this moment. - */ - Snapshot snapshot; - - /* - * LSN of the last location we are sure a snapshot has been serialized to. - */ - XLogRecPtr last_serialized_snapshot; - - /* - * The reorderbuffer we need to update with usable snapshots et al. - */ - ReorderBuffer *reorder; - - /* - * TransactionId at which the next phase of initial snapshot building will - * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or - * when no next phase necessary (SNAPBUILD_CONSISTENT). - */ - TransactionId next_phase_at; - - /* - * Array of transactions which could have catalog changes that committed - * between xmin and xmax. - */ - struct - { - /* number of committed transactions */ - size_t xcnt; - - /* available space for committed transactions */ - size_t xcnt_space; - - /* - * Until we reach a CONSISTENT state, we record commits of all - * transactions, not just the catalog changing ones. Record when that - * changes so we know we cannot export a snapshot safely anymore. - */ - bool includes_all_transactions; - - /* - * Array of committed transactions that have modified the catalog. - * - * As this array is frequently modified we do *not* keep it in - * xidComparator order. Instead we sort the array when building & - * distributing a snapshot. - * - * TODO: It's unclear whether that reasoning has much merit. Every - * time we add something here after becoming consistent will also - * require distributing a snapshot. Storing them sorted would - * potentially also make it easier to purge (but more complicated wrt - * wraparound?). Should be improved if sorting while building the - * snapshot shows up in profiles. - */ - TransactionId *xip; - } committed; - - /* - * Array of transactions and subtransactions that had modified catalogs - * and were running when the snapshot was serialized. - * - * We normally rely on some WAL record types such as HEAP2_NEW_CID to know - * if the transaction has changed the catalog. But it could happen that - * the logical decoding decodes only the commit record of the transaction - * after restoring the previously serialized snapshot in which case we - * will miss adding the xid to the snapshot and end up looking at the - * catalogs with the wrong snapshot. - * - * Now to avoid the above problem, we serialize the transactions that had - * modified the catalogs and are still running at the time of snapshot - * serialization. We fill this array while restoring the snapshot and then - * refer it while decoding commit to ensure if the xact has modified the - * catalog. We discard this array when all the xids in the list become old - * enough to matter. See SnapBuildPurgeOlderTxn for details. - */ - struct - { - /* number of transactions */ - size_t xcnt; - - /* This array must be sorted in xidComparator order */ - TransactionId *xip; - } catchange; -}; - /* * Starting a transaction -- which we need to do while exporting a snapshot -- * removes knowledge about the previously used resowner, so we save it here. @@ -1557,40 +1418,6 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) } } -/* ----------------------------------- - * Snapshot serialization support - * ----------------------------------- - */ - -/* - * We store current state of struct SnapBuild on disk in the following manner: - * - * struct SnapBuildOnDisk; - * TransactionId * committed.xcnt; (*not xcnt_space*) - * TransactionId * catchange.xcnt; - * - */ -typedef struct SnapBuildOnDisk -{ - /* first part of this struct needs to be version independent */ - - /* data not covered by checksum */ - uint32 magic; - pg_crc32c checksum; - - /* data covered by checksum */ - - /* version, in case we want to support pg_upgrade */ - uint32 version; - /* how large is the on disk data, excluding the constant sized part */ - uint32 length; - - /* version dependent part */ - SnapBuild builder; - - /* variable amount of TransactionIds follows */ -} SnapBuildOnDisk; - #define SnapBuildOnDiskConstantSize \ offsetof(SnapBuildOnDisk, builder) #define SnapBuildOnDiskNotChecksummedSize \ diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index caa5113ff81..dbb4bc2f4b2 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -46,7 +46,7 @@ typedef enum SNAPBUILD_CONSISTENT = 2, } SnapBuildState; -/* forward declare so we don't have to expose the struct to the public */ +/* forward declare so we don't have to include snapbuild_internal.h */ struct SnapBuild; typedef struct SnapBuild SnapBuild; diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h new file mode 100644 index 00000000000..5df2ae4423d --- /dev/null +++ b/src/include/replication/snapbuild_internal.h @@ -0,0 +1,196 @@ +/*------------------------------------------------------------------------- + * + * snapbuild_internal.h + * This file contains declarations for logical decoding utility + * functions for internal use. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * src/include/replication/snapbuild_internal.h + * + *------------------------------------------------------------------------- + */ + +#ifndef SNAPBUILD_INTERNAL_H +#define SNAPBUILD_INTERNAL_H + +#include "port/pg_crc32c.h" +#include "replication/reorderbuffer.h" +#include "replication/snapbuild.h" + +/* + * This struct contains the current state of the snapshot building + * machinery. It is exposed to the public, so pay attention when changing its + * contents. + */ +struct SnapBuild +{ + /* how far are we along building our first full snapshot */ + SnapBuildState state; + + /* private memory context used to allocate memory for this module. */ + MemoryContext context; + + /* all transactions < than this have committed/aborted */ + TransactionId xmin; + + /* all transactions >= than this are uncommitted */ + TransactionId xmax; + + /* + * Don't replay commits from an LSN < this LSN. This can be set externally + * but it will also be advanced (never retreat) from within snapbuild.c. + */ + XLogRecPtr start_decoding_at; + + /* + * LSN at which two-phase decoding was enabled or LSN at which we found a + * consistent point at the time of slot creation. + * + * The prepared transactions, that were skipped because previously + * two-phase was not enabled or are not covered by initial snapshot, need + * to be sent later along with commit prepared and they must be before + * this point. + */ + XLogRecPtr two_phase_at; + + /* + * Don't start decoding WAL until the "xl_running_xacts" information + * indicates there are no running xids with an xid smaller than this. + */ + TransactionId initial_xmin_horizon; + + /* Indicates if we are building full snapshot or just catalog one. */ + bool building_full_snapshot; + + /* + * Indicates if we are using the snapshot builder for the creation of a + * logical replication slot. If it's true, the start point for decoding + * changes is not determined yet. So we skip snapshot restores to properly + * find the start point. See SnapBuildFindSnapshot() for details. + */ + bool in_slot_creation; + + /* + * Snapshot that's valid to see the catalog state seen at this moment. + */ + Snapshot snapshot; + + /* + * LSN of the last location we are sure a snapshot has been serialized to. + */ + XLogRecPtr last_serialized_snapshot; + + /* + * The reorderbuffer we need to update with usable snapshots et al. + */ + ReorderBuffer *reorder; + + /* + * TransactionId at which the next phase of initial snapshot building will + * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or + * when no next phase necessary (SNAPBUILD_CONSISTENT). + */ + TransactionId next_phase_at; + + /* + * Array of transactions which could have catalog changes that committed + * between xmin and xmax. + */ + struct + { + /* number of committed transactions */ + size_t xcnt; + + /* available space for committed transactions */ + size_t xcnt_space; + + /* + * Until we reach a CONSISTENT state, we record commits of all + * transactions, not just the catalog changing ones. Record when that + * changes so we know we cannot export a snapshot safely anymore. + */ + bool includes_all_transactions; + + /* + * Array of committed transactions that have modified the catalog. + * + * As this array is frequently modified we do *not* keep it in + * xidComparator order. Instead we sort the array when building & + * distributing a snapshot. + * + * TODO: It's unclear whether that reasoning has much merit. Every + * time we add something here after becoming consistent will also + * require distributing a snapshot. Storing them sorted would + * potentially also make it easier to purge (but more complicated wrt + * wraparound?). Should be improved if sorting while building the + * snapshot shows up in profiles. + */ + TransactionId *xip; + } committed; + + /* + * Array of transactions and subtransactions that had modified catalogs + * and were running when the snapshot was serialized. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that + * the logical decoding decodes only the commit record of the transaction + * after restoring the previously serialized snapshot in which case we + * will miss adding the xid to the snapshot and end up looking at the + * catalogs with the wrong snapshot. + * + * Now to avoid the above problem, we serialize the transactions that had + * modified the catalogs and are still running at the time of snapshot + * serialization. We fill this array while restoring the snapshot and then + * refer it while decoding commit to ensure if the xact has modified the + * catalog. We discard this array when all the xids in the list become old + * enough to matter. See SnapBuildPurgeOlderTxn for details. + */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* This array must be sorted in xidComparator order */ + TransactionId *xip; + } catchange; +}; + +/* ----------------------------------- + * Snapshot serialization support + * ----------------------------------- + */ + +/* + * We store current state of struct SnapBuild on disk in the following manner: + * + * struct SnapBuildOnDisk; + * TransactionId * committed.xcnt; (*not xcnt_space*) + * TransactionId * catchange.xcnt; + * + * Check if the SnapBuildOnDiskConstantSize and SnapBuildOnDiskNotChecksummedSize + * macros need to be updated when modifying the SnapBuildOnDisk struct. + */ +typedef struct SnapBuildOnDisk +{ + /* first part of this struct needs to be version independent */ + + /* data not covered by checksum */ + uint32 magic; + pg_crc32c checksum; + + /* data covered by checksum */ + + /* version, in case we want to support pg_upgrade */ + uint32 version; + /* how large is the on disk data, excluding the constant sized part */ + uint32 length; + + /* version dependent part */ + SnapBuild builder; + + /* variable amount of TransactionIds follows */ +} SnapBuildOnDisk; + +#endif /* SNAPBUILD_INTERNAL_H */