1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-30 11:03:19 +03:00

Move test modules from contrib to src/test/modules

This is advance preparation for introducing even more test modules; the
easy solution is to add them to contrib, but that's bloated enough that
it seems a good time to think of something different.

Moved modules are dummy_seclabel, test_shm_mq, test_parser and
worker_spi.

(test_decoding was also a candidate, but there was too much opposition
to moving that one.  We can always reconsider later.)
This commit is contained in:
Alvaro Herrera
2014-11-29 23:55:00 -03:00
parent 5b12987b2e
commit 22dfd116a1
38 changed files with 212 additions and 280 deletions

View File

@ -12,6 +12,14 @@ subdir = src/test
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
SUBDIRS = regress isolation
SUBDIRS = regress isolation modules
$(recurse)
# We want to recurse to all subdirs for all standard targets, except that
# installcheck and install should not recurse into the subdirectory "modules".
recurse_alldirs_targets := $(filter-out installcheck install, $(standard_targets))
installable_dirs := $(filter-out modules, $(SUBDIRS))
$(call recurse,$(recurse_alldirs_targets))
$(call recurse,installcheck, $(installable_dirs))
$(call recurse,install, $(installable_dirs))

13
src/test/modules/Makefile Normal file
View File

@ -0,0 +1,13 @@
# src/test/modules/Makefile
subdir = src/test/modules
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
SUBDIRS = \
worker_spi \
dummy_seclabel \
test_shm_mq \
test_parser
$(recurse)

View File

@ -0,0 +1,15 @@
# src/test/modules/dummy_seclabel/Makefile
MODULES = dummy_seclabel
PGFILEDESC = "dummy_seclabel - regression testing of the SECURITY LABEL statement"
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/dummy_seclabel
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1,43 @@
The dummy_seclabel module exists only to support regression
testing of the SECURITY LABEL statement. It is not intended
to be used in production.
Rationale
=========
The SECURITY LABEL statement allows the user to assign security
labels to database objects; however, security labels can only be assigned
when specifically allowed by a loadable module, so this module is provided
to allow proper regression testing.
Security label providers intended to be used in production will typically be
dependent on a platform-specific feature such as
SE-Linux. This module is platform-independent,
and therefore better-suited to regression testing.
Usage
=====
Here's a simple example of usage:
# postgresql.conf
shared_preload_libraries = 'dummy_seclabel'
postgres=# CREATE TABLE t (a int, b text);
CREATE TABLE
postgres=# SECURITY LABEL ON TABLE t IS 'classified';
SECURITY LABEL
The dummy_seclabel module provides only four hardcoded
labels: unclassified, classified,
secret, and top secret.
It does not allow any other strings as security labels.
These labels are not used to enforce access controls. They are only used
to check whether the SECURITY LABEL statement works as expected,
or not.
Author
======
KaiGai Kohei <kaigai@ak.jp.nec.com>

View File

@ -0,0 +1,50 @@
/*
* dummy_seclabel.c
*
* Dummy security label provider.
*
* This module does not provide anything worthwhile from a security
* perspective, but allows regression testing independent of platform-specific
* features like SELinux.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*/
#include "postgres.h"
#include "commands/seclabel.h"
#include "miscadmin.h"
#include "utils/rel.h"
PG_MODULE_MAGIC;
/* Entrypoint of the module */
void _PG_init(void);
static void
dummy_object_relabel(const ObjectAddress *object, const char *seclabel)
{
if (seclabel == NULL ||
strcmp(seclabel, "unclassified") == 0 ||
strcmp(seclabel, "classified") == 0)
return;
if (strcmp(seclabel, "secret") == 0 ||
strcmp(seclabel, "top secret") == 0)
{
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("only superuser can set '%s' label", seclabel)));
return;
}
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("'%s' is not a valid security label", seclabel)));
}
void
_PG_init(void)
{
register_label_provider("dummy", dummy_object_relabel);
}

View File

@ -0,0 +1,4 @@
# Generated subdirectories
/log/
/results/
/tmp_check/

View File

