1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-08 11:42:09 +03:00

XLOG stuff for sequences.

CommitDelay in guc.c
This commit is contained in:
Vadim B. Mikheev
2000-11-30 01:47:33 +00:00
parent 680b7357ce
commit 741510521c
10 changed files with 241 additions and 67 deletions

View File

@ -22,19 +22,12 @@
#define SEQ_MAXVALUE ((int4)0x7FFFFFFF)
#define SEQ_MINVALUE -(SEQ_MAXVALUE)
typedef struct FormData_pg_sequence
{
NameData sequence_name;
int4 last_value;
int4 increment_by;
int4 max_value;
int4 min_value;
int4 cache_value;
char is_cycled;
char is_called;
} FormData_pg_sequence;
typedef FormData_pg_sequence *Form_pg_sequence;
/*
* We don't want to log each fetching values from sequences,
* so we pre-log a few fetches in advance. In the event of
* crash we can lose as much as we pre-logged.
*/
#define SEQ_LOG_VALS 32
typedef struct sequence_magic
{
@ -138,6 +131,11 @@ DefineSequence(CreateSeqStmt *seq)
coldef->colname = "cache_value";
value[i - 1] = Int32GetDatum(new.cache_value);
break;
case SEQ_COL_LOG:
typnam->name = "int4";
coldef->colname = "log_cnt";
value[i - 1] = Int32GetDatum((int32)1);
break;
case SEQ_COL_CYCLE:
typnam->name = "char";
coldef->colname = "is_cycled";
@ -196,10 +194,14 @@ nextval(PG_FUNCTION_ARGS)
int32 incby,
maxv,
minv,
cache;
cache,
log,
fetch,
last;
int32 result,
next,
rescnt = 0;
bool logit = false;
if (pg_aclcheck(seqname, GetUserId(), ACL_WR) != ACLCHECK_OK)
elog(ERROR, "%s.nextval: you don't have permissions to set sequence %s",
@ -219,16 +221,27 @@ nextval(PG_FUNCTION_ARGS)
seq = read_info("nextval", elm, &buf); /* lock page' buffer and
* read tuple */
next = result = seq->last_value;
last = next = result = seq->last_value;
incby = seq->increment_by;
maxv = seq->max_value;
minv = seq->min_value;
cache = seq->cache_value;
fetch = cache = seq->cache_value;
log = seq->log_cnt;
if (seq->is_called != 't')
{
rescnt++; /* last_value if not called */
fetch--;
log--;
}
while (rescnt < cache) /* try to fetch cache numbers */
if (log < fetch)
{
fetch = log = fetch - log + SEQ_LOG_VALS;
logit = true;
}
while (fetch) /* try to fetch cache [+ log ] numbers */
{
/*
@ -242,7 +255,7 @@ nextval(PG_FUNCTION_ARGS)
(maxv < 0 && next + incby > maxv))
{
if (rescnt > 0)
break; /* stop caching */
break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MAXVALUE (%d)",
elm->name, maxv);
@ -258,7 +271,7 @@ nextval(PG_FUNCTION_ARGS)
(minv >= 0 && next + incby < minv))
{
if (rescnt > 0)
break; /* stop caching */
break; /* stop fetching */
if (seq->is_cycled != 't')
elog(ERROR, "%s.nextval: got MINVALUE (%d)",
elm->name, minv);
@ -267,17 +280,43 @@ nextval(PG_FUNCTION_ARGS)
else
next += incby;
}
rescnt++; /* got result */
if (rescnt == 1) /* if it's first one - */
result = next; /* it's what to return */
fetch--;
if (rescnt < cache)
{
log--;
rescnt++;
last = next;
if (rescnt == 1) /* if it's first result - */
result = next; /* it's what to return */
}
}
/* save info in local cache */
elm->last = result; /* last returned number */
elm->cached = next; /* last cached number */
elm->cached = last; /* last fetched number */
if (logit)
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
if (fetch) /* not all numbers were fetched */
log -= fetch;
xlrec.node = elm->rel->rd_node;
xlrec.value = next;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG|XLOG_NO_TRAN,
(char*) &xlrec, sizeof(xlrec), NULL, 0);
PageSetLSN(BufferGetPage(buf), recptr);
PageSetSUI(BufferGetPage(buf), ThisStartUpID);
}
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->last_value = last; /* last fetched number */
Assert(log >= 0);
seq->log_cnt = log; /* how much is logged */
seq->is_called = 't';
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
@ -349,6 +388,21 @@ do_setval(char *seqname, int32 next, bool iscalled)
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled ? 't' : 'f';
seq->log_cnt = (iscalled) ? 0 : 1;
{
xl_seq_rec xlrec;
XLogRecPtr recptr;
xlrec.node = elm->rel->rd_node;
xlrec.value = next;
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_SET|XLOG_NO_TRAN,
(char*) &xlrec, sizeof(xlrec), NULL, 0);
PageSetLSN(BufferGetPage(buf), recptr);
PageSetSUI(BufferGetPage(buf), ThisStartUpID);
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
@ -638,7 +692,6 @@ init_params(CreateSeqStmt *seq, Form_pg_sequence new)
}
static int
get_param(DefElem *def)
{
@ -651,3 +704,80 @@ get_param(DefElem *def)
elog(ERROR, "DefineSequence: \"%s\" is to be integer", def->defname);
return -1;
}
void seq_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
Relation reln;
Buffer buffer;
Page page;
ItemId lp;
HeapTupleData tuple;
Form_pg_sequence seq;
xl_seq_rec *xlrec;
if (info != XLOG_SEQ_LOG && info != XLOG_SEQ_SET)
elog(STOP, "seq_redo: unknown op code %u", info);
xlrec = (xl_seq_rec*) XLogRecGetData(record);
reln = XLogOpenRelation(true, RM_SEQ_ID, xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln, 0);
if (!BufferIsValid(buffer))
elog(STOP, "seq_redo: can't read block of %u/%u",
xlrec->node.tblNode, xlrec->node.relNode);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page) ||
((sequence_magic *) PageGetSpecialPointer(page))->magic != SEQ_MAGIC)
elog(STOP, "seq_redo: uninitialized page of %u/%u",
xlrec->node.tblNode, xlrec->node.relNode);
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
return;
}
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsUsed(lp));
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
seq->last_value = xlrec->value; /* last logged value */
seq->is_called = 't';
seq->log_cnt = 0;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
return;
}
void seq_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
void seq_desc(char *buf, uint8 xl_info, char* rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
xl_seq_rec *xlrec = (xl_seq_rec*) rec;
if (info == XLOG_SEQ_LOG)
strcat(buf, "log: ");
else if (info == XLOG_SEQ_SET)
strcat(buf, "set: ");
else
{
strcat(buf, "UNKNOWN");
return;
}
sprintf(buf + strlen(buf), "node %u/%u; value %d",
xlrec->node.tblNode, xlrec->node.relNode, xlrec->value);
}