mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-29 22:49:41 +03:00 
			
		
		
		
	This includes removing tabs after periods in C comments, which was applied to back branches, so this change should not affect backpatching.
		
			
				
	
	
		
			263 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			263 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*--------------------------------------------------------------------------
 | |
|  *
 | |
|  * test.c
 | |
|  *		Test harness code for shared memory message queues.
 | |
|  *
 | |
|  * Copyright (C) 2013, PostgreSQL Global Development Group
 | |
|  *
 | |
|  * IDENTIFICATION
 | |
|  *		contrib/test_shm_mq/test.c
 | |
|  *
 | |
|  * -------------------------------------------------------------------------
 | |
|  */
 | |
| 
 | |
| #include "postgres.h"
 | |
| 
 | |
| #include "fmgr.h"
 | |
| #include "miscadmin.h"
 | |
| 
 | |
| #include "test_shm_mq.h"
 | |
| 
 | |
| PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(test_shm_mq);
 | |
| PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
 | |
| 
 | |
| void		_PG_init(void);
 | |
| 
 | |
| static void verify_message(Size origlen, char *origdata, Size newlen,
 | |
| 			   char *newdata);
 | |
| 
 | |
| /*
 | |
|  * Simple test of the shared memory message queue infrastructure.
 | |
|  *
 | |
|  * We set up a ring of message queues passing through 1 or more background
 | |
|  * processes and eventually looping back to ourselves.  We then send a message
 | |
|  * through the ring a number of times indicated by the loop count.  At the end,
 | |
|  * we check whether the final message matches the one we started with.
 | |
|  */
 | |
| Datum
 | |
| test_shm_mq(PG_FUNCTION_ARGS)
 | |
| {
 | |
| 	int64		queue_size = PG_GETARG_INT64(0);
 | |
| 	text	   *message = PG_GETARG_TEXT_PP(1);
 | |
| 	char	   *message_contents = VARDATA_ANY(message);
 | |
| 	int			message_size = VARSIZE_ANY_EXHDR(message);
 | |
| 	int32		loop_count = PG_GETARG_INT32(2);
 | |
| 	int32		nworkers = PG_GETARG_INT32(3);
 | |
| 	dsm_segment *seg;
 | |
| 	shm_mq_handle *outqh;
 | |
| 	shm_mq_handle *inqh;
 | |
| 	shm_mq_result res;
 | |
| 	Size		len;
 | |
| 	void	   *data;
 | |
| 
 | |
| 	/* A negative loopcount is nonsensical. */
 | |
| 	if (loop_count < 0)
 | |
| 		ereport(ERROR,
 | |
| 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 				 errmsg("repeat count size must be a non-negative integer")));
 | |
| 
 | |
| 	/*
 | |
| 	 * Since this test sends data using the blocking interfaces, it cannot
 | |
| 	 * send data to itself.  Therefore, a minimum of 1 worker is required. Of
 | |
| 	 * course, a negative worker count is nonsensical.
 | |
| 	 */
 | |
| 	if (nworkers < 1)
 | |
| 		ereport(ERROR,
 | |
| 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 				 errmsg("number of workers must be a positive integer")));
 | |
| 
 | |
| 	/* Set up dynamic shared memory segment and background workers. */
 | |
| 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 | |
| 
 | |
| 	/* Send the initial message. */
 | |
| 	res = shm_mq_send(outqh, message_size, message_contents, false);
 | |
| 	if (res != SHM_MQ_SUCCESS)
 | |
| 		ereport(ERROR,
 | |
| 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | |
| 				 errmsg("could not send message")));
 | |
| 
 | |
| 	/*
 | |
| 	 * Receive a message and send it back out again.  Do this a number of
 | |
| 	 * times equal to the loop count.
 | |
| 	 */
 | |
| 	for (;;)
 | |
| 	{
 | |
| 		/* Receive a message. */
 | |
| 		res = shm_mq_receive(inqh, &len, &data, false);
 | |
| 		if (res != SHM_MQ_SUCCESS)
 | |
| 			ereport(ERROR,
 | |
| 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | |
| 					 errmsg("could not receive message")));
 | |
| 
 | |
| 		/* If this is supposed to be the last iteration, stop here. */
 | |
| 		if (--loop_count <= 0)
 | |
| 			break;
 | |
| 
 | |
| 		/* Send it back out. */
 | |
| 		res = shm_mq_send(outqh, len, data, false);
 | |
| 		if (res != SHM_MQ_SUCCESS)
 | |
| 			ereport(ERROR,
 | |
| 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | |
| 					 errmsg("could not send message")));
 | |
| 	}
 | |
| 
 | |
| 	/*
 | |
| 	 * Finally, check that we got back the same message from the last
 | |
| 	 * iteration that we originally sent.
 | |
| 	 */
 | |
| 	verify_message(message_size, message_contents, len, data);
 | |
| 
 | |
| 	/* Clean up. */
 | |
| 	dsm_detach(seg);
 | |
| 
 | |
| 	PG_RETURN_VOID();
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Pipelined test of the shared memory message queue infrastructure.
 | |
|  *
 | |
|  * As in the basic test, we set up a ring of message queues passing through
 | |
|  * 1 or more background processes and eventually looping back to ourselves.
 | |
|  * Then, we send N copies of the user-specified message through the ring and
 | |
|  * receive them all back.  Since this might fill up all message queues in the
 | |
|  * ring and then stall, we must be prepared to begin receiving the messages
 | |