@ -0,0 +1,21 @@
# src/test/modules/test_parser/Makefile
MODULE_big = test_parser
OBJS = test_parser.o $(WIN32RES)
PGFILEDESC = "test_parser - example of a custom parser for full-text search"
EXTENSION = test_parser
DATA = test_parser--1.0.sql test_parser--unpackaged--1.0.sql
REGRESS = test_parser
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/test_parser
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1,61 @@
test_parser is an example of a custom parser for full-text
search. It doesn't do anything especially useful, but can serve as
a starting point for developing your own parser.
test_parser recognizes words separated by white space,
and returns just two token types:
mydb=# SELECT * FROM ts_token_type('testparser');
tokid | alias | description
-------+-------+---------------
3 | word | Word
12 | blank | Space symbols
(2 rows)
These token numbers have been chosen to be compatible with the default
parser's numbering. This allows us to use its headline()
function, thus keeping the example simple.
Usage
=====
Installing the test_parser extension creates a text search
parser testparser. It has no user-configurable parameters.
You can test the parser with, for example,
mydb=# SELECT * FROM ts_parse('testparser', 'That''s my first own parser');
tokid | token
-------+--------
3 | That's
12 |
3 | my
12 |
3 | first
12 |
3 | own
12 |
3 | parser
Real-world use requires setting up a text search configuration
that uses the parser. For example,
mydb=# CREATE TEXT SEARCH CONFIGURATION testcfg ( PARSER = testparser );
CREATE TEXT SEARCH CONFIGURATION
mydb=# ALTER TEXT SEARCH CONFIGURATION testcfg
mydb-# ADD MAPPING FOR word WITH english_stem;
ALTER TEXT SEARCH CONFIGURATION
mydb=# SELECT to_tsvector('testcfg', 'That''s my first own parser');
to_tsvector
-------------------------------
'that':1 'first':3 'parser':5
(1 row)
mydb=# SELECT ts_headline('testcfg', 'Supernovae stars are the brightest phenomena in galaxies',
mydb(# to_tsquery('testcfg', 'star'));
ts_headline
-----------------------------------------------------------------
Supernovae <b>stars</b> are the brightest phenomena in galaxies
(1 row)

View File

@ -0,0 +1,44 @@
CREATE EXTENSION test_parser;
-- make test configuration using parser
CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser);
ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple;
-- ts_parse
SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/');
tokid | token
-------+-----------------------
3 | That's
12 |
3 | simple
12 |
3 | parser
12 |
3 | can't
12 |
3 | parse
12 |
3 | urls
12 |
3 | like
12 |
3 | http://some.url/here/
(15 rows)
SELECT to_tsvector('testcfg','That''s my first own parser');
to_tsvector
-------------------------------------------------
'first':3 'my':2 'own':4 'parser':5 'that''s':1
(1 row)
SELECT to_tsquery('testcfg', 'star');
to_tsquery
------------
'star'
(1 row)
SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies',
to_tsquery('testcfg', 'stars'));
ts_headline
-----------------------------------------------------------------
Supernovae <b>stars</b> are the brightest phenomena in galaxies
(1 row)

View File

@ -0,0 +1,18 @@
CREATE EXTENSION test_parser;
-- make test configuration using parser
CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser);
ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple;
-- ts_parse
SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/');
SELECT to_tsvector('testcfg','That''s my first own parser');
SELECT to_tsquery('testcfg', 'star');
SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies',
to_tsquery('testcfg', 'stars'));

View File

@ -0,0 +1,32 @@
/* src/test/modules/test_parser/test_parser--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_parser" to load this file. \quit
CREATE FUNCTION testprs_start(internal, int4)
RETURNS internal
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION testprs_getlexeme(internal, internal, internal)
RETURNS internal
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION testprs_end(internal)
RETURNS void
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE FUNCTION testprs_lextype(internal)
RETURNS internal
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT;
CREATE TEXT SEARCH PARSER testparser (
START = testprs_start,
GETTOKEN = testprs_getlexeme,
END = testprs_end,
HEADLINE = pg_catalog.prsd_headline,
LEXTYPES = testprs_lextype
);

View File

@ -0,0 +1,10 @@
/* src/test/modules/test_parser/test_parser--unpackaged--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_parser FROM unpackaged" to load this file. \quit
ALTER EXTENSION test_parser ADD function testprs_start(internal,integer);
ALTER EXTENSION test_parser ADD function testprs_getlexeme(internal,internal,internal);
ALTER EXTENSION test_parser ADD function testprs_end(internal);
ALTER EXTENSION test_parser ADD function testprs_lextype(internal);
ALTER EXTENSION test_parser ADD text search parser testparser;

View File

@ -0,0 +1,128 @@
/*-------------------------------------------------------------------------
*
* test_parser.c
* Simple example of a text search parser
*
* Copyright (c) 2007-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_parser/test_parser.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
PG_MODULE_MAGIC;
/*
* types
*/
/* self-defined type */
typedef struct
{
char *buffer; /* text to parse */
int len; /* length of the text in buffer */
int pos; /* position of the parser */
} ParserState;
/* copy-paste from wparser.h of tsearch2 */
typedef struct
{
int lexid;
char *alias;
char *descr;
} LexDescr;
/*
* functions
*/
PG_FUNCTION_INFO_V1(testprs_start);
PG_FUNCTION_INFO_V1(testprs_getlexeme);
PG_FUNCTION_INFO_V1(testprs_end);
PG_FUNCTION_INFO_V1(testprs_lextype);
Datum
testprs_start(PG_FUNCTION_ARGS)
{
ParserState *pst = (ParserState *) palloc0(sizeof(ParserState));
pst->buffer = (char *) PG_GETARG_POINTER(0);
pst->len = PG_GETARG_INT32(1);
pst->pos = 0;
PG_RETURN_POINTER(pst);
}
Datum
testprs_getlexeme(PG_FUNCTION_ARGS)
{
ParserState *pst = (ParserState *) PG_GETARG_POINTER(0);
char **t = (char **) PG_GETARG_POINTER(1);
int *tlen = (int *) PG_GETARG_POINTER(2);
int startpos = pst->pos;
int type;
*t = pst->buffer + pst->pos;
if (pst->pos < pst->len &&
(pst->buffer)[pst->pos] == ' ')
{
/* blank type */
type = 12;
/* go to the next non-space character */
while (pst->pos < pst->len &&
(pst->buffer)[pst->pos] == ' ')
(pst->pos)++;
}
else
{
/* word type */
type = 3;
/* go to the next space character */
while (pst->pos < pst->len &&
(pst->buffer)[pst->pos] != ' ')
(pst->pos)++;
}
*tlen = pst->pos - startpos;
/* we are finished if (*tlen == 0) */
if (*tlen == 0)
type = 0;
PG_RETURN_INT32(type);
}
Datum
testprs_end(PG_FUNCTION_ARGS)
{
ParserState *pst = (ParserState *) PG_GETARG_POINTER(0);
pfree(pst);
PG_RETURN_VOID();
}
Datum
testprs_lextype(PG_FUNCTION_ARGS)
{
/*
* Remarks: - we have to return the blanks for headline reason - we use
* the same lexids like Teodor in the default word parser; in this way we
* can reuse the headline function of the default word parser.
*/
LexDescr *descr = (LexDescr *) palloc(sizeof(LexDescr) * (2 + 1));
/* there are only two types in this parser */
descr[0].lexid = 3;
descr[0].alias = pstrdup("word");
descr[0].descr = pstrdup("Word");
descr[1].lexid = 12;
descr[1].alias = pstrdup("blank");
descr[1].descr = pstrdup("Space symbols");
descr[2].lexid = 0;
PG_RETURN_POINTER(descr);
}

