mirror of
https://github.com/postgres/postgres.git
synced 2025-08-30 06:01:21 +03:00
Move some pg_dump function around.
Move functions used only by pg_dump and pg_restore from dumputils.c to a new file, pg_backup_utils.c. dumputils.c is linked into psql and some programs in bin/scripts, so it seems good to keep it slim. The parallel functionality is moved to parallel.c, as is exit_horribly, because the interesting code in exit_horribly is parallel-related. This refactoring gets rid of the on_exit_msg_func function pointer. It was problematic, because a modern gcc version with -Wmissing-format-attribute complained if it wasn't marked with PF_PRINTF_ATTRIBUTE, but the ancient gcc version that Tom Lane's old HP-UX box has didn't accept that attribute on a function pointer, and gave an error. We still use a similar function pointer trick for getLocalPQBuffer() function, to use a thread-local version of that in parallel mode on Windows, but that dodges the problem because it doesn't take printf-like arguments.
This commit is contained in:
@@ -16,9 +16,9 @@
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "pg_backup_db.h"
|
||||
#include "postgres_fe.h"
|
||||
|
||||
#include "dumputils.h"
|
||||
#include "pg_backup_utils.h"
|
||||
#include "parallel.h"
|
||||
|
||||
#ifndef WIN32
|
||||
@@ -78,10 +78,6 @@ static const char *modulename = gettext_noop("parallel archiver");
|
||||
|
||||
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
|
||||
static void
|
||||
parallel_exit_msg_func(const char *modulename,
|
||||
const char *fmt, va_list ap)
|
||||
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 0)));
|
||||
static void
|
||||
parallel_msg_master(ParallelSlot *slot, const char *modulename,
|
||||
const char *fmt, va_list ap)
|
||||
__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
|
||||
@@ -112,6 +108,47 @@ static char *readMessageFromPipe(int fd);
|
||||
#define messageEquals(msg, pattern) \
|
||||
(strcmp(msg, pattern) == 0)
|
||||
|
||||
#ifdef WIN32
|
||||
static void shutdown_parallel_dump_utils(int code, void *unused);
|
||||
bool parallel_init_done = false;
|
||||
static DWORD tls_index;
|
||||
DWORD mainThreadId;
|
||||
#endif
|
||||
|
||||
|
||||
#ifdef WIN32
|
||||
static void
|
||||
shutdown_parallel_dump_utils(int code, void *unused)
|
||||
{
|
||||
/* Call the cleanup function only from the main thread */
|
||||
if (mainThreadId == GetCurrentThreadId())
|
||||
WSACleanup();
|
||||
}
|
||||
#endif
|
||||
|
||||
void
|
||||
init_parallel_dump_utils(void)
|
||||
{
|
||||
#ifdef WIN32
|
||||
if (!parallel_init_done)
|
||||
{
|
||||
WSADATA wsaData;
|
||||
int err;
|
||||
|
||||
tls_index = TlsAlloc();
|
||||
mainThreadId = GetCurrentThreadId();
|
||||
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||
if (err != 0)
|
||||
{
|
||||
fprintf(stderr, _("WSAStartup failed: %d\n"), err);
|
||||
exit_nicely(1);
|
||||
}
|
||||
on_exit_nicely(shutdown_parallel_dump_utils, NULL);
|
||||
parallel_init_done = true;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static ParallelSlot *
|
||||
GetMyPSlot(ParallelState *pstate)
|
||||
{
|
||||
@@ -129,29 +166,44 @@ GetMyPSlot(ParallelState *pstate)
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the function that will be called from exit_horribly() to print the
|
||||
* error message. If the worker process does exit_horribly(), we forward its
|
||||
* Fail and die, with a message to stderr. Parameters as for write_msg.
|
||||
*
|
||||
* This is defined in parallel.c, because in parallel mode, things are more
|
||||
* complicated. If the worker process does exit_horribly(), we forward its
|
||||
* last words to the master process. The master process then does
|
||||
* exit_horribly() with this error message itself and prints it normally.
|
||||
* After printing the message, exit_horribly() on the master will shut down
|
||||
* the remaining worker processes.
|
||||
*/
|
||||
static void
|
||||
parallel_exit_msg_func(const char *modulename, const char *fmt, va_list ap)
|
||||
void
|
||||
exit_horribly(const char *modulename, const char *fmt,...)
|
||||
{
|
||||
va_list ap;
|
||||
ParallelState *pstate = shutdown_info.pstate;
|
||||
ParallelSlot *slot;
|
||||
|
||||
Assert(pstate);
|
||||
va_start(ap, fmt);
|
||||
|
||||
slot = GetMyPSlot(pstate);
|
||||
|
||||
if (!slot)
|
||||
/* We're the parent, just write the message out */
|
||||
if (pstate == NULL)
|
||||
{
|
||||
/* Not in parallel mode, just write to stderr */
|
||||
vwrite_msg(modulename, fmt, ap);
|
||||
}
|
||||
else
|
||||
/* If we're a worker process, send the msg to the master process */
|
||||
parallel_msg_master(slot, modulename, fmt, ap);
|
||||
{
|
||||
slot = GetMyPSlot(pstate);
|
||||
|
||||
if (!slot)
|
||||
/* We're the parent, just write the message out */
|
||||
vwrite_msg(modulename, fmt, ap);
|
||||
else
|
||||
/* If we're a worker process, send the msg to the master process */
|
||||
parallel_msg_master(slot, modulename, fmt, ap);
|
||||
}
|
||||
|
||||
va_end(ap);
|
||||
|
||||
exit_nicely(1);
|
||||
}
|
||||
|
||||
/* Sends the error message from the worker to the master process */
|
||||
@@ -172,6 +224,54 @@ parallel_msg_master(ParallelSlot *slot, const char *modulename,
|
||||
sendMessageToMaster(pipefd, buf);
|
||||
}
|
||||
|
||||
/*
|
||||
* A thread-local version of getLocalPQExpBuffer().
|
||||
*
|
||||
* Non-reentrant but reduces memory leakage. (On Windows the memory leakage
|
||||
* will be one buffer per thread, which is at least better than one per call).
|
||||
*/
|
||||
static PQExpBuffer
|
||||
getThreadLocalPQExpBuffer(void)
|
||||
{
|
||||
/*
|
||||
* The Tls code goes awry if we use a static var, so we provide for both
|
||||
* static and auto, and omit any use of the static var when using Tls.
|
||||
*/
|
||||
static PQExpBuffer s_id_return = NULL;
|
||||
PQExpBuffer id_return;
|
||||
|
||||
#ifdef WIN32
|
||||
if (parallel_init_done)
|
||||
id_return = (PQExpBuffer) TlsGetValue(tls_index); /* 0 when not set */
|
||||
else
|
||||
id_return = s_id_return;
|
||||
#else
|
||||
id_return = s_id_return;
|
||||
#endif
|
||||
|
||||
if (id_return) /* first time through? */
|
||||
{
|
||||
/* same buffer, just wipe contents */
|
||||
resetPQExpBuffer(id_return);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* new buffer */
|
||||
id_return = createPQExpBuffer();
|
||||
#ifdef WIN32
|
||||
if (parallel_init_done)
|
||||
TlsSetValue(tls_index, id_return);
|
||||
else
|
||||
s_id_return = id_return;
|
||||
#else
|
||||
s_id_return = id_return;
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
return id_return;
|
||||
}
|
||||
|
||||
/*
|
||||
* pg_dump and pg_restore register the Archive pointer for the exit handler
|
||||
* (called from exit_horribly). This function mainly exists so that we can
|
||||
@@ -408,7 +508,7 @@ ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
|
||||
* set and falls back to AHX otherwise.
|
||||
*/
|
||||
shutdown_info.pstate = pstate;
|
||||
on_exit_msg_func = parallel_exit_msg_func;
|
||||
getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
|
||||
|
||||
#ifdef WIN32
|
||||
tMasterThreadId = GetCurrentThreadId();
|
||||
|
Reference in New Issue
Block a user