1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-27 12:41:57 +03:00

Avoid unnecessary catalog updates in ALTER SEQUENCE

ALTER SEQUENCE can do nontransactional changes to the sequence (RESTART
clause) and transactional updates to the pg_sequence catalog (most other
clauses).  When just calling RESTART, the code would still needlessly do
a catalog update without any changes.  This would entangle that
operation in the concurrency issues of a catalog update (causing either
locking or concurrency errors, depending on how that issue is to be
resolved).

Fix by keeping track during options parsing whether a catalog update is
needed, and skip it if not.

Reported-by: Jason Petersen <jason@citusdata.com>
This commit is contained in:
Peter Eisentraut
2017-05-02 10:41:48 -04:00
parent a35ac7c4e3
commit 3d092fe540

View File

@ -101,6 +101,7 @@ static Form_pg_sequence_data read_seq_tuple(Relation rel,
static void init_params(ParseState *pstate, List *options, bool for_identity, static void init_params(ParseState *pstate, List *options, bool for_identity,
bool isInit, bool isInit,
Form_pg_sequence seqform, Form_pg_sequence seqform,
bool *changed_seqform,
Form_pg_sequence_data seqdataform, List **owned_by); Form_pg_sequence_data seqdataform, List **owned_by);
static void do_setval(Oid relid, int64 next, bool iscalled); static void do_setval(Oid relid, int64 next, bool iscalled);
static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity);
@ -115,6 +116,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
{ {
FormData_pg_sequence seqform; FormData_pg_sequence seqform;
FormData_pg_sequence_data seqdataform; FormData_pg_sequence_data seqdataform;
bool changed_seqform = false; /* not used here */
List *owned_by; List *owned_by;
CreateStmt *stmt = makeNode(CreateStmt); CreateStmt *stmt = makeNode(CreateStmt);
Oid seqoid; Oid seqoid;
@ -153,7 +155,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq)
} }
/* Check and set all option values */ /* Check and set all option values */
init_params(pstate, seq->options, seq->for_identity, true, &seqform, &seqdataform, &owned_by); init_params(pstate, seq->options, seq->for_identity, true, &seqform, &changed_seqform, &seqdataform, &owned_by);
/* /*
* Create relation (and fill value[] and null[] for the tuple) * Create relation (and fill value[] and null[] for the tuple)
@ -418,6 +420,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
Form_pg_sequence seqform; Form_pg_sequence seqform;
Form_pg_sequence_data seqdata; Form_pg_sequence_data seqdata;
FormData_pg_sequence_data newseqdata; FormData_pg_sequence_data newseqdata;
bool changed_seqform = false;
List *owned_by; List *owned_by;
ObjectAddress address; ObjectAddress address;
Relation rel; Relation rel;
@ -443,7 +446,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
/* lock page' buffer and read tuple into new sequence structure */ /* lock page' buffer and read tuple into new sequence structure */
seqdata = read_seq_tuple(seqrel, &buf, &seqdatatuple); seqdata = read_seq_tuple(seqrel, &buf, &seqdatatuple);
/* Copy old values of options into workspace */ /* Copy old sequence data into workspace */
memcpy(&newseqdata, seqdata, sizeof(FormData_pg_sequence_data)); memcpy(&newseqdata, seqdata, sizeof(FormData_pg_sequence_data));
rel = heap_open(SequenceRelationId, RowExclusiveLock); rel = heap_open(SequenceRelationId, RowExclusiveLock);
@ -456,7 +459,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
seqform = (Form_pg_sequence) GETSTRUCT(tuple); seqform = (Form_pg_sequence) GETSTRUCT(tuple);
/* Check and set new values */ /* Check and set new values */
init_params(pstate, stmt->options, stmt->for_identity, false, seqform, &newseqdata, &owned_by); init_params(pstate, stmt->options, stmt->for_identity, false, seqform, &changed_seqform, &newseqdata, &owned_by);
/* Clear local cache so that we don't think we have cached numbers */ /* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */ /* Note that we do not change the currval() state */
@ -507,6 +510,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt)
relation_close(seqrel, NoLock); relation_close(seqrel, NoLock);
if (changed_seqform)
CatalogTupleUpdate(rel, &tuple->t_self, tuple); CatalogTupleUpdate(rel, &tuple->t_self, tuple);
heap_close(rel, RowExclusiveLock); heap_close(rel, RowExclusiveLock);
@ -1213,9 +1217,12 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple)
} }
/* /*
* init_params: process the options list of CREATE or ALTER SEQUENCE, * init_params: process the options list of CREATE or ALTER SEQUENCE, and
* and store the values into appropriate fields of *new. Also set * store the values into appropriate fields of seqform, for changes that go
* *owned_by to any OWNED BY option, or to NIL if there is none. * into the pg_sequence catalog, and seqdataform for changes to the sequence
* relation itself. Set *changed_seqform to true if seqform was changed
* (interesting for ALTER SEQUENCE). Also set *owned_by to any OWNED BY
* option, or to NIL if there is none.
* *
* If isInit is true, fill any unspecified options with default values; * If isInit is true, fill any unspecified options with default values;
* otherwise, do not change existing options that aren't explicitly overridden. * otherwise, do not change existing options that aren't explicitly overridden.
@ -1224,7 +1231,9 @@ static void
init_params(ParseState *pstate, List *options, bool for_identity, init_params(ParseState *pstate, List *options, bool for_identity,
bool isInit, bool isInit,
Form_pg_sequence seqform, Form_pg_sequence seqform,
Form_pg_sequence_data seqdataform, List **owned_by) bool *changed_seqform,
Form_pg_sequence_data seqdataform,
List **owned_by)
{ {
DefElem *as_type = NULL; DefElem *as_type = NULL;
DefElem *start_value = NULL; DefElem *start_value = NULL;
@ -1342,6 +1351,8 @@ init_params(ParseState *pstate, List *options, bool for_identity,
defel->defname); defel->defname);
} }
*changed_seqform = false;
/* /*
* We must reset log_cnt when isInit or when changing any parameters that * We must reset log_cnt when isInit or when changing any parameters that
* would affect future nextval allocations. * would affect future nextval allocations.
@ -1382,14 +1393,19 @@ init_params(ParseState *pstate, List *options, bool for_identity,
} }
seqform->seqtypid = newtypid; seqform->seqtypid = newtypid;
*changed_seqform = true;
} }
else if (isInit) else if (isInit)
{
seqform->seqtypid = INT8OID; seqform->seqtypid = INT8OID;
*changed_seqform = true;
}
/* INCREMENT BY */ /* INCREMENT BY */
if (increment_by != NULL) if (increment_by != NULL)
{ {
seqform->seqincrement = defGetInt64(increment_by); seqform->seqincrement = defGetInt64(increment_by);
*changed_seqform = true;
if (seqform->seqincrement == 0) if (seqform->seqincrement == 0)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -1397,22 +1413,30 @@ init_params(ParseState *pstate, List *options, bool for_identity,
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
else if (isInit) else if (isInit)
{
seqform->seqincrement = 1; seqform->seqincrement = 1;
*changed_seqform = true;
}
/* CYCLE */ /* CYCLE */
if (is_cycled != NULL) if (is_cycled != NULL)
{ {
seqform->seqcycle = intVal(is_cycled->arg); seqform->seqcycle = intVal(is_cycled->arg);
*changed_seqform = true;
Assert(BoolIsValid(seqform->seqcycle)); Assert(BoolIsValid(seqform->seqcycle));
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
else if (isInit) else if (isInit)
{
seqform->seqcycle = false; seqform->seqcycle = false;
*changed_seqform = true;
}
/* MAXVALUE (null arg means NO MAXVALUE) */ /* MAXVALUE (null arg means NO MAXVALUE) */
if (max_value != NULL && max_value->arg) if (max_value != NULL && max_value->arg)
{ {
seqform->seqmax = defGetInt64(max_value); seqform->seqmax = defGetInt64(max_value);
*changed_seqform = true;
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
else if (isInit || max_value != NULL || reset_max_value) else if (isInit || max_value != NULL || reset_max_value)
@ -1429,6 +1453,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
} }
else else
seqform->seqmax = -1; /* descending seq */ seqform->seqmax = -1; /* descending seq */
*changed_seqform = true;
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
@ -1450,6 +1475,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
if (min_value != NULL && min_value->arg) if (min_value != NULL && min_value->arg)
{ {
seqform->seqmin = defGetInt64(min_value); seqform->seqmin = defGetInt64(min_value);
*changed_seqform = true;
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
else if (isInit || min_value != NULL || reset_min_value) else if (isInit || min_value != NULL || reset_min_value)
@ -1466,6 +1492,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
} }
else else
seqform->seqmin = 1; /* ascending seq */ seqform->seqmin = 1; /* ascending seq */
*changed_seqform = true;
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
@ -1499,13 +1526,17 @@ init_params(ParseState *pstate, List *options, bool for_identity,
/* START WITH */ /* START WITH */
if (start_value != NULL) if (start_value != NULL)
{
seqform->seqstart = defGetInt64(start_value); seqform->seqstart = defGetInt64(start_value);
*changed_seqform = true;
}
else if (isInit) else if (isInit)
{ {
if (seqform->seqincrement > 0) if (seqform->seqincrement > 0)
seqform->seqstart = seqform->seqmin; /* ascending seq */ seqform->seqstart = seqform->seqmin; /* ascending seq */
else else
seqform->seqstart = seqform->seqmax; /* descending seq */ seqform->seqstart = seqform->seqmax; /* descending seq */
*changed_seqform = true;
} }
/* crosscheck START */ /* crosscheck START */
@ -1580,6 +1611,7 @@ init_params(ParseState *pstate, List *options, bool for_identity,
if (cache_value != NULL) if (cache_value != NULL)
{ {
seqform->seqcache = defGetInt64(cache_value); seqform->seqcache = defGetInt64(cache_value);
*changed_seqform = true;
if (seqform->seqcache <= 0) if (seqform->seqcache <= 0)
{ {
char buf[100]; char buf[100];
@ -1593,7 +1625,10 @@ init_params(ParseState *pstate, List *options, bool for_identity,
seqdataform->log_cnt = 0; seqdataform->log_cnt = 0;
} }
else if (isInit) else if (isInit)
{
seqform->seqcache = 1; seqform->seqcache = 1;
*changed_seqform = true;
}
} }
/* /*