View File

@ -0,0 +1,5 @@
# test_parser extension
comment = 'example of a custom parser for full-text search'
default_version = '1.0'
module_pathname = '$libdir/test_parser'
relocatable = true

View File

@ -0,0 +1,4 @@
# Generated subdirectories
/log/
/results/
/tmp_check/

View File

@ -0,0 +1,21 @@
# src/test/modules/test_shm_mq/Makefile
MODULE_big = test_shm_mq
OBJS = test.o setup.o worker.o $(WIN32RES)
PGFILEDESC = "test_shm_mq - example use of shared memory message queue"
EXTENSION = test_shm_mq
DATA = test_shm_mq--1.0.sql
REGRESS = test_shm_mq
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/test_shm_mq
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1,49 @@
test_shm_mq is an example of how to use dynamic shared memory
and the shared memory message queue facilities to coordinate a user backend
with the efforts of one or more background workers. It is not intended to
do anything useful on its own; rather, it is a demonstration of how these
facilities can be used, and a unit test of those facilities.
The function is this extension send the same message repeatedly through
a loop of processes. The message payload, the size of the message queue
through which it is sent, and the number of processes in the loop are
configurable. At the end, the message may be verified to ensure that it
has not been corrupted in transmission.
Functions
=========
test_shm_mq(queue_size int8, message text,
repeat_count int4 default 1, num_workers int4 default 1)
RETURNS void
This function sends and receives messages synchronously. The user
backend sends the provided message to the first background worker using
a message queue of the given size. The first background worker sends
the message to the second background worker, if the number of workers
is greater than one, and so forth. Eventually, the last background
worker sends the message back to the user backend. If the repeat count
is greater than one, the user backend then sends the message back to
the first worker. Once the message has been sent and received by all
the coordinating processes a number of times equal to the repeat count,
the user backend verifies that the message finally received matches the
one originally sent and throws an error if not.
test_shm_mq_pipelined(queue_size int8, message text,
repeat_count int4 default 1, num_workers int4 default 1,
verify bool default true)
RETURNS void
This function sends the same message multiple times, as specified by the
repeat count, to the first background worker using a queue of the given
size. These messages are then forwarded to each background worker in
turn, in each case using a queue of the given size. Finally, the last
background worker sends the messages back to the user backend. The user
backend uses non-blocking sends and receives, so that it may begin receiving
copies of the message before it has finished sending all copies of the
message. The 'verify' argument controls whether or not the
received copies are checked against the message that was sent. (This
takes nontrivial time so it may be useful to disable it for benchmarking
purposes.)

View File

@ -0,0 +1,36 @@
CREATE EXTENSION test_shm_mq;
--
-- These tests don't produce any interesting output. We're checking that
-- the operations complete without crashing or hanging and that none of their
-- internal sanity tests fail.
--
SELECT test_shm_mq(1024, '', 2000, 1);
test_shm_mq
-------------
(1 row)
SELECT test_shm_mq(1024, 'a', 2001, 1);
test_shm_mq
-------------
(1 row)
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1);
test_shm_mq
-------------
(1 row)
SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1);
test_shm_mq
-------------
(1 row)
SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3);
test_shm_mq_pipelined
-----------------------
(1 row)

View File

