mirror of
https://github.com/postgres/postgres.git
synced 2025-04-25 21:42:33 +03:00
tableam: Add table_finish_bulk_insert().
This replaces the previous calls of heap_sync() in places using bulk-insert. By passing in the flags used for bulk-insert the AM can decide (first at insert time and then during the finish call) which of the optimizations apply to it, and what operations are necessary to finish a bulk insert operation. Also change HEAP_INSERT_* flags to TABLE_INSERT, and rename hi_options to ti_options. These changes are made even in copy.c, which hasn't yet been converted to tableam. There's no harm in doing so. Author: Andres Freund Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
This commit is contained in:
parent
26a76cb640
commit
d45e401586
@ -540,6 +540,17 @@ tuple_lock_retry:
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
 * heapam_finish_bulk_insert
 *		Table AM finish_bulk_insert callback for heap: performs whatever
 *		cleanup the heap AM needs after a bulk insert, based on the
 *		insert options that were in effect.
 *
 * NOTE(review): `options` carries TABLE_INSERT_* flags from callers, but the
 * test below uses HEAP_INSERT_SKIP_WAL — this assumes the HEAP_INSERT_* and
 * TABLE_INSERT_* flag values are kept identical; confirm in heapam.h/tableam.h.
 */
static void
|
||||
heapam_finish_bulk_insert(Relation relation, int options)
|
||||
{
|
||||
/*
|
||||
* If we skipped writing WAL, then we need to sync the heap (but not
|
||||
* indexes since those use WAL anyway / don't go through tableam)
|
||||
*/
|
||||
if (options & HEAP_INSERT_SKIP_WAL)
|
||||
heap_sync(relation);
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------
|
||||
* DDL related callbacks for heap AM.
|
||||
@ -2401,6 +2412,7 @@ static const TableAmRoutine heapam_methods = {
|
||||
.tuple_delete = heapam_tuple_delete,
|
||||
.tuple_update = heapam_tuple_update,
|
||||
.tuple_lock = heapam_tuple_lock,
|
||||
.finish_bulk_insert = heapam_finish_bulk_insert,
|
||||
|
||||
.tuple_fetch_row_version = heapam_fetch_row_version,
|
||||
.tuple_get_latest_tid = heap_get_latest_tid,
|
||||
|
@ -319,7 +319,7 @@ static uint64 CopyTo(CopyState cstate);
|
||||
static void CopyOneRowTo(CopyState cstate,
|
||||
Datum *values, bool *nulls);
|
||||
static void CopyFromInsertBatch(CopyState cstate, EState *estate,
|
||||
CommandId mycid, int hi_options,
|
||||
CommandId mycid, int ti_options,
|
||||
ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
|
||||
BulkInsertState bistate,
|
||||
int nBufferedTuples, HeapTuple *bufferedTuples,
|
||||
@ -2328,7 +2328,7 @@ CopyFrom(CopyState cstate)
|
||||
PartitionTupleRouting *proute = NULL;
|
||||
ErrorContextCallback errcallback;
|
||||
CommandId mycid = GetCurrentCommandId(true);
|
||||
int hi_options = 0; /* start with default heap_insert options */
|
||||
int ti_options = 0; /* start with default table_insert options */
|
||||
BulkInsertState bistate;
|
||||
CopyInsertMethod insertMethod;
|
||||
uint64 processed = 0;
|
||||
@ -2392,8 +2392,8 @@ CopyFrom(CopyState cstate)
|
||||
* - data is being written to relfilenode created in this transaction
|
||||
* then we can skip writing WAL. It's safe because if the transaction
|
||||
* doesn't commit, we'll discard the table (or the new relfilenode file).
|
||||
* If it does commit, we'll have done the heap_sync at the bottom of this
|
||||
* routine first.
|
||||
* If it does commit, we'll have done the table_finish_bulk_insert() at
|
||||
* the bottom of this routine first.
|
||||
*
|
||||
* As mentioned in comments in utils/rel.h, the in-same-transaction test
|
||||
* is not always set correctly, since in rare cases rd_newRelfilenodeSubid
|
||||
@ -2437,9 +2437,9 @@ CopyFrom(CopyState cstate)
|
||||
(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
|
||||
cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
|
||||
{
|
||||
hi_options |= HEAP_INSERT_SKIP_FSM;
|
||||
ti_options |= TABLE_INSERT_SKIP_FSM;
|
||||
if (!XLogIsNeeded())
|
||||
hi_options |= HEAP_INSERT_SKIP_WAL;
|
||||
ti_options |= TABLE_INSERT_SKIP_WAL;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2491,7 +2491,7 @@ CopyFrom(CopyState cstate)
|
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
|
||||
|
||||
hi_options |= HEAP_INSERT_FROZEN;
|
||||
ti_options |= TABLE_INSERT_FROZEN;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2755,7 +2755,7 @@ CopyFrom(CopyState cstate)
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
|
||||
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
|
||||
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
|
||||
prevResultRelInfo, myslot, bistate,
|
||||
nBufferedTuples, bufferedTuples,
|
||||
firstBufferedLineNo);
|
||||
@ -2978,7 +2978,7 @@ CopyFrom(CopyState cstate)
|
||||
if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
|
||||
bufferedTuplesSize > 65535)
|
||||
{
|
||||
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
|
||||
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
|
||||
resultRelInfo, myslot, bistate,
|
||||
nBufferedTuples, bufferedTuples,
|
||||
firstBufferedLineNo);
|
||||
@ -3015,7 +3015,7 @@ CopyFrom(CopyState cstate)
|
||||
{
|
||||
tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
|
||||
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
|
||||
mycid, hi_options, bistate);
|
||||
mycid, ti_options, bistate);
|
||||
ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
|
||||
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
||||
}
|
||||
@ -3050,13 +3050,13 @@ CopyFrom(CopyState cstate)
|
||||
{
|
||||
if (insertMethod == CIM_MULTI_CONDITIONAL)
|
||||
{
|
||||
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
|
||||
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
|
||||
prevResultRelInfo, myslot, bistate,
|
||||
nBufferedTuples, bufferedTuples,
|
||||
firstBufferedLineNo);
|
||||
}
|
||||
else
|
||||
CopyFromInsertBatch(cstate, estate, mycid, hi_options,
|
||||
CopyFromInsertBatch(cstate, estate, mycid, ti_options,
|
||||
resultRelInfo, myslot, bistate,
|
||||
nBufferedTuples, bufferedTuples,
|
||||
firstBufferedLineNo);
|
||||
@ -3106,12 +3106,7 @@ CopyFrom(CopyState cstate)
|
||||
|
||||
FreeExecutorState(estate);
|
||||
|
||||
/*
|
||||
* If we skipped writing WAL, then we need to sync the heap (but not
|
||||
* indexes since those use WAL anyway)
|
||||
*/
|
||||
if (hi_options & HEAP_INSERT_SKIP_WAL)
|
||||
heap_sync(cstate->rel);
|
||||
table_finish_bulk_insert(cstate->rel, ti_options);
|
||||
|
||||
return processed;
|
||||
}
|
||||
@ -3123,7 +3118,7 @@ CopyFrom(CopyState cstate)
|
||||
*/
|
||||
static void
|
||||
CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
|
||||
int hi_options, ResultRelInfo *resultRelInfo,
|
||||
int ti_options, ResultRelInfo *resultRelInfo,
|
||||
TupleTableSlot *myslot, BulkInsertState bistate,
|
||||
int nBufferedTuples, HeapTuple *bufferedTuples,
|
||||
uint64 firstBufferedLineNo)
|
||||
@ -3149,7 +3144,7 @@ CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
|
||||
bufferedTuples,
|
||||
nBufferedTuples,
|
||||
mycid,
|
||||
hi_options,
|
||||
ti_options,
|
||||
bistate);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
|
@ -27,8 +27,8 @@
|
||||
#include "access/heapam.h"
|
||||
#include "access/reloptions.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/tableam.h"
|
||||
#include "access/sysattr.h"
|
||||
#include "access/tableam.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "catalog/namespace.h"
|
||||
@ -60,7 +60,7 @@ typedef struct
|
||||
Relation rel; /* relation to write to */
|
||||
ObjectAddress reladdr; /* address of rel, for ExecCreateTableAs */
|
||||
CommandId output_cid; /* cmin to insert in output tuples */
|
||||
int hi_options; /* heap_insert performance options */
|
||||
int ti_options; /* table_insert performance options */
|
||||
BulkInsertState bistate; /* bulk insert state */
|
||||
} DR_intorel;
|
||||
|
||||
@ -558,8 +558,8 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
|
||||
* We can skip WAL-logging the insertions, unless PITR or streaming
|
||||
* replication is in use. We can skip the FSM in any case.
|
||||
*/
|
||||
myState->hi_options = HEAP_INSERT_SKIP_FSM |
|
||||
(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
|
||||
myState->ti_options = TABLE_INSERT_SKIP_FSM |
|
||||
(XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL);
|
||||
myState->bistate = GetBulkInsertState();
|
||||
|
||||
/* Not using WAL requires smgr_targblock be initially invalid */
|
||||
@ -586,7 +586,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
|
||||
table_insert(myState->rel,
|
||||
slot,
|
||||
myState->output_cid,
|
||||
myState->hi_options,
|
||||
myState->ti_options,
|
||||
myState->bistate);
|
||||
|
||||
/* We know this is a newly created relation, so there are no indexes */
|
||||
@ -604,9 +604,7 @@ intorel_shutdown(DestReceiver *self)
|
||||
|
||||
FreeBulkInsertState(myState->bistate);
|
||||
|
||||
/* If we skipped using WAL, must heap_sync before commit */
|
||||
if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
|
||||
heap_sync(myState->rel);
|
||||
table_finish_bulk_insert(myState->rel, myState->ti_options);
|
||||
|
||||
/* close rel, but keep lock until commit */
|
||||
table_close(myState->rel, NoLock);
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/multixact.h"
|
||||
#include "access/tableam.h"
|
||||
#include "access/xact.h"
|
||||
#include "access/xlog.h"
|
||||
#include "catalog/catalog.h"
|
||||
@ -53,7 +54,7 @@ typedef struct
|
||||
/* These fields are filled by transientrel_startup: */
|
||||
Relation transientrel; /* relation to write to */
|
||||
CommandId output_cid; /* cmin to insert in output tuples */
|
||||
int hi_options; /* heap_insert performance options */
|
||||
int ti_options; /* table_insert performance options */
|
||||
BulkInsertState bistate; /* bulk insert state */
|
||||
} DR_transientrel;
|
||||
|
||||
@ -257,7 +258,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
|
||||
* as open scans.
|
||||
*
|
||||
* NB: We count on this to protect us against problems with refreshing the
|
||||
* data using HEAP_INSERT_FROZEN.
|
||||
* data using TABLE_INSERT_FROZEN.
|
||||
*/
|
||||
CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
|
||||
|
||||
@ -461,9 +462,9 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
|
||||
* We can skip WAL-logging the insertions, unless PITR or streaming
|
||||
* replication is in use. We can skip the FSM in any case.
|
||||
*/
|
||||
myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
|
||||
myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
|
||||
if (!XLogIsNeeded())
|
||||
myState->hi_options |= HEAP_INSERT_SKIP_WAL;
|
||||
myState->ti_options |= TABLE_INSERT_SKIP_WAL;
|
||||
myState->bistate = GetBulkInsertState();
|
||||
|
||||
/* Not using WAL requires smgr_targblock be initially invalid */
|
||||
@ -490,7 +491,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
|
||||
table_insert(myState->transientrel,
|
||||
slot,
|
||||
myState->output_cid,
|
||||
myState->hi_options,
|
||||
myState->ti_options,
|
||||
myState->bistate);
|
||||
|
||||
/* We know this is a newly created relation, so there are no indexes */
|
||||
@ -508,9 +509,7 @@ transientrel_shutdown(DestReceiver *self)
|
||||
|
||||
FreeBulkInsertState(myState->bistate);
|
||||
|
||||
/* If we skipped using WAL, must heap_sync before commit */
|
||||
if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
|
||||
heap_sync(myState->transientrel);
|
||||
table_finish_bulk_insert(myState->transientrel, myState->ti_options);
|
||||
|
||||
/* close transientrel, but keep lock until commit */
|
||||
table_close(myState->transientrel, NoLock);
|
||||
|
@ -4687,7 +4687,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
|
||||
EState *estate;
|
||||
CommandId mycid;
|
||||
BulkInsertState bistate;
|
||||
int hi_options;
|
||||
int ti_options;
|
||||
ExprState *partqualstate = NULL;
|
||||
|
||||
/*
|
||||
@ -4704,7 +4704,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
|
||||
newrel = NULL;
|
||||
|
||||
/*
|
||||
* Prepare a BulkInsertState and options for heap_insert. Because we're
|
||||
* Prepare a BulkInsertState and options for table_insert. Because we're
|
||||
* building a new heap, we can skip WAL-logging and fsync it to disk at
|
||||
* the end instead (unless WAL-logging is required for archiving or
|
||||
* streaming replication). The FSM is empty too, so don't bother using it.
|
||||
@ -4714,16 +4714,16 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
|
||||
mycid = GetCurrentCommandId(true);
|
||||
bistate = GetBulkInsertState();
|
||||
|
||||
hi_options = HEAP_INSERT_SKIP_FSM;
|
||||
ti_options = TABLE_INSERT_SKIP_FSM;
|
||||
if (!XLogIsNeeded())
|
||||
hi_options |= HEAP_INSERT_SKIP_WAL;
|
||||
ti_options |= TABLE_INSERT_SKIP_WAL;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* keep compiler quiet about using these uninitialized */
|
||||
mycid = 0;
|
||||
bistate = NULL;
|
||||
hi_options = 0;
|
||||
ti_options = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -4977,7 +4977,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
|
||||
|
||||
/* Write the tuple out to the new relation */
|
||||
if (newrel)
|
||||
table_insert(newrel, insertslot, mycid, hi_options, bistate);
|
||||
table_insert(newrel, insertslot, mycid, ti_options, bistate);
|
||||
|
||||
ResetExprContext(econtext);
|
||||
|
||||
@ -5000,9 +5000,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
|
||||
{
|
||||
FreeBulkInsertState(bistate);
|
||||
|
||||
/* If we skipped writing WAL, then we need to sync the heap. */
|
||||
if (hi_options & HEAP_INSERT_SKIP_WAL)
|
||||
heap_sync(newrel);
|
||||
table_finish_bulk_insert(newrel, ti_options);
|
||||
|
||||
table_close(newrel, NoLock);
|
||||
}
|
||||
|
@ -380,6 +380,21 @@ typedef struct TableAmRoutine
|
||||
uint8 flags,
|
||||
TM_FailureData *tmfd);
|
||||
|
||||
/*
|
||||
* Perform operations necessary to complete insertions made via
|
||||
* tuple_insert and multi_insert with a BulkInsertState specified. This
|
||||
* may e.g. be used to flush the relation when inserting with
|
||||
* TABLE_INSERT_SKIP_WAL specified.
|
||||
*
|
||||
* Typically callers of tuple_insert and multi_insert will just pass all
|
||||
* the flags that apply to them, and each AM has to decide which of them
|
||||
* make sense for it, and then only take actions in finish_bulk_insert
|
||||
* that make sense for a specific AM.
|
||||
*
|
||||
* Optional callback.
|
||||
*/
|
||||
void (*finish_bulk_insert) (Relation rel, int options);
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------
|
||||
* DDL related functionality.
|
||||
@ -1011,7 +1026,8 @@ table_compute_xid_horizon_for_tuples(Relation rel,
|
||||
*
|
||||
*
|
||||
* The BulkInsertState object (if any; bistate can be NULL for default
|
||||
* behavior) is also just passed through to RelationGetBufferForTuple.
|
||||
* behavior) is also just passed through to RelationGetBufferForTuple. If
|
||||
* `bistate` is provided, table_finish_bulk_insert() needs to be called.
|
||||
*
|
||||
* On return the slot's tts_tid and tts_tableOid are updated to reflect the
|
||||
* insertion. But note that any toasting of fields within the slot is NOT
|
||||
@ -1185,6 +1201,20 @@ table_lock_tuple(Relation rel, ItemPointer tid, Snapshot snapshot,
|
||||
flags, tmfd);
|
||||
}
|
||||
|
||||
/*
|
||||
* Perform operations necessary to complete insertions made via
|
||||
* tuple_insert and multi_insert with a BulkInsertState specified. This
|
||||
* may e.g. be used to flush the relation when inserting with
|
||||
* TABLE_INSERT_SKIP_WAL specified.
|
||||
*/
|
||||
static inline void
|
||||
table_finish_bulk_insert(Relation rel, int options)
|
||||
{
|
||||
/* optional callback: AMs needing no bulk-insert cleanup may leave it NULL */
|
||||
if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
|
||||
rel->rd_tableam->finish_bulk_insert(rel, options);
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------------------
|
||||
* DDL related functionality.
|
||||
|
Loading…
x
Reference in New Issue
Block a user