diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 40a4c4f497c..cd090ecdfbf 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -101,7 +101,7 @@ typedef struct ReorderBufferIterTXNEntry XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + File fd; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -182,7 +182,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb); * subtransactions * --------------------------------------- */ -static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, @@ -199,7 +200,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + File *fd, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -942,15 +943,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg) /* * Allocate & initialize an iterator which iterates in lsn order over a * transaction and all its subtransactions. + * + * Note: The iterator state is returned through iter_state parameter rather + * than the function's return value. This is because the state gets cleaned up + * in a PG_CATCH block in the caller, so we want to make sure the caller gets + * back the state even if this function throws an exception. */ -static ReorderBufferIterTXNState * -ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) +static void +ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) { Size nr_txns = 0; ReorderBufferIterTXNState *state; dlist_iter cur_txn_i; int32 off; + *iter_state = NULL; + /* * Calculate the size of our heap: one element for every transaction that * contains changes. (Besides the transactions already in the reorder @@ -994,6 +1003,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferIterCompare, state); + /* Now that the state fields are initialized, it is safe to return it. */ + *iter_state = state; + /* * Now insert items into the binary heap, in an unordered fashion. (We * will run a heap assembly step at the end; this is more efficient.) @@ -1056,8 +1068,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* assemble a valid binary heap */ binaryheap_build(state->heap); - - return state; } /* @@ -1161,7 +1171,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + FileClose(state->entries[off].fd); } /* free memory we might have "leaked" in the last *Next call */ @@ -1496,7 +1506,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); - iterstate = ReorderBufferIterTXNInit(rb, txn); + ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { Relation relation = NULL; @@ -2338,7 +2348,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + File *fd, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; @@ -2383,7 +2393,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0); if (*fd < 0 && errno == ENOENT) { *fd = -1; @@ -2404,12 +2414,12 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); + readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); + FileClose(*fd); *fd = -1; (*segno)++; continue; @@ -2431,8 +2441,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); + readBytes = FileRead(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange)); if (readBytes < 0) ereport(ERROR,