mirror of
https://github.com/postgres/postgres.git
synced 2025-06-27 23:21:58 +03:00
Postgres95 1.01 Distribution - Virgin Sources
This commit is contained in:
605
src/backend/commands/async.c
Normal file
605
src/backend/commands/async.c
Normal file
@ -0,0 +1,605 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* async.c--
|
||||
* Asynchronous notification
|
||||
*
|
||||
* Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.1.1.1 1996/07/09 06:21:19 scrappy Exp $
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
/* New Async Notification Model:
|
||||
* 1. Multiple backends on same machine. Multiple backends listening on
|
||||
* one relation.
|
||||
*
|
||||
* 2. One of the backend does a 'notify <relname>'. For all backends that
|
||||
* are listening to this relation (all notifications take place at the
|
||||
* end of commit),
|
||||
* 2.a If the process is the same as the backend process that issued
|
||||
* notification (we are notifying something that we are listening),
|
||||
* signal the corresponding frontend over the comm channel using the
|
||||
* out-of-band channel.
|
||||
* 2.b For all other listening processes, we send kill(2) to wake up
|
||||
* the listening backend.
|
||||
* 3. Upon receiving a kill(2) signal from another backend process notifying
|
||||
* that one of the relation that we are listening is being notified,
|
||||
* we can be in either of two following states:
|
||||
* 3.a We are sleeping, wake up and signal our frontend.
|
||||
* 3.b We are in middle of another transaction, wait until the end of
|
||||
* of the current transaction and signal our frontend.
|
||||
* 4. Each frontend receives this notification and prcesses accordingly.
|
||||
*
|
||||
* -- jw, 12/28/93
|
||||
*
|
||||
*/
|
||||
/*
|
||||
* The following is the old model which does not work.
|
||||
*/
|
||||
/*
|
||||
* Model is:
|
||||
* 1. Multiple backends on same machine.
|
||||
*
|
||||
* 2. Query on one backend sends stuff over an asynchronous portal by
|
||||
* appending to a relation, and then doing an async. notification
|
||||
* (which takes place after commit) to all listeners on this relation.
|
||||
*
|
||||
* 3. Async. notification results in all backends listening on relation
|
||||
* to be woken up, by a process signal kill(2), with name of relation
|
||||
* passed in shared memory.
|
||||
*
|
||||
* 4. Each backend notifies its respective frontend over the comm
|
||||
* channel using the out-of-band channel.
|
||||
*
|
||||
* 5. Each frontend receives this notification and processes accordingly.
|
||||
*
|
||||
* #4,#5 are changing soon with pending rewrite of portal/protocol.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <errno.h>
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/attnum.h"
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup.h"
|
||||
#include "access/relscan.h"
|
||||
#include "access/skey.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/tqual.h"
|
||||
#include "access/xact.h"
|
||||
|
||||
#include "commands/async.h"
|
||||
#include "commands/copy.h"
|
||||
#include "storage/buf.h"
|
||||
#include "storage/itemptr.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/portal.h"
|
||||
#include "utils/excid.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/mcxt.h"
|
||||
#include "utils/palloc.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
#include "nodes/pg_list.h"
|
||||
#include "tcop/dest.h"
|
||||
#include "commands/command.h"
|
||||
|
||||
#include "catalog/catname.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "catalog/pg_attribute.h"
|
||||
#include "catalog/pg_proc.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "catalog/pg_listener.h"
|
||||
|
||||
#include "executor/execdefs.h"
|
||||
/* #include "executor/execdesc.h"*/
|
||||
|
||||
#include "storage/bufmgr.h"
|
||||
#include "lib/dllist.h"
|
||||
#include "libpq/libpq.h"
|
||||
|
||||
|
||||
static int notifyFrontEndPending = 0;
|
||||
static int notifyIssued = 0;
|
||||
static Dllist *pendingNotifies = NULL;
|
||||
|
||||
|
||||
static int AsyncExistsPendingNotify(char *);
|
||||
static void ClearPendingNotify(void);
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_NotifyHandler --
|
||||
*
|
||||
* This is the signal handler for SIGUSR2. When the backend
|
||||
* is signaled, the backend can be in two states.
|
||||
* 1. If the backend is in the middle of another transaction,
|
||||
* we set the flag, notifyFrontEndPending, and wait until
|
||||
* the end of the transaction to notify the front end.
|
||||
* 2. If the backend is not in the middle of another transaction,
|
||||
* we notify the front end immediately.
|
||||
*
|
||||
* -- jw, 12/28/93
|
||||
* Results:
|
||||
* none
|
||||
*
|
||||
* Side effects:
|
||||
* none
|
||||
*/
|
||||
void
|
||||
#if defined(PORTNAME_linux)
|
||||
Async_NotifyHandler(int i)
|
||||
#else
|
||||
Async_NotifyHandler()
|
||||
#endif
|
||||
{
|
||||
extern TransactionState CurrentTransactionState;
|
||||
|
||||
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
||||
(CurrentTransactionState->blockState == TRANS_DEFAULT)) {
|
||||
|
||||
elog(DEBUG, "Waking up sleeping backend process");
|
||||
Async_NotifyFrontEnd();
|
||||
|
||||
}else {
|
||||
elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
|
||||
CurrentTransactionState->state,
|
||||
CurrentTransactionState->blockState);
|
||||
notifyFrontEndPending = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_Notify --
|
||||
*
|
||||
* Adds the relation to the list of pending notifies.
|
||||
* All notification happens at end of commit.
|
||||
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
*
|
||||
* All notification of backend processes happens here,
|
||||
* then each backend notifies its corresponding front end at
|
||||
* the end of commit.
|
||||
*
|
||||
* This correspond to 'notify <relname>' command
|
||||
* -- jw, 12/28/93
|
||||
*
|
||||
* Results:
|
||||
* XXX
|
||||
*
|
||||
* Side effects:
|
||||
* All tuples for relname in pg_listener are updated.
|
||||
*
|
||||
*--------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
Async_Notify(char *relname)
|
||||
{
|
||||
|
||||
HeapTuple lTuple, rTuple;
|
||||
Relation lRel;
|
||||
HeapScanDesc sRel;
|
||||
TupleDesc tdesc;
|
||||
ScanKeyData key;
|
||||
Buffer b;
|
||||
Datum d, value[3];
|
||||
bool isnull;
|
||||
char repl[3], nulls[3];
|
||||
|
||||
char *notifyName;
|
||||
|
||||
elog(DEBUG,"Async_Notify: %s",relname);
|
||||
|
||||
if (!pendingNotifies)
|
||||
pendingNotifies = DLNewList();
|
||||
|
||||
notifyName = pstrdup(relname);
|
||||
DLAddHead(pendingNotifies, DLNewElem(notifyName));
|
||||
|
||||
ScanKeyEntryInitialize(&key, 0,
|
||||
Anum_pg_listener_relname,
|
||||
NameEqualRegProcedure,
|
||||
PointerGetDatum(notifyName));
|
||||
|
||||
lRel = heap_openr(ListenerRelationName);
|
||||
tdesc = RelationGetTupleDescriptor(lRel);
|
||||
sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
|
||||
|
||||
nulls[0] = nulls[1] = nulls[2] = ' ';
|
||||
repl[0] = repl[1] = repl[2] = ' ';
|
||||
repl[Anum_pg_listener_notify - 1] = 'r';
|
||||
value[0] = value[1] = value[2] = (Datum) 0;
|
||||
value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
|
||||
|
||||
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0, &b))) {
|
||||
d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_notify,
|
||||
tdesc, &isnull);
|
||||
if (!DatumGetInt32(d)) {
|
||||
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
|
||||
(void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
|
||||
}
|
||||
ReleaseBuffer(b);
|
||||
}
|
||||
heap_endscan(sRel);
|
||||
heap_close(lRel);
|
||||
notifyIssued = 1;
|
||||
}
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_NotifyAtCommit --
|
||||
*
|
||||
* Signal our corresponding frontend process on relations that
|
||||
* were notified. Signal all other backend process that
|
||||
* are listening also.
|
||||
*
|
||||
* -- jw, 12/28/93
|
||||
*
|
||||
* Results:
|
||||
* XXX
|
||||
*
|
||||
* Side effects:
|
||||
* Tuples in pg_listener that has our listenerpid are updated so
|
||||
* that the notification is 0. We do not want to notify frontend
|
||||
* more than once.
|
||||
*
|
||||
* -- jw, 12/28/93
|
||||
*
|
||||
*--------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
Async_NotifyAtCommit()
|
||||
{
|
||||
HeapTuple lTuple;
|
||||
Relation lRel;
|
||||
HeapScanDesc sRel;
|
||||
TupleDesc tdesc;
|
||||
ScanKeyData key;
|
||||
Datum d;
|
||||
int ourpid;
|
||||
bool isnull;
|
||||
Buffer b;
|
||||
extern TransactionState CurrentTransactionState;
|
||||
|
||||
if (!pendingNotifies)
|
||||
pendingNotifies = DLNewList();
|
||||
|
||||
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
||||
(CurrentTransactionState->blockState == TRANS_DEFAULT)) {
|
||||
|
||||
if (notifyIssued) { /* 'notify <relname>' issued by us */
|
||||
notifyIssued = 0;
|
||||
StartTransactionCommand();
|
||||
elog(DEBUG, "Async_NotifyAtCommit.");
|
||||
ScanKeyEntryInitialize(&key, 0,
|
||||
Anum_pg_listener_notify,
|
||||
Integer32EqualRegProcedure,
|
||||
Int32GetDatum(1));
|
||||
lRel = heap_openr(ListenerRelationName);
|
||||
sRel = heap_beginscan(lRel, 0, NowTimeQual, 1, &key);
|
||||
tdesc = RelationGetTupleDescriptor(lRel);
|
||||
ourpid = getpid();
|
||||
|
||||
while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0, &b))) {
|
||||
d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
|
||||
tdesc, &isnull);
|
||||
|
||||
if (AsyncExistsPendingNotify((char *) DatumGetPointer(d))) {
|
||||
d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_pid,
|
||||
tdesc, &isnull);
|
||||
|
||||
if (ourpid == DatumGetInt32(d)) {
|
||||
elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
|
||||
notifyFrontEndPending = 1;
|
||||
} else {
|
||||
elog(DEBUG, "Notifying others");
|
||||
#ifndef WIN32
|
||||
if (kill(DatumGetInt32(d), SIGUSR2) < 0) {
|
||||
if (errno == ESRCH) {
|
||||
heap_delete(lRel, &lTuple->t_ctid);
|
||||
}
|
||||
}
|
||||
#endif /* WIN32 */
|
||||
}
|
||||
}
|
||||
ReleaseBuffer(b);
|
||||
}
|
||||
CommitTransactionCommand();
|
||||
ClearPendingNotify();
|
||||
}
|
||||
|
||||
if (notifyFrontEndPending) { /* we need to notify the frontend of
|
||||
all pending notifies. */
|
||||
notifyFrontEndPending = 1;
|
||||
Async_NotifyFrontEnd();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_NotifyAtAbort --
|
||||
*
|
||||
* Gets rid of pending notifies. List elements are automatically
|
||||
* freed through memory context.
|
||||
*
|
||||
*
|
||||
* Results:
|
||||
* XXX
|
||||
*
|
||||
* Side effects:
|
||||
* XXX
|
||||
*
|
||||
*--------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
Async_NotifyAtAbort()
|
||||
{
|
||||
extern TransactionState CurrentTransactionState;
|
||||
|
||||
if (notifyIssued) {
|
||||
ClearPendingNotify();
|
||||
}
|
||||
notifyIssued = 0;
|
||||
if (pendingNotifies)
|
||||
DLFreeList(pendingNotifies);
|
||||
pendingNotifies = DLNewList();
|
||||
|
||||
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
|
||||
(CurrentTransactionState->blockState == TRANS_DEFAULT)) {
|
||||
if (notifyFrontEndPending) { /* don't forget to notify front end */
|
||||
Async_NotifyFrontEnd();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_Listen --
|
||||
*
|
||||
* Register a backend (identified by its Unix PID) as listening
|
||||
* on the specified relation.
|
||||
*
|
||||
* This corresponds to the 'listen <relation>' command in SQL
|
||||
*
|
||||
* One listener per relation, pg_listener relation is keyed
|
||||
* on (relname,pid) to provide multiple listeners in future.
|
||||
*
|
||||
* Results:
|
||||
* pg_listeners is updated.
|
||||
*
|
||||
* Side effects:
|
||||
* XXX
|
||||
*
|
||||
*--------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
Async_Listen(char *relname, int pid)
|
||||
{
|
||||
Datum values[Natts_pg_listener];
|
||||
char nulls[Natts_pg_listener];
|
||||
TupleDesc tdesc;
|
||||
HeapScanDesc s;
|
||||
HeapTuple htup,tup;
|
||||
Relation lDesc;
|
||||
Buffer b;
|
||||
Datum d;
|
||||
int i;
|
||||
bool isnull;
|
||||
int alreadyListener = 0;
|
||||
int ourPid = getpid();
|
||||
char *relnamei;
|
||||
TupleDesc tupDesc;
|
||||
|
||||
elog(DEBUG,"Async_Listen: %s",relname);
|
||||
for (i = 0 ; i < Natts_pg_listener; i++) {
|
||||
nulls[i] = ' ';
|
||||
values[i] = PointerGetDatum(NULL);
|
||||
}
|
||||
|
||||
i = 0;
|
||||
values[i++] = (Datum) relname;
|
||||
values[i++] = (Datum) pid;
|
||||
values[i++] = (Datum) 0; /* no notifies pending */
|
||||
|
||||
lDesc = heap_openr(ListenerRelationName);
|
||||
|
||||
/* is someone already listening. One listener per relation */
|
||||
tdesc = RelationGetTupleDescriptor(lDesc);
|
||||
s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
|
||||
while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
|
||||
d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
|
||||
&isnull);
|
||||
relnamei = DatumGetPointer(d);
|
||||
if (!strncmp(relnamei,relname, NAMEDATALEN)) {
|
||||
d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
|
||||
pid = DatumGetInt32(d);
|
||||
if (pid == ourPid) {
|
||||
alreadyListener = 1;
|
||||
}
|
||||
}
|
||||
ReleaseBuffer(b);
|
||||
}
|
||||
heap_endscan(s);
|
||||
|
||||
if (alreadyListener) {
|
||||
elog(NOTICE, "Async_Listen: We are already listening on %s",
|
||||
relname);
|
||||
return;
|
||||
}
|
||||
|
||||
tupDesc = lDesc->rd_att;
|
||||
tup = heap_formtuple(tupDesc,
|
||||
values,
|
||||
nulls);
|
||||
heap_insert(lDesc, tup);
|
||||
|
||||
pfree(tup);
|
||||
/* if (alreadyListener) {
|
||||
elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
|
||||
}*/
|
||||
heap_close(lDesc);
|
||||
|
||||
/*
|
||||
* now that we are listening, we should make a note to ourselves
|
||||
* to unlisten prior to dying.
|
||||
*/
|
||||
relnamei = malloc(NAMEDATALEN); /* persists to process exit */
|
||||
memset (relnamei, 0, NAMEDATALEN);
|
||||
strncpy(relnamei, relname, NAMEDATALEN);
|
||||
on_exitpg(Async_UnlistenOnExit, (caddr_t) relnamei);
|
||||
}
|
||||
|
||||
/*
|
||||
*--------------------------------------------------------------
|
||||
* Async_Unlisten --
|
||||
*
|
||||
* Remove the backend from the list of listening backends
|
||||
* for the specified relation.
|
||||
*
|
||||
* This would correspond to the 'unlisten <relation>'
|
||||
* command, but there isn't one yet.
|
||||
*
|
||||
* Results:
|
||||
* pg_listeners is updated.
|
||||
*
|
||||
* Side effects:
|
||||
* XXX
|
||||
*
|
||||
*--------------------------------------------------------------
|
||||
*/
|
||||
void
|
||||
Async_Unlisten(char *relname, int pid)
|
||||
{
|
||||
Relation lDesc;
|
||||
HeapTuple lTuple;
|
||||
|
||||
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
|
||||
Int32GetDatum(pid),
|
||||
0,0);
|
||||
lDesc = heap_openr(ListenerRelationName);
|
||||
if (lTuple != NULL) {
|
||||
heap_delete(lDesc,&lTuple->t_ctid);
|
||||
}
|
||||
heap_close(lDesc);
|
||||
}
|
||||
|
||||
void
|
||||
Async_UnlistenOnExit(int code, /* from exitpg */
|
||||
char *relname)
|
||||
{
|
||||
Async_Unlisten((char *) relname, getpid());
|
||||
}
|
||||
|
||||
/*
|
||||
* --------------------------------------------------------------
|
||||
* Async_NotifyFrontEnd --
|
||||
*
|
||||
* Perform an asynchronous notification to front end over
|
||||
* portal comm channel. The name of the relation which contains the
|
||||
* data is sent to the front end.
|
||||
*
|
||||
* We remove the notification flag from the pg_listener tuple
|
||||
* associated with our process.
|
||||
*
|
||||
* Results:
|
||||
* XXX
|
||||
*
|
||||
* Side effects:
|
||||
*
|
||||
* We make use of the out-of-band channel to transmit the
|
||||
* notification to the front end. The actual data transfer takes
|
||||
* place at the front end's request.
|
||||
*
|
||||
* --------------------------------------------------------------
|
||||
*/
|
||||
GlobalMemory notifyContext = NULL;
|
||||
|
||||
void
|
||||
Async_NotifyFrontEnd()
|
||||
{
|
||||
extern CommandDest whereToSendOutput;
|
||||
HeapTuple lTuple, rTuple;
|
||||
Relation lRel;
|
||||
HeapScanDesc sRel;
|
||||
TupleDesc tdesc;
|
||||
ScanKeyData key[2];
|
||||
Datum d, value[3];
|
||||
char repl[3], nulls[3];
|
||||
Buffer b;
|
||||
int ourpid;
|
||||
bool isnull;
|
||||
|
||||
notifyFrontEndPending = 0;
|
||||
|
||||
elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
|
||||
|
||||
StartTransactionCommand();
|
||||
ourpid = getpid();
|
||||
ScanKeyEntryInitialize(&key[0], 0,
|
||||
Anum_pg_listener_notify,
|
||||
Integer32EqualRegProcedure,
|
||||
Int32GetDatum(1));
|
||||
ScanKeyEntryInitialize(&key[1], 0,
|
||||
Anum_pg_listener_pid,
|
||||
Integer32EqualRegProcedure,
|
||||
Int32GetDatum(ourpid));
|
||||
lRel = heap_openr(ListenerRelationName);
|
||||
tdesc = RelationGetTupleDescriptor(lRel);
|
||||
sRel = heap_beginscan(lRel, 0, NowTimeQual, 2, key);
|
||||
|
||||
nulls[0] = nulls[1] = nulls[2] = ' ';
|
||||
repl[0] = repl[1] = repl[2] = ' ';
|
||||
repl[Anum_pg_listener_notify - 1] = 'r';
|
||||
value[0] = value[1] = value[2] = (Datum) 0;
|
||||
value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
|
||||
|
||||
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0,&b))) {
|
||||
d = (Datum) heap_getattr(lTuple, b, Anum_pg_listener_relname,
|
||||
tdesc, &isnull);
|
||||
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
|
||||
(void) heap_replace(lRel, &lTuple->t_ctid, rTuple);
|
||||
|
||||
/* notifying the front end */
|
||||
|
||||
if (whereToSendOutput == Remote) {
|
||||
pq_putnchar("A", 1);
|
||||
pq_putint(ourpid, 4);
|
||||
pq_putstr(DatumGetName(d)->data);
|
||||
pq_flush();
|
||||
} else {
|
||||
elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
|
||||
}
|
||||
ReleaseBuffer(b);
|
||||
}
|
||||
CommitTransactionCommand();
|
||||
}
|
||||
|
||||
static int
|
||||
AsyncExistsPendingNotify(char *relname)
|
||||
{
|
||||
Dlelem* p;
|
||||
for (p = DLGetHead(pendingNotifies);
|
||||
p != NULL;
|
||||
p = DLGetSucc(p)) {
|
||||
if (!strcmp(DLE_VAL(p), relname))
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
ClearPendingNotify()
|
||||
{
|
||||
Dlelem* p;
|
||||
while ( (p = DLRemHead(pendingNotifies)) != NULL)
|
||||
free(DLE_VAL(p));
|
||||
}
|
||||
|
Reference in New Issue
Block a user