1
0
mirror of https://github.com/postgres/postgres.git synced 2025-09-02 04:21:28 +03:00

From: Massimo Dal Zotto <dz@cs.unitn.it>

This commit is contained in:
Marc G. Fournier
1998-08-30 21:05:27 +00:00
parent 6f3de1bb66
commit 6c4982851a
4 changed files with 295 additions and 117 deletions

View File

@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.37 1998/08/19 02:01:39 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.38 1998/08/30 21:04:43 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
@@ -34,29 +34,7 @@
* -- jw, 12/28/93
*
*/
/*
* The following is the old model which does not work.
*/
/*
* Model is:
* 1. Multiple backends on same machine.
*
* 2. Query on one backend sends stuff over an asynchronous portal by
* appending to a relation, and then doing an async. notification
* (which takes place after commit) to all listeners on this relation.
*
* 3. Async. notification results in all backends listening on relation
* to be woken up, by a process signal kill(SIGUSR2), with name of relation
* passed in shared memory.
*
* 4. Each backend notifies its respective frontend over the comm
* channel using the out-of-band channel.
*
* 5. Each frontend receives this notification and processes accordingly.
*
* #4,#5 are changing soon with pending rewrite of portal/protocol.
*
*/
#include <unistd.h>
#include <signal.h>
#include <string.h>
@@ -82,17 +60,28 @@
#include "tcop/dest.h"
#include "utils/mcxt.h"
#include "utils/syscache.h"
#include <utils/trace.h>
#include <utils/ps_status.h>
#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
#define NotifyHack pg_options[OPT_NOTIFYHACK]
extern TransactionState CurrentTransactionState;
extern CommandDest whereToSendOutput;
GlobalMemory notifyContext = NULL;
static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);
static void Async_UnlistenAll(void);
/*
*--------------------------------------------------------------
@@ -116,33 +105,36 @@ static void Async_UnlistenOnExit(int code, char *relname);
void
Async_NotifyHandler(SIGNAL_ARGS)
{
extern TransactionState CurrentTransactionState;
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
#ifdef ASYNC_DEBUG
elog(DEBUG, "Waking up sleeping backend process");
#endif
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
"waking up sleeping backend process");
PS_SET_STATUS("async_notify");
Async_NotifyFrontEnd();
PS_SET_STATUS("idle");
}
else
{
#ifdef ASYNC_DEBUG
elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
CurrentTransactionState->state,
CurrentTransactionState->blockState);
#endif
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
"process in middle of transaction, state=%d, blockstate=%d",
CurrentTransactionState->state,
CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
}
/*
*--------------------------------------------------------------
* Async_Notify --
*
* This is executed by the SQL notify command.
*
* Adds the relation to the list of pending notifies.
* All notification happens at end of commit.
* ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -151,7 +143,6 @@ Async_NotifyHandler(SIGNAL_ARGS)
* then each backend notifies its corresponding front end at
* the end of commit.
*
* This correspond to 'notify <relname>' command
* -- jw, 12/28/93
*
* Results:
@@ -180,9 +171,7 @@ Async_Notify(char *relname)
char *notifyName;
#ifdef ASYNC_DEBUG
elog(DEBUG, "Async_Notify: %s", relname);
#endif
TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
if (!pendingNotifies)
pendingNotifies = DLNewList();
@@ -217,18 +206,32 @@ Async_Notify(char *relname)
{
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notify is really issued only if a tuple has been changed */
notifyIssued = 1;
}
}
heap_endscan(sRel);
RelationUnsetLockForWrite(lRel);
/*
* Note: if the write lock is unset we can get multiple tuples
* with same oid if other backends notify the same relation.
* Use this option at your own risk.
*/
if (NotifyUnlock) {
RelationUnsetLockForWrite(lRel);
}
heap_close(lRel);
notifyIssued = 1;
TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}
/*
*--------------------------------------------------------------
* Async_NotifyAtCommit --
*
* This is called at transaction commit.
*
* Signal our corresponding frontend process on relations that
* were notified. Signal all other backend process that
* are listening also.
@@ -265,14 +268,12 @@ Async_NotifyAtCommit()
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
if (notifyIssued)
{ /* 'notify <relname>' issued by us */
{
/* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
#ifdef ASYNC_DEBUG
elog(DEBUG, "Async_NotifyAtCommit.");
#endif
TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
F_INT4EQ,
@@ -294,16 +295,15 @@ Async_NotifyAtCommit()
if (MyProcPid == DatumGetInt32(d))
{
#ifdef ASYNC_DEBUG
elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
#endif
notifyFrontEndPending = 1;
TPRINTF(TRACE_NOTIFY,
"Async_NotifyAtCommit: notifying self");
}
else
{
#ifdef ASYNC_DEBUG
elog(DEBUG, "Notifying others");
#endif
TPRINTF(TRACE_NOTIFY,
"Async_NotifyAtCommit: notifying pid %d",
DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
@@ -315,19 +315,35 @@ Async_NotifyAtCommit()
}
}
heap_endscan(sRel);
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
/*
* Notify the frontend inside the current transaction while
* we still have a valid write lock on pg_listeners. This
* avoid waiting until all other backends have finished
* with pg_listener.
*/
if (notifyFrontEndPending) {
/* The aux version is called inside transaction */
Async_NotifyFrontEnd_Aux();
}
TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
CommitTransactionCommand();
ClearPendingNotify();
}
else
{
/*
* No notifies issued by us. If notifyFrontEndPending has been set
* by Async_NotifyHandler notify the frontend of pending notifies
* from other backends.
*/
if (notifyFrontEndPending) {
Async_NotifyFrontEnd();
}
}
if (notifyFrontEndPending)
{ /* we need to notify the frontend of all
* pending notifies. */
notifyFrontEndPending = 1;
Async_NotifyFrontEnd();
}
ClearPendingNotify();
}
}
@@ -335,6 +351,8 @@ Async_NotifyAtCommit()
*--------------------------------------------------------------
* Async_NotifyAtAbort --
*
* This is called at transaction commit.
*
* Gets rid of pending notifies. List elements are automatically
* freed through memory context.
*
@@ -350,20 +368,19 @@ Async_NotifyAtCommit()
void
Async_NotifyAtAbort()
{
extern TransactionState CurrentTransactionState;
if (notifyIssued)
if (pendingNotifies) {
ClearPendingNotify();
notifyIssued = 0;
if (pendingNotifies)
DLFreeList(pendingNotifies);
}
pendingNotifies = DLNewList();
notifyIssued = 0;
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
/* don't forget to notify front end */
if (notifyFrontEndPending)
{ /* don't forget to notify front end */
{
Async_NotifyFrontEnd();
}
}
@@ -373,11 +390,11 @@ Async_NotifyAtAbort()
*--------------------------------------------------------------
* Async_Listen --
*
* This is executed by the SQL listen command.
*
* Register a backend (identified by its Unix PID) as listening
* on the specified relation.
*
* This corresponds to the 'listen <relation>' command in SQL
*
* One listener per relation, pg_listener relation is keyed
* on (relname,pid) to provide multiple listeners in future.
*
@@ -406,9 +423,13 @@ Async_Listen(char *relname, int pid)
char *relnamei;
TupleDesc tupDesc;
#ifdef ASYNC_DEBUG
elog(DEBUG, "Async_Listen: %s", relname);
#endif
if (whereToSendOutput != Remote) {
elog(NOTICE, "Async_Listen: "
"listen not available on interactive sessions");
return;
}
TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
@@ -438,6 +459,10 @@ Async_Listen(char *relname, int pid)
if (pid == MyProcPid)
alreadyListener = 1;
}
if (alreadyListener) {
/* No need to scan the rest of the table */
break;
}
}
heap_endscan(scan);
@@ -445,15 +470,14 @@ Async_Listen(char *relname, int pid)
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);
return;
}
tupDesc = lDesc->rd_att;
newtup = heap_formtuple(tupDesc,
values,
nulls);
newtup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, newtup);
pfree(newtup);
/*
@@ -477,12 +501,11 @@ Async_Listen(char *relname, int pid)
*--------------------------------------------------------------
* Async_Unlisten --
*
* This is executed by the SQL unlisten command.
*
* Remove the backend from the list of listening backends
* for the specified relation.
*
* This would correspond to the 'unlisten <relation>'
* command, but there isn't one yet.
*
* Results:
* pg_listeners is updated.
*
@@ -497,20 +520,81 @@ Async_Unlisten(char *relname, int pid)
Relation lDesc;
HeapTuple lTuple;
lTuple = SearchSysCacheTuple(LISTENREL,
PointerGetDatum(relname),
/* Handle specially the `unlisten "*"' command */
if ((!relname) || (*relname == '\0') || (strcmp(relname,"*")==0)) {
Async_UnlistenAll();
return;
}
TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
lDesc = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lDesc);
if (lTuple != NULL)
{
lDesc = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lDesc);
heap_delete(lDesc, &lTuple->t_ctid);
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);
}
}
/*
*--------------------------------------------------------------
* Async_UnlistenAll --
*
* Unlisten all relations for this backend.
*
* Results:
* pg_listeners is updated.
*
* Side effects:
* XXX
*
*--------------------------------------------------------------
*/
static void
Async_UnlistenAll()
{
HeapTuple lTuple;
Relation lRel;
HeapScanDesc sRel;
TupleDesc tdesc;
ScanKeyData key[1];
TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_pid,
F_INT4EQ,
Int32GetDatum(MyProcPid));
lRel = heap_openr(ListenerRelationName);
RelationSetLockForWrite(lRel);
tdesc = RelationGetTupleDescriptor(lRel);
sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
heap_delete(lRel, &lTuple->t_ctid);
}
heap_endscan(sRel);
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
}
/*
* --------------------------------------------------------------
* Async_UnlistenOnExit --
*
* This is called at backend exit for each registered listen.
*
* Results:
* XXX
*
* --------------------------------------------------------------
*/
static void
Async_UnlistenOnExit(int code, /* from exitpg */
char *relname)
@@ -522,6 +606,25 @@ Async_UnlistenOnExit(int code, /* from exitpg */
* --------------------------------------------------------------
* Async_NotifyFrontEnd --
*
* This is called outside transactions. The real work is done
* by Async_NotifyFrontEnd_Aux().
*
* --------------------------------------------------------------
*/
static void
Async_NotifyFrontEnd()
{
StartTransactionCommand();
Async_NotifyFrontEnd_Aux();
CommitTransactionCommand();
}
/*
* --------------------------------------------------------------
* Async_NotifyFrontEnd_Aux --
*
* This must be called inside a transaction block.
*
* Perform an asynchronous notification to front end over
* portal comm channel. The name of the relation which contains the
* data is sent to the front end.
@@ -534,12 +637,9 @@ Async_UnlistenOnExit(int code, /* from exitpg */
*
* --------------------------------------------------------------
*/
GlobalMemory notifyContext = NULL;
static void
Async_NotifyFrontEnd()
Async_NotifyFrontEnd_Aux()
{
extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
Relation lRel;
@@ -552,12 +652,15 @@ Async_NotifyFrontEnd()
nulls[3];
bool isnull;
#define MAX_DONE 64
char *done[MAX_DONE];
int ndone = 0;
int i;
notifyFrontEndPending = 0;
#ifdef ASYNC_DEBUG
elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
#endif
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
StartTransactionCommand();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
@@ -580,11 +683,35 @@ Async_NotifyFrontEnd()
while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
{
d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
&isnull);
/*
* This hack deletes duplicate tuples which can be left
* in the table if the NotifyUnlock option is set.
* I'm further investigating this. -- dz
*/
if (NotifyHack) {
for (i=0; i<ndone; i++) {
if (strcmp(DatumGetName(d)->data, done[i]) == 0) {
TPRINTF(TRACE_NOTIFY,
"Async_NotifyFrontEnd: duplicate %s",
DatumGetName(d)->data);
heap_delete(lRel, &lTuple->t_ctid);
continue;
}
}
if (ndone < MAX_DONE) {
done[ndone++] = pstrdup(DatumGetName(d)->data);
}
}
rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);
/* notifying the front end */
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
DatumGetName(d)->data);
if (whereToSendOutput == Remote)
{
@@ -593,12 +720,12 @@ Async_NotifyFrontEnd()
pq_putstr(DatumGetName(d)->data);
pq_flush();
}
else
elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
}
heap_endscan(sRel);
RelationUnsetLockForWrite(lRel);
heap_close(lRel);
CommitTransactionCommand();
TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
}
static int