@ -0,0 +1,328 @@
/*--------------------------------------------------------------------------
*
* setup.c
* Code to set up a dynamic shared memory segments and a specified
* number of background workers for shared memory message queue
* testing.
*
* Copyright (C) 2013-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_shm_mq/setup.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_toc.h"
#include "utils/memutils.h"
#include "test_shm_mq.h"
typedef struct
{
int nworkers;
BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
} worker_state;
static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
dsm_segment **segp,
test_shm_mq_header **hdrp,
shm_mq **outp, shm_mq **inp);
static worker_state *setup_background_workers(int nworkers,
dsm_segment *seg);
static void cleanup_background_workers(dsm_segment *seg, Datum arg);
static void wait_for_workers_to_become_ready(worker_state *wstate,
volatile test_shm_mq_header *hdr);
static bool check_worker_status(worker_state *wstate);
/*
* Set up a dynamic shared memory segment and zero or more background workers
* for a test run.
*/
void
test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
shm_mq_handle **output, shm_mq_handle **input)
{
dsm_segment *seg;
test_shm_mq_header *hdr;
shm_mq *outq = NULL; /* placate compiler */
shm_mq *inq = NULL; /* placate compiler */
worker_state *wstate;
/* Set up a dynamic shared memory segment. */
setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
*segp = seg;
/* Register background workers. */
wstate = setup_background_workers(nworkers, seg);
/* Attach the queues. */
*output = shm_mq_attach(outq, seg, wstate->handle[0]);
*input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
/* Wait for workers to become ready. */
wait_for_workers_to_become_ready(wstate, hdr);
/*
* Once we reach this point, all workers are ready. We no longer need to
* kill them if we die; they'll die on their own as the message queues
* shut down.
*/
cancel_on_dsm_detach(seg, cleanup_background_workers,
PointerGetDatum(wstate));
pfree(wstate);
}
/*
* Set up a dynamic shared memory segment.
*
* We set up a small control region that contains only a test_shm_mq_header,
* plus one region per message queue. There are as many message queues as
* the number of workers, plus one.
*/
static void
setup_dynamic_shared_memory(int64 queue_size, int nworkers,
dsm_segment **segp, test_shm_mq_header **hdrp,
shm_mq **outp, shm_mq **inp)
{
shm_toc_estimator e;
int i;
Size segsize;
dsm_segment *seg;
shm_toc *toc;
test_shm_mq_header *hdr;
/* Ensure a valid queue size. */
if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("queue size must be at least %zu bytes",
shm_mq_minimum_size)));
if (queue_size != ((Size) queue_size))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("queue size overflows size_t")));
/*
* Estimate how much shared memory we need.
*
* Because the TOC machinery may choose to insert padding of oddly-sized
* requests, we must estimate each chunk separately.
*
* We need one key to register the location of the header, and we need
* nworkers + 1 keys to track the locations of the message queues.
*/
shm_toc_initialize_estimator(&e);
shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
for (i = 0; i <= nworkers; ++i)
shm_toc_estimate_chunk(&e, (Size) queue_size);
shm_toc_estimate_keys(&e, 2 + nworkers);
segsize = shm_toc_estimate(&e);
/* Create the shared memory segment and establish a table of contents. */
seg = dsm_create(shm_toc_estimate(&e));
toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
segsize);
/* Set up the header region. */
hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
SpinLockInit(&hdr->mutex);
hdr->workers_total = nworkers;
hdr->workers_attached = 0;
hdr->workers_ready = 0;
shm_toc_insert(toc, 0, hdr);
/* Set up one message queue per worker, plus one. */
for (i = 0; i <= nworkers; ++i)
{
shm_mq *mq;
mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
(Size) queue_size);
shm_toc_insert(toc, i + 1, mq);
if (i == 0)
{
/* We send messages to the first queue. */
shm_mq_set_sender(mq, MyProc);
*outp = mq;
}
if (i == nworkers)
{
/* We receive messages from the last queue. */
shm_mq_set_receiver(mq, MyProc);
*inp = mq;
}
}
/* Return results to caller. */
*segp = seg;
*hdrp = hdr;
}
/*
* Register background workers.
*/
static worker_state *
setup_background_workers(int nworkers, dsm_segment *seg)
{
MemoryContext oldcontext;
BackgroundWorker worker;
worker_state *wstate;
int i;
/*
* We need the worker_state object and the background worker handles to
* which it points to be allocated in CurTransactionContext rather than
* ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
* hooks run.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
/* Create worker state object. */
wstate = MemoryContextAlloc(TopTransactionContext,
offsetof(worker_state, handle) +
sizeof(BackgroundWorkerHandle *) * nworkers);
wstate->nworkers = 0;
/*
* Arrange to kill all the workers if we abort before all workers are
* finished hooking themselves up to the dynamic shared memory segment.
*
* If we die after all the workers have finished hooking themselves up to
* the dynamic shared memory segment, we'll mark the two queues to which
* we're directly connected as detached, and the worker(s) connected to
* those queues will exit, marking any other queues to which they are
* connected as detached. This will cause any as-yet-unaware workers
* connected to those queues to exit in their turn, and so on, until
* everybody exits.
*
* But suppose the workers which are supposed to connect to the queues to
* which we're directly attached exit due to some error before they
* actually attach the queues. The remaining workers will have no way of
* knowing this. From their perspective, they're still waiting for those
* workers to start, when in fact they've already died.
*/
on_dsm_detach(seg, cleanup_background_workers,
PointerGetDatum(wstate));
/* Configure a worker. */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL; /* new worker might not have library loaded */
sprintf(worker.bgw_library_name, "test_shm_mq");
sprintf(worker.bgw_function_name, "test_shm_mq_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq");
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
/* set bgw_notify_pid, so we can detect if the worker stops */
worker.bgw_notify_pid = MyProcPid;
/* Register the workers. */
for (i = 0; i < nworkers; ++i)
{
if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not register background process"),
errhint("You may need to increase max_worker_processes.")));
++wstate->nworkers;
}
/* All done. */
MemoryContextSwitchTo(oldcontext);
return wstate;
}
static void
cleanup_background_workers(dsm_segment *seg, Datum arg)
{
worker_state *wstate = (worker_state *) DatumGetPointer(arg);
while (wstate->nworkers > 0)
{
--wstate->nworkers;
TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
}
}
static void
wait_for_workers_to_become_ready(worker_state *wstate,
volatile test_shm_mq_header *hdr)
{
bool save_set_latch_on_sigusr1;
bool result = false;
save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
set_latch_on_sigusr1 = true;
PG_TRY();
{
for (;;)
{
int workers_ready;
/* If all the workers are ready, we have succeeded. */
SpinLockAcquire(&hdr->mutex);
workers_ready = hdr->workers_ready;
SpinLockRelease(&hdr->mutex);
if (workers_ready >= wstate->nworkers)
{
result = true;
break;
}
/* If any workers (or the postmaster) have died, we have failed. */
if (!check_worker_status(wstate))
{
result = false;
break;
}
/* Wait to be signalled. */
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
/* Reset the latch so we don't spin. */
ResetLatch(&MyProc->procLatch);
}
}
PG_CATCH();
{
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
PG_RE_THROW();
}
PG_END_TRY();
if (!result)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("one or more background workers failed to start")));
}
static bool
check_worker_status(worker_state *wstate)
{
int n;
/* If any workers (or the postmaster) have died, we have failed. */
for (n = 0; n < wstate->nworkers; ++n)
{
BgwHandleStatus status;
pid_t pid;
status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
return false;
}
/* Otherwise, things still look OK. */
return true;
}