|  * back before we've finished sending them.
 | |
|  */
 | |
| Datum
 | |
| test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 | |
| {
 | |
| 	int64		queue_size = PG_GETARG_INT64(0);
 | |
| 	text	   *message = PG_GETARG_TEXT_PP(1);
 | |
| 	char	   *message_contents = VARDATA_ANY(message);
 | |
| 	int			message_size = VARSIZE_ANY_EXHDR(message);
 | |
| 	int32		loop_count = PG_GETARG_INT32(2);
 | |
| 	int32		nworkers = PG_GETARG_INT32(3);
 | |
| 	bool		verify = PG_GETARG_BOOL(4);
 | |
| 	int32		send_count = 0;
 | |
| 	int32		receive_count = 0;
 | |
| 	dsm_segment *seg;
 | |
| 	shm_mq_handle *outqh;
 | |
| 	shm_mq_handle *inqh;
 | |
| 	shm_mq_result res;
 | |
| 	Size		len;
 | |
| 	void	   *data;
 | |
| 
 | |
| 	/* A negative loopcount is nonsensical. */
 | |
| 	if (loop_count < 0)
 | |
| 		ereport(ERROR,
 | |
| 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 				 errmsg("repeat count size must be a non-negative integer")));
 | |
| 
 | |
| 	/*
 | |
| 	 * Using the nonblocking interfaces, we can even send data to ourselves,
 | |
| 	 * so the minimum number of workers for this test is zero.
 | |
| 	 */
 | |
| 	if (nworkers < 0)
 | |
| 		ereport(ERROR,
 | |
| 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 | |
| 				 errmsg("number of workers must be a non-negative integer")));
 | |
| 
 | |
| 	/* Set up dynamic shared memory segment and background workers. */
 | |
| 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 | |
| 
 | |
| 	/* Main loop. */
 | |
| 	for (;;)
 | |
| 	{
 | |
| 		bool		wait = true;
 | |
| 
 | |
| 		/*
 | |
| 		 * If we haven't yet sent the message the requisite number of times,
 | |
| 		 * try again to send it now.  Note that when shm_mq_send() returns
 | |
| 		 * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
 | |
| 		 * same message size and contents; that's not an issue here because
 | |
| 		 * we're sending the same message every time.
 | |
| 		 */
 | |
| 		if (send_count < loop_count)
 | |
| 		{
 | |
| 			res = shm_mq_send(outqh, message_size, message_contents, true);
 | |
| 			if (res == SHM_MQ_SUCCESS)
 | |
| 			{
 | |
| 				++send_count;
 | |
| 				wait = false;
 | |
| 			}
 | |
| 			else if (res == SHM_MQ_DETACHED)
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | |
| 						 errmsg("could not send message")));
 | |
| 		}
 | |
| 
 | |
| 		/*
 | |
| 		 * If we haven't yet received the message the requisite number of
 | |
| 		 * times, try to receive it again now.
 | |
| 		 */
 | |
| 		if (receive_count < loop_count)
 | |
| 		{
 | |
| 			res = shm_mq_receive(inqh, &len, &data, true);
 | |
| 			if (res == SHM_MQ_SUCCESS)
 | |
| 			{
 | |
| 				++receive_count;
 | |
| 				/* Verifying every time is slow, so it's optional. */
 | |
| 				if (verify)
 | |
| 					verify_message(message_size, message_contents, len, data);
 | |
| 				wait = false;
 | |
| 			}
 | |
| 			else if (res == SHM_MQ_DETACHED)
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 | |
| 						 errmsg("could not receive message")));
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			/*
 | |
| 			 * Otherwise, we've received the message enough times.  This
 | |
| 			 * shouldn't happen unless we've also sent it enough times.
 | |
| 			 */
 | |
| 			if (send_count != receive_count)
 | |
| 				ereport(ERROR,
 | |
| 						(errcode(ERRCODE_INTERNAL_ERROR),
 | |
| 					   errmsg("message sent %d times, but received %d times",
 | |
| 							  send_count, receive_count)));
 | |
| 			break;
 | |
| 		}
 | |
| 
 | |
| 		if (wait)
 | |
| 		{
 | |
| 			/*
 | |
| 			 * If we made no progress, wait for one of the other processes to
 | |
| 			 * which we are connected to set our latch, indicating that they
 | |
| 			 * have read or written data and therefore there may now be work
 | |
| 			 * for us to do.
 | |
| 			 */
 | |
| 			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
 | |
| 			CHECK_FOR_INTERRUPTS();
 | |
| 			ResetLatch(&MyProc->procLatch);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	/* Clean up. */
 | |
| 	dsm_detach(seg);
 | |
| 
 | |
| 	PG_RETURN_VOID();
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * Verify that two messages are the same.
 | |
|  */
 | |
| static void
 | |
| verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
 | |
| {
 | |
| 	Size		i;
 | |
| 
 | |
| 	if (origlen != newlen)
 | |
| 		ereport(ERROR,
 | |
| 				(errmsg("message corrupted"),
 | |
| 				 errdetail("The original message was %zu bytes but the final message is %zu bytes.",
 | |
| 						   origlen, newlen)));
 | |
| 
 | |
| 	for (i = 0; i < origlen; ++i)
 | |
| 		if (origdata[i] != newdata[i])
 | |
| 			ereport(ERROR,
 | |
| 					(errmsg("message corrupted"),
 | |
| 					 errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
 | |
| }
 |