diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf
index 367f7066514..07c4d3d7c8f 100644
--- a/contrib/test_decoding/logical.conf
+++ b/contrib/test_decoding/logical.conf
@@ -1,2 +1,3 @@
wal_level = logical
max_replication_slots = 4
+logical_decoding_work_mem = 64kB
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f83770350ed..d4d1fe45cc1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1732,6 +1732,27 @@ include_dir 'conf.d'
+     <varlistentry id="guc-logical-decoding-work-mem" xreflabel="logical_decoding_work_mem">
+      <term><varname>logical_decoding_work_mem</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>logical_decoding_work_mem</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the maximum amount of memory to be used by logical decoding,
+        before some of the decoded changes are written to local disk. This
+        limits the amount of memory used by logical streaming replication
+        connections. It defaults to 64 megabytes (<literal>64MB</literal>).
+        Since each replication connection only uses a single buffer of this size,
+        and an installation normally doesn't have many such connections
+        concurrently (as limited by <varname>max_wal_senders</varname>), it's
+        safe to set this value significantly higher than <varname>work_mem</varname>,
+        reducing the amount of decoded changes written to disk.
+       </para>
+      </listitem>
+     </varlistentry>
+
max_stack_depth (integer)
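To make the sizing advice above concrete: the worst case is roughly one such buffer per walsender, so total usage is bounded by max_wal_senders times the limit. The numbers below describe a hypothetical installation and are purely illustrative:

    /* Hypothetical installation - the numbers are illustrative only. */
    #define DECODING_WORK_MEM_KB  (256L * 1024)   /* logical_decoding_work_mem = 256MB */
    #define N_WALSENDERS          10              /* bounded by max_wal_senders */
    /* worst case: N_WALSENDERS * DECODING_WORK_MEM_KB = 2621440 kB, i.e. ~2.5GB */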
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 62e54240ec1..d82a5f18b0a 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -49,6 +49,34 @@
* GenerationContext for the variable-length transaction data (allocated
* and freed in groups with similar lifespan).
*
+ * To limit the amount of memory used by decoded changes, we track memory
+ * used at the reorder buffer level (i.e. total amount of memory), and for
+ * each transaction. When the total amount of used memory exceeds the
+ * limit, the transaction consuming the most memory is then serialized to
+ * disk.
+ *
+ * Only decoded changes are evicted from memory (spilled to disk), not the
+ * transaction records. The number of toplevel transactions is limited,
+ * but a transaction with many subtransactions may still consume significant
+ * amounts of memory. The transaction records are fairly small, though, and
+ * are not included in the memory limit.
+ *
+ * The current eviction algorithm is very simple - the transaction is
+ * picked merely by size, although it might also be useful to consider,
+ * for example, the age (LSN) of the changes. With the new Generational
+ * memory allocator, evicting the oldest changes would make it more likely
+ * that the memory actually gets freed.
+ *
+ * We still rely on max_changes_in_memory when loading serialized changes
+ * back into memory. At that point we can't use the memory limit directly,
+ * as we load the subxacts independently. One option to deal with this
+ * would be to count the subxacts and allow each to allocate 1/N of the
+ * memory limit. That does not seem very appealing, however, because with
+ * many subtransactions it may easily cause thrashing (short cycles of
+ * deserializing and applying very few changes). We should probably give
+ * a bit more memory to the oldest subtransactions, because they are the
+ * likely source of the next sequence of changes.
+ *
* -------------------------------------------------------------------------
*/
#include "postgres.h"
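As a quick illustration of the accounting scheme described in the comment above, here is a self-contained toy model (not the actual reorderbuffer code; the struct, the limit and the change sizes are all made up): a per-transaction byte count plus a global one, and once the global count reaches the limit the single largest transaction is "spilled", which is guaranteed to bring the total back under the limit.

    /*
     * Toy model of the memory accounting / eviction policy described above.
     * Compile with: cc -std=c99 -o toy toy.c
     */
    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    #define N_TXNS      4
    #define MEM_LIMIT   (64 * 1024)     /* stands in for logical_decoding_work_mem */

    typedef struct ToyTxn
    {
        size_t      size;               /* bytes of changes currently in memory */
        int         spill_count;        /* how many times we "spilled" it */
    } ToyTxn;

    static ToyTxn txns[N_TXNS];
    static size_t total;                /* stands in for ReorderBuffer->size */

    /* queue one decoded change of sz bytes into transaction i */
    static void
    queue_change(int i, size_t sz)
    {
        ToyTxn     *largest;
        int         j;

        txns[i].size += sz;
        total += sz;

        if (total < MEM_LIMIT)
            return;

        /* over the limit: find the largest transaction and "spill" it */
        largest = &txns[0];
        for (j = 1; j < N_TXNS; j++)
            if (txns[j].size > largest->size)
                largest = &txns[j];

        total -= largest->size;
        largest->size = 0;
        largest->spill_count++;

        /* the largest txn is at least as large as the change just added */
        assert(total < MEM_LIMIT);
    }

    int
    main(void)
    {
        int         i;

        for (i = 0; i < 1000; i++)
            queue_change(i % N_TXNS, 1024 * (i % 7 + 1));

        printf("in-memory total: %zu bytes\n", total);
        for (i = 0; i < N_TXNS; i++)
            printf("txn %d: %zu bytes, spilled %d times\n",
                   i, txns[i].size, txns[i].spill_count);
        return 0;
    }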
@@ -154,7 +182,8 @@ typedef struct ReorderBufferDiskChange
* resource management here, but it's not entirely clear what that would look
* like.
*/
-static const Size max_changes_in_memory = 4096;
+int logical_decoding_work_mem;
+static const Size max_changes_in_memory = 4096; /* XXX for restore only */
/* ---------------------------------------
* primary reorderbuffer support routines
@@ -189,7 +218,7 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX
* Disk serialization support functions
* ---------------------------------------
*/
-static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
@@ -217,6 +246,14 @@ static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+/*
+ * ---------------------------------------
+ * memory accounting
+ * ---------------------------------------
+ */
+static Size ReorderBufferChangeSize(ReorderBufferChange *change);
+static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
+ ReorderBufferChange *change, bool addition);
/*
* Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -269,6 +306,7 @@ ReorderBufferAllocate(void)
buffer->outbuf = NULL;
buffer->outbufsize = 0;
+ buffer->size = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -374,6 +412,9 @@ ReorderBufferGetChange(ReorderBuffer *rb)
void
ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
{
+ /* update memory accounting info */
+ ReorderBufferChangeMemoryUpdate(rb, change, false);
+
/* free contained data */
switch (change->action)
{
@@ -585,12 +626,18 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
change->lsn = lsn;
+ change->txn = txn;
+
Assert(InvalidXLogRecPtr != lsn);
dlist_push_tail(&txn->changes, &change->node);
txn->nentries++;
txn->nentries_mem++;
- ReorderBufferCheckSerializeTXN(rb, txn);
+ /* update memory accounting information */
+ ReorderBufferChangeMemoryUpdate(rb, change, true);
+
+ /* check the memory limits and evict something if needed */
+ ReorderBufferCheckMemoryLimit(rb);
}
/*
@@ -1217,6 +1264,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
change = dlist_container(ReorderBufferChange, node, iter.cur);
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+
ReorderBufferReturnChange(rb, change);
}
@@ -1229,7 +1279,11 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferChange *change;
change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
ReorderBufferReturnChange(rb, change);
}
@@ -2082,9 +2136,48 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
ReorderBufferQueueChange(rb, xid, lsn, change);
}
+/*
+ * Update the memory accounting info. We track memory used by the whole
+ * reorder buffer and the transaction containing the change.
+ */
+static void
+ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
+ ReorderBufferChange *change,
+ bool addition)
+{
+ Size sz;
+
+ Assert(change->txn);
+
+ /*
+ * Ignore tuple CID changes, because those are not evicted when reaching
+ * the memory limit. So we simply don't count them, as including them
+ * might easily trigger a pointless attempt to spill.
+ */
+ if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+ return;
+
+ sz = ReorderBufferChangeSize(change);
+
+ if (addition)
+ {
+ change->txn->size += sz;
+ rb->size += sz;
+ }
+ else
+ {
+ Assert((rb->size >= sz) && (change->txn->size >= sz));
+ change->txn->size -= sz;
+ rb->size -= sz;
+ }
+}
/*
* Add new (relfilenode, tid) -> (cmin, cmax) mappings.
+ *
+ * We do not include this change type in memory accounting, because we
+ * keep CIDs in a separate list and do not evict them when reaching
+ * the memory limit.
*/
void
ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
@@ -2103,6 +2196,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
change->data.tuplecid.cmax = cmax;
change->data.tuplecid.combocid = combocid;
change->lsn = lsn;
+ change->txn = txn;
change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
dlist_push_tail(&txn->tuplecids, &change->node);
@@ -2230,20 +2324,84 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
}
/*
- * Check whether the transaction tx should spill its data to disk.
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
+ *
+ * XXX With many subtransactions this might be quite slow, because we'll
+ * have to walk through all of them. There are several ways we could
+ * improve that: (a) maintain some secondary structure with transactions
+ * sorted by amount of changes, (b) not look for the single largest
+ * transaction but e.g. for a transaction using at least some fraction of
+ * the memory limit, and (c) evict multiple transactions at once, e.g. to
+ * free a given portion of the memory limit (e.g. 50%).
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTXN(ReorderBuffer *rb)
+{
+ HASH_SEQ_STATUS hash_seq;
+ ReorderBufferTXNByIdEnt *ent;
+ ReorderBufferTXN *largest = NULL;
+
+ hash_seq_init(&hash_seq, rb->by_txn);
+ while ((ent = hash_seq_search(&hash_seq)) != NULL)
+ {
+ ReorderBufferTXN *txn = ent->txn;
+
+ /* if the current transaction is larger, remember it */
+ if ((!largest) || (txn->size > largest->size))
+ largest = txn;
+ }
+
+ Assert(largest);
+ Assert(largest->size > 0);
+ Assert(largest->size <= rb->size);
+
+ return largest;
+}
+
+/*
+ * Check whether the logical_decoding_work_mem limit was reached, and if yes
+ * pick the transaction to evict and spill the changes to disk.
+ *
+ * XXX At this point we select just a single (largest) transaction, but
+ * we might also adopt a more elaborate eviction strategy - for example
+ * evicting enough transactions to free a certain fraction (e.g. 50%) of
+ * the memory limit.
*/
static void
-ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
{
+ ReorderBufferTXN *txn;
+
+ /* bail out if we haven't exceeded the memory limit */
+ if (rb->size < logical_decoding_work_mem * 1024L)
+ return;
+
/*
- * TODO: improve accounting so we cheaply can take subtransactions into
- * account here.
+ * Pick the largest transaction (or subtransaction) and evict it from
+ * memory by serializing it to disk.
*/
- if (txn->nentries_mem >= max_changes_in_memory)
- {
- ReorderBufferSerializeTXN(rb, txn);
- Assert(txn->nentries_mem == 0);
- }
+ txn = ReorderBufferLargestTXN(rb);
+
+ ReorderBufferSerializeTXN(rb, txn);
+
+ /*
+ * After eviction, the transaction should have no entries in memory, and
+ * should use 0 bytes for changes.
+ */
+ Assert(txn->size == 0);
+ Assert(txn->nentries_mem == 0);
+
+ /*
+ * Furthermore, evicting the transaction must get us below the memory
+ * limit again - it is not possible that we're still exceeding the
+ * memory limit after evicting the transaction.
+ *
+ * This follows from the fact that we were below the limit before the
+ * most recent change was queued, and the selected (largest) transaction
+ * is at least as large as that change. So by evicting it we remove at
+ * least as much as we just added, putting us back below the limit.
+ */
+ Assert(rb->size < logical_decoding_work_mem * 1024L);
}
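To make the XXX note above concrete, here is a sketch of alternative (c): keep evicting transactions until memory use drops below some fraction (here one half) of the limit, instead of stopping after a single one. This is not what the patch implements - the function name is hypothetical, and it merely reuses ReorderBufferLargestTXN() and ReorderBufferSerializeTXN() from this file for illustration:

    /* Hypothetical variant of ReorderBufferCheckMemoryLimit - illustration only. */
    static void
    ReorderBufferCheckMemoryLimitAggressive(ReorderBuffer *rb)
    {
        Size        limit = logical_decoding_work_mem * 1024L;

        /* bail out if we haven't exceeded the memory limit */
        if (rb->size < limit)
            return;

        /*
         * Keep evicting the currently largest transaction until at least
         * half of the memory limit has been freed. Every eviction removes
         * a non-zero amount of memory, so the loop terminates.
         */
        while (rb->size >= limit / 2)
        {
            ReorderBufferTXN *txn = ReorderBufferLargestTXN(rb);

            ReorderBufferSerializeTXN(rb, txn);

            Assert(txn->size == 0);
            Assert(txn->nentries_mem == 0);
        }
    }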
/*
@@ -2512,6 +2670,84 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert(ondisk->change.action == change->action);
}
+/*
+ * Size of a change in memory.
+ */
+static Size
+ReorderBufferChangeSize(ReorderBufferChange *change)
+{
+ Size sz = sizeof(ReorderBufferChange);
+
+ switch (change->action)
+ {
+ /* fall through these, they're all similar enough */
+ case REORDER_BUFFER_CHANGE_INSERT:
+ case REORDER_BUFFER_CHANGE_UPDATE:
+ case REORDER_BUFFER_CHANGE_DELETE:
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
+ {
+ ReorderBufferTupleBuf *oldtup,
+ *newtup;
+ Size oldlen = 0;
+ Size newlen = 0;
+
+ oldtup = change->data.tp.oldtuple;
+ newtup = change->data.tp.newtuple;
+
+ if (oldtup)
+ {
+ sz += sizeof(HeapTupleData);
+ oldlen = oldtup->tuple.t_len;
+ sz += oldlen;
+ }
+
+ if (newtup)
+ {
+ sz += sizeof(HeapTupleData);
+ newlen = newtup->tuple.t_len;
+ sz += newlen;
+ }
+
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ Size prefix_size = strlen(change->data.msg.prefix) + 1;
+
+ sz += prefix_size + change->data.msg.message_size +
+ sizeof(Size) + sizeof(Size);
+
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
+ {
+ Snapshot snap;
+
+ snap = change->data.snapshot;
+
+ sz += sizeof(SnapshotData) +
+ sizeof(TransactionId) * snap->xcnt +
+ sizeof(TransactionId) * snap->subxcnt;
+
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
+ {
+ sz += sizeof(Oid) * change->data.truncate.nrelids;
+
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+ case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
+ case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+ /* ReorderBufferChange contains everything important */
+ break;
+ }
+
+ return sz;
+}
+
+
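For a sense of what ReorderBufferChangeSize() returns, here is a worked example with hypothetical tuple lengths (the 100- and 120-byte payloads are made up): the accounted size is the change struct plus a tuple header and payload for each tuple present.

    /* Hypothetical UPDATE with a 100-byte old tuple and a 120-byte new tuple. */
    Size        example_update_size =
        sizeof(ReorderBufferChange) +
        (sizeof(HeapTupleData) + 100) +     /* old tuple header + payload */
        (sizeof(HeapTupleData) + 120);      /* new tuple header + payload */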
/*
* Restore a number of changes spilled to disk back into memory.
*/
@@ -2784,6 +3020,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_push_tail(&txn->changes, &change->node);
txn->nentries_mem++;
+
+ /*
+ * Update the memory accounting for the restored change. We need to do
+ * this even though we don't check the memory limit when restoring changes
+ * in this branch (we only do that when initially queueing the changes
+ * after decoding), because the changes will be released later and that
+ * will subtract their size from the counters. Without accounting for them
+ * here, those counters would underflow.
+ */
+ ReorderBufferChangeMemoryUpdate(rb, change, true);
}
/*
@@ -3003,6 +3249,19 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
*
* We cannot replace unchanged toast tuples though, so those will still point
* to on-disk toast data.
+ *
+ * While updating the existing change with detoasted tuple data, we need to
+ * update the memory accounting info, because the change size will differ.
+ * Otherwise the accounting may get out of sync, triggering serialization
+ * at unexpected times.
+ *
+ * We simply subtract the size of the change before rejiggering the tuple,
+ * and then add the new size back. This makes it look like the change was
+ * removed and then re-added, except it only tweaks the accounting info.
+ *
+ * In particular, this cannot trigger serialization, which would be
+ * pointless anyway as toast replacement happens during commit processing,
+ * right before handing the change to the output plugin.
*/
static void
ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -3023,6 +3282,13 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
if (txn->toast_hash == NULL)
return;
+ /*
+ * We're going to modify the size of the change, so to make sure the
+ * accounting is correct we'll make it look like we're removing the change
+ * now (with the old size), and then re-add it at the end.
+ */
+ ReorderBufferChangeMemoryUpdate(rb, change, false);
+
oldcontext = MemoryContextSwitchTo(rb->context);
/* we should only have toast tuples in an INSERT or UPDATE */
@@ -3172,6 +3438,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
pfree(isnull);
MemoryContextSwitchTo(oldcontext);
+
+ /* now add the change back, with the correct size */
+ ReorderBufferChangeMemoryUpdate(rb, change, true);
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 4b3769b8b05..ba4edde71a3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -66,6 +66,7 @@
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
#include "replication/logicallauncher.h"
+#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
@@ -2257,6 +2258,18 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"logical_decoding_work_mem", PGC_USERSET, RESOURCES_MEM,
+ gettext_noop("Sets the maximum memory to be used for logical decoding."),
+ gettext_noop("This much memory can be used by each internal "
+ "reorder buffer before spilling to disk."),
+ GUC_UNIT_KB
+ },
+ &logical_decoding_work_mem,
+ 65536, 64, MAX_KILOBYTES,
+ NULL, NULL, NULL
+ },
+
/*
* We use the hopefully-safely-small value of 100kB as the compiled-in
* default for max_stack_depth. InitializeGUCOptions will increase it if
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index be02a76d9d8..46a06ffacd4 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -130,6 +130,7 @@
#work_mem = 4MB # min 64kB
#maintenance_work_mem = 64MB # min 1MB
#autovacuum_work_mem = -1 # min 1MB, or -1 to use maintenance_work_mem
+#logical_decoding_work_mem = 64MB # min 64kB
#max_stack_depth = 2MB # min 100kB
#shared_memory_type = mmap # the default is the first option
# supported by the operating system:
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c06a78c11f..7c94d920fe9 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -17,6 +17,8 @@
#include "utils/snapshot.h"
#include "utils/timestamp.h"
+extern PGDLLIMPORT int logical_decoding_work_mem;
+
/* an individual tuple, stored in one chunk of memory */
typedef struct ReorderBufferTupleBuf
{
@@ -63,6 +65,9 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_TRUNCATE
};
+/* forward declaration */
+struct ReorderBufferTXN;
+
/*
* a single 'change', can be an insert (with one tuple), an update (old, new),
* or a delete (old).
@@ -77,6 +82,9 @@ typedef struct ReorderBufferChange
/* The type of change. */
enum ReorderBufferChangeType action;
+ /* Transaction this change belongs to. */
+ struct ReorderBufferTXN *txn;
+
RepOriginId origin_id;
/*
@@ -286,6 +294,11 @@ typedef struct ReorderBufferTXN
*/
dlist_node node;
+ /*
+ * Size of this transaction (changes currently in memory, in bytes).
+ */
+ Size size;
+
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -386,6 +399,9 @@ struct ReorderBuffer
/* buffer for disk<->memory conversions */
char *outbuf;
Size outbufsize;
+
+ /* memory accounting */
+ Size size;
};
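A property the two new size fields are meant to maintain is that ReorderBuffer->size always equals the sum of the per-transaction sizes (subtransactions included, since they have their own entries in by_txn). A debug-only helper along the following lines could check that; it is not part of the patch, the name is made up, and it would have to live in reorderbuffer.c because ReorderBufferTXNByIdEnt is private to that file:

    #ifdef USE_ASSERT_CHECKING
    /* Hypothetical consistency check - not part of the patch. */
    static void
    AssertReorderBufferSizeConsistent(ReorderBuffer *rb)
    {
        HASH_SEQ_STATUS hash_seq;
        ReorderBufferTXNByIdEnt *ent;
        Size        total = 0;

        hash_seq_init(&hash_seq, rb->by_txn);
        while ((ent = hash_seq_search(&hash_seq)) != NULL)
            total += ent->txn->size;

        Assert(total == rb->size);
    }
    #endif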