View File

@ -0,0 +1,12 @@
CREATE EXTENSION test_shm_mq;
--
-- These tests don't produce any interesting output. We're checking that
-- the operations complete without crashing or hanging and that none of their
-- internal sanity tests fail.
--
SELECT test_shm_mq(1024, '', 2000, 1);
SELECT test_shm_mq(1024, 'a', 2001, 1);
SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1);
SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1);
SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3);

View File

@ -0,0 +1,264 @@
/*--------------------------------------------------------------------------
*
* test.c
* Test harness code for shared memory message queues.
*
* Copyright (C) 2013-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_shm_mq/test.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "test_shm_mq.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(test_shm_mq);
PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
void _PG_init(void);
static void verify_message(Size origlen, char *origdata, Size newlen,
char *newdata);
/*
* Simple test of the shared memory message queue infrastructure.
*
* We set up a ring of message queues passing through 1 or more background
* processes and eventually looping back to ourselves. We then send a message
* through the ring a number of times indicated by the loop count. At the end,
* we check whether the final message matches the one we started with.
*/
Datum
test_shm_mq(PG_FUNCTION_ARGS)
{
int64 queue_size = PG_GETARG_INT64(0);
text *message = PG_GETARG_TEXT_PP(1);
char *message_contents = VARDATA_ANY(message);
int message_size = VARSIZE_ANY_EXHDR(message);
int32 loop_count = PG_GETARG_INT32(2);
int32 nworkers = PG_GETARG_INT32(3);
dsm_segment *seg;
shm_mq_handle *outqh;
shm_mq_handle *inqh;
shm_mq_result res;
Size len;
void *data;
/* A negative loopcount is nonsensical. */
if (loop_count < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("repeat count size must be a non-negative integer")));
/*
* Since this test sends data using the blocking interfaces, it cannot
* send data to itself. Therefore, a minimum of 1 worker is required. Of
* course, a negative worker count is nonsensical.
*/
if (nworkers < 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("number of workers must be a positive integer")));
/* Set up dynamic shared memory segment and background workers. */
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
/* Send the initial message. */
res = shm_mq_send(outqh, message_size, message_contents, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
/*
* Receive a message and send it back out again. Do this a number of
* times equal to the loop count.
*/
for (;;)
{
/* Receive a message. */
res = shm_mq_receive(inqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not receive message")));
/* If this is supposed to be the last iteration, stop here. */
if (--loop_count <= 0)
break;
/* Send it back out. */
res = shm_mq_send(outqh, len, data, false);
if (res != SHM_MQ_SUCCESS)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
}
/*
* Finally, check that we got back the same message from the last
* iteration that we originally sent.
*/
verify_message(message_size, message_contents, len, data);
/* Clean up. */
dsm_detach(seg);
PG_RETURN_VOID();
}
/*
* Pipelined test of the shared memory message queue infrastructure.
*
* As in the basic test, we set up a ring of message queues passing through
* 1 or more background processes and eventually looping back to ourselves.
* Then, we send N copies of the user-specified message through the ring and
* receive them all back. Since this might fill up all message queues in the
* ring and then stall, we must be prepared to begin receiving the messages
* back before we've finished sending them.
*/
Datum
test_shm_mq_pipelined(PG_FUNCTION_ARGS)
{
int64 queue_size = PG_GETARG_INT64(0);
text *message = PG_GETARG_TEXT_PP(1);
char *message_contents = VARDATA_ANY(message);
int message_size = VARSIZE_ANY_EXHDR(message);
int32 loop_count = PG_GETARG_INT32(2);
int32 nworkers = PG_GETARG_INT32(3);
bool verify = PG_GETARG_BOOL(4);
int32 send_count = 0;
int32 receive_count = 0;
dsm_segment *seg;
shm_mq_handle *outqh;
shm_mq_handle *inqh;
shm_mq_result res;
Size len;
void *data;
/* A negative loopcount is nonsensical. */
if (loop_count < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("repeat count size must be a non-negative integer")));
/*
* Using the nonblocking interfaces, we can even send data to ourselves,
* so the minimum number of workers for this test is zero.
*/
if (nworkers < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("number of workers must be a non-negative integer")));
/* Set up dynamic shared memory segment and background workers. */
test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
/* Main loop. */
for (;;)
{
bool wait = true;
/*
* If we haven't yet sent the message the requisite number of times,
* try again to send it now. Note that when shm_mq_send() returns
* SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
* same message size and contents; that's not an issue here because
* we're sending the same message every time.
*/
if (send_count < loop_count)
{
res = shm_mq_send(outqh, message_size, message_contents, true);
if (res == SHM_MQ_SUCCESS)
{
++send_count;
wait = false;
}
else if (res == SHM_MQ_DETACHED)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not send message")));
}
/*
* If we haven't yet received the message the requisite number of
* times, try to receive it again now.
*/
if (receive_count < loop_count)
{
res = shm_mq_receive(inqh, &len, &data, true);
if (res == SHM_MQ_SUCCESS)
{
++receive_count;
/* Verifying every time is slow, so it's optional. */
if (verify)
verify_message(message_size, message_contents, len, data);
wait = false;
}
else if (res == SHM_MQ_DETACHED)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not receive message")));
}
else
{
/*
* Otherwise, we've received the message enough times. This
* shouldn't happen unless we've also sent it enough times.
*/
if (send_count != receive_count)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("message sent %d times, but received %d times",
send_count, receive_count)));
break;
}
if (wait)
{
/*
* If we made no progress, wait for one of the other processes to
* which we are connected to set our latch, indicating that they
* have read or written data and therefore there may now be work
* for us to do.
*/
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS();
ResetLatch(&MyProc->procLatch);
}
}
/* Clean up. */
dsm_detach(seg);
PG_RETURN_VOID();
}
/*
* Verify that two messages are the same.
*/
static void
verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
{
Size i;
if (origlen != newlen)
ereport(ERROR,
(errmsg("message corrupted"),
errdetail("The original message was %zu bytes but the final message is %zu bytes.",
origlen, newlen)));
for (i = 0; i < origlen; ++i)
if (origdata[i] != newdata[i])
ereport(ERROR,
(errmsg("message corrupted"),
errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
}

View File

@ -0,0 +1,19 @@
/* src/test/modules/test_shm_mq/test_shm_mq--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION test_shm_mq" to load this file. \quit
CREATE FUNCTION test_shm_mq(queue_size pg_catalog.int8,
message pg_catalog.text,
repeat_count pg_catalog.int4 default 1,
num_workers pg_catalog.int4 default 1)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;
CREATE FUNCTION test_shm_mq_pipelined(queue_size pg_catalog.int8,
message pg_catalog.text,
repeat_count pg_catalog.int4 default 1,
num_workers pg_catalog.int4 default 1,
verify pg_catalog.bool default true)
RETURNS pg_catalog.void STRICT
AS 'MODULE_PATHNAME' LANGUAGE C;

View File

@ -0,0 +1,4 @@
comment = 'Test code for shared memory message queues'
default_version = '1.0'
module_pathname = '$libdir/test_shm_mq'
relocatable = true

View File

@ -0,0 +1,45 @@
/*--------------------------------------------------------------------------
*
* test_shm_mq.h
* Definitions for shared memory message queues
*
* Copyright (C) 2013, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_shm_mq/test_shm_mq.h
*
* -------------------------------------------------------------------------
*/
#ifndef TEST_SHM_MQ_H
#define TEST_SHM_MQ_H
#include "storage/dsm.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
/* Identifier for shared memory segments used by this extension. */
#define PG_TEST_SHM_MQ_MAGIC 0x79fb2447
/*
* This structure is stored in the dynamic shared memory segment. We use
* it to determine whether all workers started up OK and successfully
* attached to their respective shared message queues.
*/
typedef struct
{
slock_t mutex;
int workers_total;
int workers_attached;
int workers_ready;
} test_shm_mq_header;
/* Set up dynamic shared memory and background workers for test run. */
extern void test_shm_mq_setup(int64 queue_size, int32 nworkers,
dsm_segment **seg, shm_mq_handle **output,
shm_mq_handle **input);
/* Main entrypoint for a worker. */
extern void test_shm_mq_main(Datum) __attribute__((noreturn));
#endif

View File

@ -0,0 +1,224 @@
/*--------------------------------------------------------------------------
*
* worker.c
* Code for sample worker making use of shared memory message queues.
* Our test worker simply reads messages from one message queue and
* writes them back out to another message queue. In a real
* application, you'd presumably want the worker to do some more
* complex calculation rather than simply returning the input,
* but it should be possible to use much of the control logic just
* as presented here.
*
* Copyright (C) 2013-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/test_shm_mq/worker.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/procarray.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "utils/resowner.h"
#include "test_shm_mq.h"
static void handle_sigterm(SIGNAL_ARGS);
static void attach_to_queues(dsm_segment *seg, shm_toc *toc,
int myworkernumber, shm_mq_handle **inqhp,
shm_mq_handle **outqhp);
static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh);
/*
* Background worker entrypoint.
*
* This is intended to demonstrate how a background worker can be used to
* facilitate a parallel computation. Most of the logic here is fairly
* boilerplate stuff, designed to attach to the shared memory segment,
* notify the user backend that we're alive, and so on. The
* application-specific bits of logic that you'd replace for your own worker
* are attach_to_queues() and copy_messages().
*/
void
test_shm_mq_main(Datum main_arg)
{
dsm_segment *seg;
shm_toc *toc;
shm_mq_handle *inqh;
shm_mq_handle *outqh;
volatile test_shm_mq_header *hdr;
int myworkernumber;
PGPROC *registrant;
/*
* Establish signal handlers.
*
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
* it would a normal user backend. To make that happen, we establish a
* signal handler that is a stripped-down version of die(). We don't have
* any equivalent of the backend's command-read loop, where interrupts can
* be processed immediately, so make sure ImmediateInterruptOK is turned
* off.
*/
pqsignal(SIGTERM, handle_sigterm);
ImmediateInterruptOK = false;
BackgroundWorkerUnblockSignals();
/*
* Connect to the dynamic shared memory segment.
*
* The backend that registered this worker passed us the ID of a shared
* memory segment to which we must attach for further instructions. In
* order to attach to dynamic shared memory, we need a resource owner.
* Once we've mapped the segment in our address space, attach to the table
* of contents so we can locate the various data structures we'll need to
* find within the segment.
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker");
seg = dsm_attach(DatumGetInt32(main_arg));
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("unable to map dynamic shared memory segment")));
toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg));
if (toc == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
/*
* Acquire a worker number.
*
* By convention, the process registering this background worker should
* have stored the control structure at key 0. We look up that key to
* find it. Our worker number gives our identity: there may be just one
* worker involved in this parallel operation, or there may be many.
*/
hdr = shm_toc_lookup(toc, 0);
SpinLockAcquire(&hdr->mutex);
myworkernumber = ++hdr->workers_attached;
SpinLockRelease(&hdr->mutex);
if (myworkernumber > hdr->workers_total)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("too many message queue testing workers already")));
/*
* Attach to the appropriate message queues.
*/
attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh);
/*
* Indicate that we're fully initialized and ready to begin the main part
* of the parallel operation.
*
* Once we signal that we're ready, the user backend is entitled to assume
* that our on_dsm_detach callbacks will fire before we disconnect from
* the shared memory segment and exit. Generally, that means we must have
* attached to all relevant dynamic shared memory data structures by now.
*/
SpinLockAcquire(&hdr->mutex);
++hdr->workers_ready;
SpinLockRelease(&hdr->mutex);
registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
if (registrant == NULL)
{
elog(DEBUG1, "registrant backend has exited prematurely");
proc_exit(1);
}
SetLatch(&registrant->procLatch);
/* Do the work. */
copy_messages(inqh, outqh);
/*
* We're done. Explicitly detach the shared memory segment so that we
* don't get a resource leak warning at commit time. This will fire any
* on_dsm_detach callbacks we've registered, as well. Once that's done,
* we can go ahead and exit.
*/
dsm_detach(seg);
proc_exit(1);
}
/*
* Attach to shared memory message queues.
*
* We use our worker number to determine to which queue we should attach.
* The queues are registered at keys 1..<number-of-workers>. The user backend
* writes to queue #1 and reads from queue #<number-of-workers>; each worker
* reads from the queue whose number is equal to its worker number and writes
* to the next higher-numbered queue.
*/
static void
attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber,
shm_mq_handle **inqhp, shm_mq_handle **outqhp)
{
shm_mq *inq;
shm_mq *outq;
inq = shm_toc_lookup(toc, myworkernumber);
shm_mq_set_receiver(inq, MyProc);
*inqhp = shm_mq_attach(inq, seg, NULL);
outq = shm_toc_lookup(toc, myworkernumber + 1);
shm_mq_set_sender(outq, MyProc);
*outqhp = shm_mq_attach(outq, seg, NULL);
}
/*
* Loop, receiving and sending messages, until the connection is broken.
*
* This is the "real work" performed by this worker process. Everything that
* happens before this is initialization of one form or another, and everything
* after this point is cleanup.
*/
static void
copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
{
Size len;
void *data;
shm_mq_result res;
for (;;)
{
/* Notice any interrupts that have occurred. */
CHECK_FOR_INTERRUPTS();
/* Receive a message. */
res = shm_mq_receive(inqh, &len, &data, false);
if (res != SHM_MQ_SUCCESS)
break;
/* Send it back out. */
res = shm_mq_send(outqh, len, data, false);
if (res != SHM_MQ_SUCCESS)
break;
}
}
/*
* When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
* like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right
* thing.
*/
static void
handle_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
if (MyProc)
SetLatch(&MyProc->procLatch);
if (!proc_exit_inprogress)
{
InterruptPending = true;
ProcDiePending = true;
}
errno = save_errno;
}

