mirror of
https://github.com/postgres/postgres.git
synced 2025-07-31 22:04:40 +03:00
config
contrib
adminpack
auth_delay
auto_explain
btree_gin
btree_gist
chkpass
citext
cube
dblink
dict_int
dict_xsyn
dummy_seclabel
earthdistance
file_fdw
fuzzystrmatch
hstore
intagg
intarray
isn
lo
ltree
oid2name
pageinspect
passwordcheck
pg_archivecleanup
pg_buffercache
pg_freespacemap
pg_prewarm
pg_standby
pg_stat_statements
pg_test_fsync
pg_test_timing
pg_trgm
pg_upgrade
pg_upgrade_support
pg_xlogdump
pgbench
pgcrypto
pgrowlocks
pgstattuple
postgres_fdw
seg
sepgsql
spi
sslinfo
start-scripts
tablefunc
tcn
test_decoding
test_parser
test_shm_mq
expected
sql
.gitignore
Makefile
setup.c
test.c
test_shm_mq--1.0.sql
test_shm_mq.control
test_shm_mq.h
worker.c
tsearch2
unaccent
uuid-ossp
vacuumlo
worker_spi
xml2
Makefile
README
contrib-global.mk
doc
src
.dir-locals.el
.gitattributes
.gitignore
COPYRIGHT
GNUmakefile.in
HISTORY
Makefile
README
README.git
aclocal.m4
configure
configure.in
Commit 3bd261ca18
updated the API but
neglected to make the corresponding edits here.
Per Tom Lane and the buildfarm.
225 lines
6.9 KiB
C
225 lines
6.9 KiB
C
/*--------------------------------------------------------------------------
 *
 * worker.c
 *		Code for sample worker making use of shared memory message queues.
 *		Our test worker simply reads messages from one message queue and
 *		writes them back out to another message queue.  In a real
 *		application, you'd presumably want the worker to do some more
 *		complex calculation rather than simply returning the input,
 *		but it should be possible to use much of the control logic just
 *		as presented here.
 *
 * Copyright (C) 2013, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		contrib/test_shm_mq/worker.c
 *
 * -------------------------------------------------------------------------
 */
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "miscadmin.h"
|
|
#include "storage/ipc.h"
|
|
#include "storage/procarray.h"
|
|
#include "storage/shm_mq.h"
|
|
#include "storage/shm_toc.h"
|
|
#include "utils/resowner.h"
|
|
|
|
#include "test_shm_mq.h"
|
|
|
|
static void handle_sigterm(SIGNAL_ARGS);
|
|
static void attach_to_queues(dsm_segment *seg, shm_toc *toc,
|
|
int myworkernumber, shm_mq_handle **inqhp,
|
|
shm_mq_handle **outqhp);
|
|
static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh);
|
|
|
|
/*
|
|
* Background worker entrypoint.
|
|
*
|
|
* This is intended to demonstrate how a background worker can be used to
|
|
* facilitate a parallel computation. Most of the logic here is fairly
|
|
* boilerplate stuff, designed to attach to the shared memory segment,
|
|
* notify the user backend that we're alive, and so on. The
|
|
* application-specific bits of logic that you'd replace for your own worker
|
|
* are attach_to_queues() and copy_messages().
|
|
*/
|
|
void
|
|
test_shm_mq_main(Datum main_arg)
|
|
{
|
|
dsm_segment *seg;
|
|
shm_toc *toc;
|
|
shm_mq_handle *inqh;
|
|
shm_mq_handle *outqh;
|
|
volatile test_shm_mq_header *hdr;
|
|
int myworkernumber;
|
|
PGPROC *registrant;
|
|
|
|
/*
|
|
* Establish signal handlers.
|
|
*
|
|
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just
|
|
* as it would a normal user backend. To make that happen, we establish
|
|
* a signal handler that is a stripped-down version of die(). We don't
|
|
* have any equivalent of the backend's command-read loop, where interrupts
|
|
* can be processed immediately, so make sure ImmediateInterruptOK is
|
|
* turned off.
|
|
*/
|
|
pqsignal(SIGTERM, handle_sigterm);
|
|
ImmediateInterruptOK = false;
|
|
BackgroundWorkerUnblockSignals();
|
|
|
|
/*
|
|
* Connect to the dynamic shared memory segment.
|
|
*
|
|
* The backend that registered this worker passed us the ID of a shared
|
|
* memory segment to which we must attach for further instructions. In
|
|
* order to attach to dynamic shared memory, we need a resource owner.
|
|
* Once we've mapped the segment in our address space, attach to the table
|
|
* of contents so we can locate the various data structures we'll need
|
|
* to find within the segment.
|
|
*/
|
|
CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker");
|
|
seg = dsm_attach(DatumGetInt32(main_arg));
|
|
if (seg == NULL)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("unable to map dynamic shared memory segment")));
|
|
toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
|
|
if (toc == NULL)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("bad magic number in dynamic shared memory segment")));
|
|
|
|
/*
|
|
* Acquire a worker number.
|
|
*
|
|
* By convention, the process registering this background worker should
|
|
* have stored the control structure at key 0. We look up that key to
|
|
* find it. Our worker number gives our identity: there may be just one
|
|
* worker involved in this parallel operation, or there may be many.
|
|
*/
|
|
hdr = shm_toc_lookup(toc, 0);
|
|
SpinLockAcquire(&hdr->mutex);
|
|
myworkernumber = ++hdr->workers_attached;
|
|
SpinLockRelease(&hdr->mutex);
|
|
if (myworkernumber > hdr->workers_total)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("too many message queue testing workers already")));
|
|
|
|
/*
|
|
* Attach to the appropriate message queues.
|
|
*/
|
|
attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
|
|
|
|
/*
|
|
* Indicate that we're fully initialized and ready to begin the main
|
|
* part of the parallel operation.
|
|
*
|
|
* Once we signal that we're ready, the user backend is entitled to assume
|
|
* that our on_dsm_detach callbacks will fire before we disconnect from
|
|
* the shared memory segment and exit. Generally, that means we must have
|
|
* attached to all relevant dynamic shared memory data structures by now.
|
|
*/
|
|
SpinLockAcquire(&hdr->mutex);
|
|
++hdr->workers_ready;
|
|
SpinLockRelease(&hdr->mutex);
|
|
registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
|
|
if (registrant == NULL)
|
|
{
|
|
elog(DEBUG1, "registrant backend has exited prematurely");
|
|
proc_exit(1);
|
|
}
|
|
SetLatch(®istrant->procLatch);
|
|
|
|
/* Do the work. */
|
|
copy_messages(inqh, outqh);
|
|
|
|
/*
|
|
* We're done. Explicitly detach the shared memory segment so that we
|
|
* don't get a resource leak warning at commit time. This will fire any
|
|
* on_dsm_detach callbacks we've registered, as well. Once that's done,
|
|
* we can go ahead and exit.
|
|
*/
|
|
dsm_detach(seg);
|
|
proc_exit(1);
|
|
}
|
|
|
|
/*
|
|
* Attach to shared memory message queues.
|
|
*
|
|
* We use our worker number to determine to which queue we should attach.
|
|
* The queues are registered at keys 1..<number-of-workers>. The user backend
|
|
* writes to queue #1 and reads from queue #<number-of-workers>; each worker
|
|
* reads from the queue whose number is equal to its worker number and writes
|
|
* to the next higher-numbered queue.
|
|
*/
|
|
static void
|
|
attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
|
|
shm_mq_handle **inqhp, shm_mq_handle **outqhp)
|
|
{
|
|
shm_mq *inq;
|
|
shm_mq *outq;
|
|
|
|
inq = shm_toc_lookup(toc, myworkernumber);
|
|
shm_mq_set_receiver(inq, MyProc);
|
|
*inqhp = shm_mq_attach(inq, seg, NULL);
|
|
outq = shm_toc_lookup(toc, myworkernumber + 1);
|
|
shm_mq_set_sender(outq, MyProc);
|
|
*outqhp = shm_mq_attach(outq, seg, NULL);
|
|
}
|
|
|
|
/*
|
|
* Loop, receiving and sending messages, until the connection is broken.
|
|
*
|
|
* This is the "real work" performed by this worker process. Everything that
|
|
* happens before this is initialization of one form or another, and everything
|
|
* after this point is cleanup.
|
|
*/
|
|
static void
|
|
copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
|
|
{
|
|
Size len;
|
|
void *data;
|
|
shm_mq_result res;
|
|
|
|
for (;;)
|
|
{
|
|
/* Notice any interrupts that have occurred. */
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
/* Receive a message. */
|
|
res = shm_mq_receive(inqh, &len, &data, false);
|
|
if (res != SHM_MQ_SUCCESS)
|
|
break;
|
|
|
|
/* Send it back out. */
|
|
res = shm_mq_send(outqh, len, data, false);
|
|
if (res != SHM_MQ_SUCCESS)
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
|
|
* like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right
|
|
* thing.
|
|
*/
|
|
static void
|
|
handle_sigterm(SIGNAL_ARGS)
|
|
{
|
|
int save_errno = errno;
|
|
|
|
if (MyProc)
|
|
SetLatch(&MyProc->procLatch);
|
|
|
|
if (!proc_exit_inprogress)
|
|
{
|
|
InterruptPending = true;
|
|
ProcDiePending = true;
|
|
}
|
|
|
|
errno = save_errno;
|
|
}
|