mirror of
https://github.com/postgres/postgres.git
synced 2025-07-11 10:01:57 +03:00
Fix running out of file descriptors for spill files.
Currently, while decoding changes, if the number of changes exceeds a certain threshold, we spill them to disk, and this happens separately for each (sub)transaction. When reading these spill files back, we do not close any of them until all of them have been read, so if the number of such files exceeds the maximum number of file descriptors, the operation errors out.

Use the PathNameOpenFile interface to open these files, as it internally has a mechanism to release kernel FDs as needed to keep us under the max_safe_fds limit.

Reported-by: Amit Khandekar
Author: Amit Khandekar
Reviewed-by: Amit Kapila
Backpatch-through: 9.4
Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com
This commit is contained in:
@ -103,13 +103,21 @@ typedef struct ReorderBufferTupleCidEnt
|
||||
CommandId combocid; /* just for debugging */
|
||||
} ReorderBufferTupleCidEnt;
|
||||
|
||||
/*
 * Virtual file descriptor with file offset tracking.
 *
 * As used in this file: ReorderBufferRestoreChanges() stores the result of
 * PathNameOpenFile() in vfd, passes curOffset to FileRead() as the offset
 * argument, and advances curOffset by the number of bytes read.
 */
|
||||
typedef struct TXNEntryFile
|
||||
{
|
||||
File vfd; /* -1 when the file is closed */
|
||||
off_t curOffset; /* offset for next write or read. Reset to 0
|
||||
* when vfd is opened. */
|
||||
} TXNEntryFile;
|
||||
|
||||
/*
 * k-way in-order change iteration support structures
 *
 * ReorderBufferIterTXNEntry is the element type of the iterator state's
 * entries[] array. ReorderBufferIterTXNInit() initializes each element with
 * its descriptor set to -1 and segno set to 0, and
 * ReorderBufferIterTXNFinish() closes every descriptor that is not -1.
 */