View File

@ -0,0 +1,18 @@
# src/test/modules/worker_spi/Makefile
MODULES = worker_spi
EXTENSION = worker_spi
DATA = worker_spi--1.0.sql
PGFILEDESC = "worker_spi - background worker example"
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = src/test/modules/worker_spi
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

View File

@ -0,0 +1,9 @@
/* src/test/modules/worker_spi/worker_spi--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME'
LANGUAGE C;

View File

@ -0,0 +1,407 @@
/* -------------------------------------------------------------------------
*
* worker_spi.c
* Sample background worker code that demonstrates various coding
* patterns: establishing a database connection; starting and committing
* transactions; using GUC variables, and heeding SIGHUP to reread
* the configuration file; reporting to pg_stat_activity; using the
* process latch to sleep and exit in case of postmaster death.
*
* This code connects to a database, creates a schema and table, and summarizes
* the numbers contained therein. To see it working, insert an initial value
* with "total" type and some initial value; then insert some other rows with
* "delta" type. Delta rows will be deleted by this worker and their values
* aggregated into the total.
*
* Copyright (C) 2013-2014, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/test/modules/worker_spi/worker_spi.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
/* These are always necessary for a bgworker */
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
/* these headers are used by this particular worker's code */
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
void _PG_init(void);
void worker_spi_main(Datum) __attribute__((noreturn));
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
static volatile sig_atomic_t got_sigterm = false;
/* GUC variables */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
typedef struct worktable
{
const char *schema;
const char *name;
} worktable;
/*
* Signal handler for SIGTERM
* Set a flag to let the main loop to terminate, and set our latch to wake
* it up.
*/
static void
worker_spi_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Signal handler for SIGHUP
* Set a flag to tell the main loop to reread the config file, and set
* our latch to wake it up.
*/
static void
worker_spi_sighup(SIGNAL_ARGS)
{
int save_errno = errno;
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
*/
static void
initialize_worker_spi(worktable *table)
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
/* set statement start time */
SetCurrentStatementStartTimestamp();
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
void
worker_spi_main(Datum main_arg)
{
int index = DatumGetInt32(main_arg);
worktable *table;
StringInfoData buf;
char name[20];
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", index);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, worker_spi_sighup);
pqsignal(SIGTERM, worker_spi_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection("postgres", NULL);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
appendStringInfo(&buf,
"WITH deleted AS (DELETE "
"FROM %s.%s "
"WHERE type = 'delta' RETURNING value), "
"total AS (SELECT coalesce(sum(value), 0) as sum "
"FROM deleted) "
"UPDATE %s.%s "
"SET value = %s.value + total.sum "
"FROM total WHERE type = 'total' "
"RETURNING %s.value",
table->schema, table->name,
table->schema, table->name,
table->name,
table->name);
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
while (!got_sigterm)
{
int ret;
int rc;
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
worker_spi_naptime * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot
* which is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible
* through the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, buf.data);
/* We can now execute queries via SPI */
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
/*
* And finish our transaction.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(1);
}
/*
* Entrypoint of this module.
*
* We register more than one worker process here, to demonstrate how that can
* be done.
*/
void
_PG_init(void)
{
BackgroundWorker worker;
unsigned int i;
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_spi_naptime,
10,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (!process_shared_preload_libraries_in_progress)
return;
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
&worker_spi_total_workers,
2,
1,
100,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
worker.bgw_notify_pid = 0;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
/*
* Dynamically launch an SPI worker.
*/
Datum
worker_spi_launch(PG_FUNCTION_ARGS)
{
int32 i = PG_GETARG_INT32(0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = NULL; /* new worker might not have library loaded */
sprintf(worker.bgw_library_name, "worker_spi");
sprintf(worker.bgw_function_name, "worker_spi_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
worker.bgw_main_arg = Int32GetDatum(i);
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start background processes without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}

View File

@ -0,0 +1,5 @@
# worker_spi extension
comment = 'Sample background worker'
default_version = '1.0'
module_pathname = '$libdir/worker_spi'
relocatable = true

View File

@ -101,7 +101,7 @@ installdirs-tests: installdirs
$(MKDIR_P) $(patsubst $(srcdir)/%/,'$(DESTDIR)$(pkglibdir)/regress/%',$(sort $(dir $(regress_data_files))))
# Get some extra C modules from contrib/spi and contrib/dummy_seclabel...
# Get some extra C modules from contrib/spi and src/test/modules/dummy_seclabel...
all: refint$(DLSUFFIX) autoinc$(DLSUFFIX) dummy_seclabel$(DLSUFFIX)
@ -111,22 +111,22 @@ refint$(DLSUFFIX): $(top_builddir)/contrib/spi/refint$(DLSUFFIX)
autoinc$(DLSUFFIX): $(top_builddir)/contrib/spi/autoinc$(DLSUFFIX)
cp $< $@
dummy_seclabel$(DLSUFFIX): $(top_builddir)/contrib/dummy_seclabel/dummy_seclabel$(DLSUFFIX)
dummy_seclabel$(DLSUFFIX): $(top_builddir)/src/test/modules/dummy_seclabel/dummy_seclabel$(DLSUFFIX)
cp $< $@
$(top_builddir)/contrib/spi/refint$(DLSUFFIX): | submake-contrib-spi ;
$(top_builddir)/contrib/spi/autoinc$(DLSUFFIX): | submake-contrib-spi ;
$(top_builddir)/contrib/dummy_seclabel/dummy_seclabel$(DLSUFFIX): | submake-contrib-dummy_seclabel ;
$(top_builddir)/src/test/modules/dummy_seclabel/dummy_seclabel$(DLSUFFIX): | submake-dummy_seclabel ;
submake-contrib-spi:
$(MAKE) -C $(top_builddir)/contrib/spi
submake-contrib-dummy_seclabel:
$(MAKE) -C $(top_builddir)/contrib/dummy_seclabel
submake-dummy_seclabel:
$(MAKE) -C $(top_builddir)/src/test/modules/dummy_seclabel
.PHONY: submake-contrib-spi submake-contrib-dummy_seclabel
.PHONY: submake-contrib-spi submake-dummy_seclabel
# Tablespace setup