mirror of
				https://github.com/postgres/postgres.git
				synced 2025-11-03 09:13:20 +03:00 
			
		
		
		
	Use latch instead of select() in walreceiver
Replace use of poll()/select() by WaitLatchOrSocket(), which is more portable and flexible. Also change walreceiver to use its procLatch instead of a custom latch. From: Petr Jelinek <petr@2ndquadrant.com>
This commit is contained in:
		@@ -3338,6 +3338,9 @@ pgstat_get_wait_client(WaitEventClient w)
 | 
			
		||||
		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 | 
			
		||||
			event_name = "WalReceiverWaitStart";
 | 
			
		||||
			break;
 | 
			
		||||
		case WAIT_EVENT_LIBPQWALRECEIVER_READ:
 | 
			
		||||
			event_name = "LibPQWalReceiverRead";
 | 
			
		||||
			break;
 | 
			
		||||
		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
 | 
			
		||||
			event_name = "WalSenderWaitForWAL";
 | 
			
		||||
			break;
 | 
			
		||||
 
 | 
			
		||||
@@ -23,19 +23,11 @@
 | 
			
		||||
#include "pqexpbuffer.h"
 | 
			
		||||
#include "access/xlog.h"
 | 
			
		||||
#include "miscadmin.h"
 | 
			
		||||
#include "pgstat.h"
 | 
			
		||||
#include "replication/walreceiver.h"
 | 
			
		||||
#include "storage/proc.h"
 | 
			
		||||
#include "utils/builtins.h"
 | 
			
		||||
 | 
			
		||||
#ifdef HAVE_POLL_H
 | 
			
		||||
#include <poll.h>
 | 
			
		||||
#endif
 | 
			
		||||
#ifdef HAVE_SYS_POLL_H
 | 
			
		||||
#include <sys/poll.h>
 | 
			
		||||
#endif
 | 
			
		||||
#ifdef HAVE_SYS_SELECT_H
 | 
			
		||||
#include <sys/select.h>
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
PG_MODULE_MAGIC;
 | 
			
		||||
 | 
			
		||||
void		_PG_init(void);
 | 
			
		||||
@@ -59,7 +51,6 @@ static void libpqrcv_send(const char *buffer, int nbytes);
 | 
			
		||||
static void libpqrcv_disconnect(void);
 | 
			
		||||
 | 
			
		||||
/* Prototypes for private functions */
 | 
			
		||||
static bool libpq_select(int timeout_ms);
 | 
			
		||||