|
||||
typedef struct ReorderBufferIterTXNEntry
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
ReorderBufferChange *change;
|
||||
ReorderBufferTXN *txn; /* passed to ReorderBufferRestoreChanges() when restoring from disk */
|
||||
int fd; /* NOTE(review): in this diff view this looks like the pre-change member that 'file' replaces -- confirm */
|
||||
TXNEntryFile file; /* file.vfd is -1 while no file is open */
|
||||
XLogSegNo segno; /* passed by address to ReorderBufferRestoreChanges(), which increments it on EOF */
|
||||
} ReorderBufferIterTXNEntry;
|
||||
|
||||
@ -178,7 +186,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
|
||||
* subtransactions
|
||||
* ---------------------------------------
|
||||
*/
|
||||
static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
|
||||
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
ReorderBufferIterTXNState *volatile *iter_state);
|
||||
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
|
||||
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
|
||||
ReorderBufferIterTXNState *state);
|
||||
@ -194,7 +203,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
|
||||
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
int fd, ReorderBufferChange *change);
|
||||
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
int *fd, XLogSegNo *segno);
|
||||
TXNEntryFile *file, XLogSegNo *segno);
|
||||
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
char *change);
|
||||
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
|
||||
@ -945,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
|
||||
/*
|
||||
* Allocate & initialize an iterator which iterates in lsn order over a
|
||||
* transaction and all its subtransactions.
|
||||
*
|
||||
* Note: The iterator state is returned through iter_state parameter rather
|
||||
* than the function's return value. This is because the state gets cleaned up
|
||||
* in a PG_CATCH block in the caller, so we want to make sure the caller gets
|
||||
* back the state even if this function throws an exception.
|
||||
*/
|
||||
static ReorderBufferIterTXNState *
|
||||
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
static void
|
||||
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
ReorderBufferIterTXNState *volatile *iter_state)
|
||||
{
|
||||
Size nr_txns = 0;
|
||||
ReorderBufferIterTXNState *state;
|
||||
dlist_iter cur_txn_i;
|
||||
int32 off;
|
||||
|
||||
*iter_state = NULL;
|
||||
|
||||
/*
|
||||
* Calculate the size of our heap: one element for every transaction that
|
||||
* contains changes. (Besides the transactions already in the reorder
|
||||
@ -988,7 +1005,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
|
||||
for (off = 0; off < state->nr_txns; off++)
|
||||
{
|
||||
state->entries[off].fd = -1;
|
||||
state->entries[off].file.vfd = -1;
|
||||
state->entries[off].segno = 0;
|
||||
}
|
||||
|
||||
@ -997,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
ReorderBufferIterCompare,
|
||||
state);
|
||||
|
||||
/* Now that the state fields are initialized, it is safe to return it. */
|
||||
*iter_state = state;
|
||||
|
||||
/*
|
||||
* Now insert items into the binary heap, in an unordered fashion. (We
|
||||
* will run a heap assembly step at the end; this is more efficient.)
|
||||
@ -1013,7 +1033,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
{
|
||||
/* serialize remaining changes */
|
||||
ReorderBufferSerializeTXN(rb, txn);
|
||||
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
|
||||
ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
|
||||
&state->entries[off].segno);
|
||||
}
|
||||
|
||||
@ -1043,7 +1063,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
/* serialize remaining changes */
|
||||
ReorderBufferSerializeTXN(rb, cur_txn);
|
||||
ReorderBufferRestoreChanges(rb, cur_txn,
|
||||
&state->entries[off].fd,
|
||||
&state->entries[off].file,
|
||||
&state->entries[off].segno);
|
||||
}
|
||||
cur_change = dlist_head_element(ReorderBufferChange, node,
|
||||
@ -1059,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
|
||||
|
||||
/* assemble a valid binary heap */
|
||||
binaryheap_build(state->heap);
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1124,7 +1142,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
|
||||
dlist_delete(&change->node);
|
||||
dlist_push_tail(&state->old_change, &change->node);
|
||||
|
||||
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
|
||||
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
|
||||
&state->entries[off].segno))
|
||||
{
|
||||
/* successfully restored changes from disk */
|
||||
@ -1163,8 +1181,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
|
||||
|
||||
for (off = 0; off < state->nr_txns; off++)
|
||||
{
|
||||
if (state->entries[off].fd != -1)
|
||||
CloseTransientFile(state->entries[off].fd);
|
||||
if (state->entries[off].file.vfd != -1)
|
||||
FileClose(state->entries[off].file.vfd);
|
||||
}
|
||||
|
||||
/* free memory we might have "leaked" in the last *Next call */
|
||||
@ -1500,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
|
||||
|
||||
rb->begin(rb, txn);
|
||||
|
||||
iterstate = ReorderBufferIterTXNInit(rb, txn);
|
||||
ReorderBufferIterTXNInit(rb, txn, &iterstate);
|
||||
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
|
||||
{
|
||||
Relation relation = NULL;
|
||||
@ -2517,11 +2535,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
*/
|
||||
static Size
|
||||
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
int *fd, XLogSegNo *segno)
|
||||
TXNEntryFile *file, XLogSegNo *segno)
|
||||
{
|
||||
Size restored = 0;
|
||||
XLogSegNo last_segno;
|
||||
dlist_mutable_iter cleanup_iter;
|
||||
File *fd = &file->vfd;
|
||||
|
||||
Assert(txn->first_lsn != InvalidXLogRecPtr);
|
||||
Assert(txn->final_lsn != InvalidXLogRecPtr);
|
||||
@ -2562,7 +2581,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
|
||||
*segno);
|
||||
|
||||
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
||||
*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
|
||||
|
||||
/* No harm in resetting the offset even in case of failure */
|
||||
file->curOffset = 0;
|
||||
|
||||
if (*fd < 0 && errno == ENOENT)
|
||||
{
|
||||
*fd = -1;
|
||||
@ -2582,14 +2605,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
* end of this file.
|
||||
*/
|
||||
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
|
||||
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
|
||||
readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
|
||||
pgstat_report_wait_end();
|
||||
readBytes = FileRead(file->vfd, rb->outbuf,
|
||||
sizeof(ReorderBufferDiskChange),
|
||||
file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
|
||||
|
||||
/* eof */
|
||||
if (readBytes == 0)
|
||||
{
|
||||
CloseTransientFile(*fd);
|
||||
FileClose(*fd);
|
||||
*fd = -1;
|
||||
(*segno)++;
|
||||
continue;
|
||||
@ -2605,16 +2628,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
readBytes,
|
||||
(uint32) sizeof(ReorderBufferDiskChange))));
|
||||
|
||||
file->curOffset += readBytes;
|
||||
|
||||
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
|
||||
|
||||
ReorderBufferSerializeReserve(rb,
|
||||
sizeof(ReorderBufferDiskChange) + ondisk->size);
|
||||
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
|
||||
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
|
||||
ondisk->size - sizeof(ReorderBufferDiskChange));
|
||||
pgstat_report_wait_end();
|
||||
readBytes = FileRead(file->vfd,
|
||||
rb->outbuf + sizeof(ReorderBufferDiskChange),
|
||||
ondisk->size - sizeof(ReorderBufferDiskChange),
|
||||
file->curOffset,
|
||||
WAIT_EVENT_REORDER_BUFFER_READ);
|
||||
|
||||
if (readBytes < 0)
|
||||
ereport(ERROR,
|
||||
@ -2627,6 +2653,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
|
||||
readBytes,
|
||||
(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
|
||||
|
||||
file->curOffset += readBytes;
|
||||
|
||||
/*
|
||||
* ok, read a full change from disk, now restore it into proper
|
||||
* in-memory format
|
||||
|
Reference in New Issue
Block a user