static PGresult *libpqrcv_PQexec(const char *query);
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
@@ -366,67 +357,6 @@ libpqrcv_readtimelinehistoryfile(TimeLineID tli,
 | 
			
		||||
	PQclear(res);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Wait until we can read WAL stream, or timeout.
 | 
			
		||||
 *
 | 
			
		||||
 * Returns true if data has become available for reading, false if timed out
 | 
			
		||||
 * or interrupted by signal.
 | 
			
		||||
 *
 | 
			
		||||
 * This is based on pqSocketCheck.
 | 
			
		||||
 */
 | 
			
		||||
static bool
 | 
			
		||||
libpq_select(int timeout_ms)
 | 
			
		||||
{
 | 
			
		||||
	int			ret;
 | 
			
		||||
 | 
			
		||||
	Assert(streamConn != NULL);
 | 
			
		||||
	if (PQsocket(streamConn) < 0)
 | 
			
		||||
		ereport(ERROR,
 | 
			
		||||
				(errcode_for_socket_access(),
 | 
			
		||||
				 errmsg("invalid socket: %s", PQerrorMessage(streamConn))));
 | 
			
		||||
 | 
			
		||||
	/* We use poll(2) if available, otherwise select(2) */
 | 
			
		||||
	{
 | 
			
		||||
#ifdef HAVE_POLL
 | 
			
		||||
		struct pollfd input_fd;
 | 
			
		||||
 | 
			
		||||
		input_fd.fd = PQsocket(streamConn);
 | 
			
		||||
		input_fd.events = POLLIN | POLLERR;
 | 
			
		||||
		input_fd.revents = 0;
 | 
			
		||||
 | 
			
		||||
		ret = poll(&input_fd, 1, timeout_ms);
 | 
			
		||||
#else							/* !HAVE_POLL */
 | 
			
		||||
 | 
			
		||||
		fd_set		input_mask;
 | 
			
		||||
		struct timeval timeout;
 | 
			
		||||
		struct timeval *ptr_timeout;
 | 
			
		||||
 | 
			
		||||
		FD_ZERO(&input_mask);
 | 
			
		||||
		FD_SET(PQsocket(streamConn), &input_mask);
 | 
			
		||||
 | 
			
		||||
		if (timeout_ms < 0)
 | 
			
		||||
			ptr_timeout = NULL;
 | 
			
		||||
		else
 | 
			
		||||
		{
 | 
			
		||||
			timeout.tv_sec = timeout_ms / 1000;
 | 
			
		||||
			timeout.tv_usec = (timeout_ms % 1000) * 1000;
 | 
			
		||||
			ptr_timeout = &timeout;
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		ret = select(PQsocket(streamConn) + 1, &input_mask,
 | 
			
		||||
					 NULL, NULL, ptr_timeout);
 | 
			
		||||
#endif   /* HAVE_POLL */
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if (ret == 0 || (ret < 0 && errno == EINTR))
 | 
			
		||||
		return false;
 | 
			
		||||
	if (ret < 0)
 | 
			
		||||
		ereport(ERROR,
 | 
			
		||||
				(errcode_for_socket_access(),
 | 
			
		||||
				 errmsg("select() failed: %m")));
 | 
			
		||||
	return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Send a query and wait for the results by using the asynchronous libpq
 | 
			
		||||
 * functions and the backend version of select().
 | 
			
		||||
@@ -470,14 +400,31 @@ libpqrcv_PQexec(const char *query)
 | 
			
		||||
		 */
 | 
			
		||||
		while (PQisBusy(streamConn))
 | 
			
		||||
		{
 | 
			
		||||
			int			rc;
 | 
			
		||||
 | 
			
		||||
			/*
 | 
			
		||||
			 * We don't need to break down the sleep into smaller increments,
 | 
			
		||||
			 * and check for interrupts after each nap, since we can just
 | 
			
		||||
			 * elog(FATAL) within SIGTERM signal handler if the signal arrives
 | 
			
		||||
			 * in the middle of establishment of replication connection.
 | 
			
		||||
			 * since we'll get interrupted by signals and can either handle
 | 
			
		||||
			 * interrupts here or elog(FATAL) within SIGTERM signal handler if
 | 
			
		||||
			 * the signal arrives in the middle of establishment of
 | 
			
		||||
			 * replication connection.
 | 
			
		||||
			 */
 | 
			
		||||
			if (!libpq_select(-1))
 | 
			
		||||
				continue;		/* interrupted */
 | 
			
		||||
			ResetLatch(&MyProc->procLatch);
 | 
			
		||||
			rc = WaitLatchOrSocket(&MyProc->procLatch,
 | 
			
		||||
								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
 | 
			
		||||
								   WL_LATCH_SET,
 | 
			
		||||
								   PQsocket(streamConn),
 | 
			
		||||
								   0,
 | 
			
		||||
								   WAIT_EVENT_LIBPQWALRECEIVER_READ);
 | 
			
		||||
			if (rc & WL_POSTMASTER_DEATH)
 | 
			
		||||
				exit(1);
 | 
			
		||||
 | 
			
		||||
			/* interrupted */
 | 
			
		||||
			if (rc & WL_LATCH_SET)
 | 
			
		||||
			{
 | 
			
		||||
				CHECK_FOR_INTERRUPTS();
 | 
			
		||||
				continue;
 | 
			
		||||
			}
 | 
			
		||||
			if (PQconsumeInput(streamConn) == 0)
 | 
			
		||||
				return NULL;	/* trouble */
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -261,7 +261,7 @@ WalReceiverMain(void)
 | 
			
		||||
	/* Arrange to clean up at walreceiver exit */
 | 
			
		||||
	on_shmem_exit(WalRcvDie, 0);
 | 
			
		||||
 | 
			
		||||
	OwnLatch(&walrcv->latch);
 | 
			
		||||
	walrcv->latch = &MyProc->procLatch;
 | 
			
		||||
 | 
			
		||||
	/* Properly accept or ignore signals the postmaster might send us */
 | 
			
		||||
	pqsignal(SIGHUP, WalRcvSigHupHandler);		/* set flag to read config
 | 
			
		||||
@@ -483,7 +483,7 @@ WalReceiverMain(void)
 | 
			
		||||
				 * avoiding some system calls.
 | 
			
		||||
				 */
 | 
			
		||||
				Assert(wait_fd != PGINVALID_SOCKET);
 | 
			
		||||
				rc = WaitLatchOrSocket(&walrcv->latch,
 | 
			
		||||
				rc = WaitLatchOrSocket(walrcv->latch,
 | 
			
		||||
								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
 | 
			
		||||
									   WL_TIMEOUT | WL_LATCH_SET,
 | 
			
		||||
									   wait_fd,
 | 
			
		||||
@@ -491,7 +491,7 @@ WalReceiverMain(void)
 | 
			
		||||
									   WAIT_EVENT_WAL_RECEIVER_MAIN);
 | 
			
		||||
				if (rc & WL_LATCH_SET)
 | 
			
		||||
				{
 | 
			
		||||
					ResetLatch(&walrcv->latch);
 | 
			
		||||
					ResetLatch(walrcv->latch);
 | 
			
		||||
					if (walrcv->force_reply)
 | 
			
		||||
					{
 | 
			
		||||
						/*
 | 
			
		||||
@@ -652,7 +652,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 | 
			
		||||
	WakeupRecovery();
 | 
			
		||||
	for (;;)
 | 
			
		||||
	{
 | 
			
		||||
		ResetLatch(&walrcv->latch);
 | 
			
		||||
		ResetLatch(walrcv->latch);
 | 
			
		||||
 | 
			
		||||
		/*
 | 
			
		||||
		 * Emergency bailout if postmaster has died.  This is to avoid the
 | 
			
		||||
@@ -687,7 +687,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
 | 
			
		||||
		}
 | 
			
		||||
		SpinLockRelease(&walrcv->mutex);
 | 
			
		||||
 | 
			
		||||
		WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
 | 
			
		||||
		WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
 | 
			
		||||
				  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -763,7 +763,7 @@ WalRcvDie(int code, Datum arg)
 | 
			
		||||
	/* Ensure that all WAL records received are flushed to disk */
 | 
			
		||||
	XLogWalRcvFlush(true);
 | 
			
		||||
 | 
			
		||||
	DisownLatch(&walrcv->latch);
 | 
			
		||||
	walrcv->latch = NULL;
 | 
			
		||||
 | 
			
		||||
	SpinLockAcquire(&walrcv->mutex);
 | 
			
		||||
	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
 | 
			
		||||
@@ -812,7 +812,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 | 
			
		||||
 | 
			
		||||
	got_SIGTERM = true;
 | 
			
		||||
 | 
			
		||||
	SetLatch(&WalRcv->latch);
 | 
			
		||||
	if (WalRcv->latch)
 | 
			
		||||
		SetLatch(WalRcv->latch);
 | 
			
		||||
 | 
			
		||||
	/* Don't joggle the elbow of proc_exit */
 | 
			
		||||
	if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
 | 
			
		||||
@@ -1297,7 +1298,8 @@ void
 | 
			
		||||
WalRcvForceReply(void)
 | 
			
		||||
{
 | 
			
		||||
	WalRcv->force_reply = true;
 | 
			
		||||
	SetLatch(&WalRcv->latch);
 | 
			
		||||
	if (WalRcv->latch)
 | 
			
		||||
		SetLatch(WalRcv->latch);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 
 | 
			
		||||
@@ -64,7 +64,7 @@ WalRcvShmemInit(void)
 | 
			
		||||
		MemSet(WalRcv, 0, WalRcvShmemSize());
 | 
			
		||||
		WalRcv->walRcvState = WALRCV_STOPPED;
 | 
			
		||||
		SpinLockInit(&WalRcv->mutex);
 | 
			
		||||
		InitSharedLatch(&WalRcv->latch);
 | 
			
		||||
		WalRcv->latch = NULL;
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -279,8 +279,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 | 
			
		||||
 | 
			
		||||
	if (launch)
 | 
			
		||||
		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 | 
			
		||||
	else
 | 
			
		||||
		SetLatch(&walrcv->latch);
 | 
			
		||||
	else if (walrcv->latch)
 | 
			
		||||
		SetLatch(walrcv->latch);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 
 | 
			
		||||
@@ -763,6 +763,7 @@ typedef enum
 | 
			
		||||
	WAIT_EVENT_CLIENT_WRITE,
 | 
			
		||||
	WAIT_EVENT_SSL_OPEN_SERVER,
 | 
			
		||||
	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
 | 
			
		||||
	WAIT_EVENT_LIBPQWALRECEIVER_READ,
 | 
			
		||||
	WAIT_EVENT_WAL_SENDER_WAIT_WAL,
 | 
			
		||||
	WAIT_EVENT_WAL_SENDER_WRITE_DATA
 | 
			
		||||
} WaitEventClient;
 | 
			
		||||
 
 | 
			
		||||
@@ -127,8 +127,9 @@ typedef struct
 | 
			
		||||
	 * where to start streaming (after setting receiveStart and
 | 
			
		||||
	 * receiveStartTLI), and also to tell it to send apply feedback to the
 | 
			
		||||
	 * primary whenever specially marked commit records are applied.
 | 
			
		||||
	 * This is normally mapped to procLatch when walreceiver is running.
 | 
			
		||||
	 */
 | 
			
		||||
	Latch		latch;
 | 
			
		||||
	Latch	   *latch;
 | 
			
		||||
} WalRcvData;
 | 
			
		||||
 | 
			
		||||
extern WalRcvData *WalRcv;
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user