mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-31 10:30:33 +03:00 
			
		
		
		
	This includes removing tabs after periods in C comments, which was applied to back branches, so this change should not affect backpatching.
		
			
				
	
	
		
			358 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			358 lines
		
	
	
		
			8.6 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  *	parallel.c
 | |
|  *
 | |
|  *	multi-process support
 | |
|  *
 | |
|  *	Copyright (c) 2010-2014, PostgreSQL Global Development Group
 | |
|  *	contrib/pg_upgrade/parallel.c
 | |
|  */
 | |
| 
 | |
| #include "postgres_fe.h"
 | |
| 
 | |
| #include "pg_upgrade.h"
 | |
| 
 | |
| #include <stdlib.h>
 | |
| #include <string.h>
 | |
| #include <sys/types.h>
 | |
| #include <sys/wait.h>
 | |
| 
 | |
| #ifdef WIN32
 | |
| #include <io.h>
 | |
| #endif
 | |
| 
 | |
| static int	parallel_jobs;	/* workers started but not yet reaped by reap_child() */
 | |
| 
 | |
| #ifdef WIN32
 | |
| /*
 | |
|  *	Array holding the handles of all active threads.  Active entries are
 | |
|  *	kept contiguous at the front (no gaps/zeros) so the array can be passed
 | |
|  *	directly to WaitForMultipleObjects().  The per-thread argument structs
 | |
|  *	live in separate arrays so that this one holds nothing but handles.
 | |
|  */
 | |
| HANDLE	   *thread_handles;
 | |
| 
 | |
| typedef struct
 | |
| {
 | |
| 	char	   *log_file;		/* pg_strdup'd copy of the caller's log_file */
 | |
| 	char	   *opt_log_file;	/* pg_strdup'd copy, or NULL if none was given */
 | |
| 	char	   *cmd;			/* pg_strdup'd copy of the expanded command */
 | |
| } exec_thread_arg;				/* arguments handed to win32_exec_prog() */
 | |
| 
 | |
| typedef struct
 | |
| {
 | |
| 	DbInfoArr  *old_db_arr;		/* caller's pointer, stored as-is */
 | |
| 	DbInfoArr  *new_db_arr;		/* caller's pointer, stored as-is */
 | |
| 	char	   *old_pgdata;		/* pg_strdup'd copy */
 | |
| 	char	   *new_pgdata;		/* pg_strdup'd copy */
 | |
| 	char	   *old_tablespace;	/* pg_strdup'd copy, or NULL if none was given */
 | |
| } transfer_thread_arg;			/* arguments handed to win32_transfer_all_new_dbs() */
 | |
| 
 | |
| exec_thread_arg **exec_thread_args;	/* user_opts.jobs slots, allocated on first use */
 | |
| transfer_thread_arg **transfer_thread_args;	/* user_opts.jobs slots, allocated on first use */
 | |
| 
 | |
| /* track current thread_args struct so reap_child() can be used for all cases */
 | |
| void	  **cur_thread_args;
 | |
| 
 | |
| DWORD		win32_exec_prog(exec_thread_arg *args);	/* thread entry point */
 | |
| DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);	/* thread entry point */
 | |
| #endif
 | |
| 
 | |
| /*
 | |
|  *	parallel_exec_prog
 | |
|  *
 | |
|  *	This has the same API as exec_prog, except it does parallel execution,
 | |
|  *	and therefore must throw errors and doesn't return an error status.
 | |
|  */
 | |
| void
 | |
| parallel_exec_prog(const char *log_file, const char *opt_log_file,
 | |
| 				   const char *fmt,...)
 | |
| {
 | |
| 	va_list		args;
 | |
| 	char		cmd[MAX_STRING];
 | |
| 
 | |
| #ifndef WIN32
 | |
| 	pid_t		child;
 | |
| #else
 | |
| 	HANDLE		child;
 | |
| 	exec_thread_arg *new_arg;
 | |
| #endif
 | |
| 
 | |
| 	va_start(args, fmt);
 | |
| 	vsnprintf(cmd, sizeof(cmd), fmt, args);
 | |
| 	va_end(args);
 | |
| 
 | |
| 	if (user_opts.jobs <= 1)
 | |
| 		/* throw_error must be true to allow jobs */
 | |
| 		exec_prog(log_file, opt_log_file, true, "%s", cmd);
 | |
| 	else
 | |
| 	{
 | |
| 		/* parallel */
 | |
| #ifdef WIN32
 | |
| 		if (thread_handles == NULL)
 | |
| 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
 | |
| 
 | |
| 		if (exec_thread_args == NULL)
 | |
| 		{
 | |
| 			int			i;
 | |
| 
 | |
| 			exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
 | |
| 
 | |
| 			/*
 | |
| 			 * For safety and performance, we keep the args allocated during
 | |
| 			 * the entire life of the process, and we don't free the args in a
 | |
| 			 * thread different from the one that allocated it.
 | |
| 			 */
 | |
| 			for (i = 0; i < user_opts.jobs; i++)
 | |
| 				exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
 | |
| 		}
 | |
| 
 | |
| 		cur_thread_args = (void **) exec_thread_args;
 | |
| #endif
 | |
| 		/* harvest any dead children */
 | |
| 		while (reap_child(false) == true)
 | |
| 			;
 | |
| 
 | |
| 		/* must we wait for a dead child? */
 | |
| 		if (parallel_jobs >= user_opts.jobs)
 | |
| 			reap_child(true);
 | |
| 
 | |
| 		/* set this before we start the job */
 | |
| 		parallel_jobs++;
 | |
| 
 | |
| 		/* Ensure stdio state is quiesced before forking */
 | |
| 		fflush(NULL);
 | |
| 
 | |
| #ifndef WIN32
 | |
| 		child = fork();
 | |
| 		if (child == 0)
 | |
| 			/* use _exit to skip atexit() functions */
 | |
| 			_exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
 | |
| 		else if (child < 0)
 | |
| 			/* fork failed */
 | |
| 			pg_fatal("could not create worker process: %s\n", strerror(errno));
 | |
| #else
 | |
| 		/* empty array element are always at the end */
 | |
| 		new_arg = exec_thread_args[parallel_jobs - 1];
 | |
| 
 | |
| 		/* Can only pass one pointer into the function, so use a struct */
 | |
| 		if (new_arg->log_file)
 | |
| 			pg_free(new_arg->log_file);
 | |
| 		new_arg->log_file = pg_strdup(log_file);
 | |
| 		if (new_arg->opt_log_file)
 | |
| 			pg_free(new_arg->opt_log_file);
 | |
| 		new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
 | |
| 		if (new_arg->cmd)
 | |
| 			pg_free(new_arg->cmd);
 | |
| 		new_arg->cmd = pg_strdup(cmd);
 | |
| 
 | |
| 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
 | |
| 										new_arg, 0, NULL);
 | |
| 		if (child == 0)
 | |
| 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
 | |
| 
 | |
| 		thread_handles[parallel_jobs - 1] = child;
 | |
| #endif
 | |
| 	}
 | |
| 
 | |
| 	return;
 | |
| }
 | |
| 
 | |
| 
 | |
| #ifdef WIN32
 | |
| DWORD
 | |
| win32_exec_prog(exec_thread_arg *args)
 | |
| {
 | |
| 	int			ret;
 | |
| 
 | |
| 	ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
 | |
| 
 | |
| 	/* terminates thread */
 | |
| 	return ret;
 | |
| }
 | |
| #endif
 | |
| 
 | |
| 
 | |
| /*
 | |
|  *	parallel_transfer_all_new_dbs
 | |
|  *
 | |
|  *	This has the same API as transfer_all_new_dbs, except it does parallel execution
 | |
|  *	by transfering multiple tablespaces in parallel
 | |
|  */
 | |
| void
 | |
| parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
 | |
| 							  char *old_pgdata, char *new_pgdata,
 | |
| 							  char *old_tablespace)
 | |
| {
 | |
| #ifndef WIN32
 | |
| 	pid_t		child;
 | |
| #else
 | |
| 	HANDLE		child;
 | |
| 	transfer_thread_arg *new_arg;
 | |
| #endif
 | |
| 
 | |
| 	if (user_opts.jobs <= 1)
 | |
| 		/* throw_error must be true to allow jobs */
 | |
| 		transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
 | |
| 	else
 | |
| 	{
 | |
| 		/* parallel */
 | |
| #ifdef WIN32
 | |
| 		if (thread_handles == NULL)
 | |
| 			thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
 | |
| 
 | |
| 		if (transfer_thread_args == NULL)
 | |
| 		{
 | |
| 			int			i;
 | |
| 
 | |
| 			transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
 | |
| 
 | |
| 			/*
 | |
| 			 * For safety and performance, we keep the args allocated during
 | |
| 			 * the entire life of the process, and we don't free the args in a
 | |
| 			 * thread different from the one that allocated it.
 | |
| 			 */
 | |
| 			for (i = 0; i < user_opts.jobs; i++)
 | |
| 				transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
 | |
| 		}
 | |
| 
 | |
| 		cur_thread_args = (void **) transfer_thread_args;
 | |
| #endif
 | |
| 		/* harvest any dead children */
 | |
| 		while (reap_child(false) == true)
 | |
| 			;
 | |
| 
 | |
| 		/* must we wait for a dead child? */
 | |
| 		if (parallel_jobs >= user_opts.jobs)
 | |
| 			reap_child(true);
 | |
| 
 | |
| 		/* set this before we start the job */
 | |
| 		parallel_jobs++;
 | |
| 
 | |
| 		/* Ensure stdio state is quiesced before forking */
 | |
| 		fflush(NULL);
 | |
| 
 | |
| #ifndef WIN32
 | |
| 		child = fork();
 | |
| 		if (child == 0)
 | |
| 		{
 | |
| 			transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
 | |
| 								 old_tablespace);
 | |
| 			/* if we take another exit path, it will be non-zero */
 | |
| 			/* use _exit to skip atexit() functions */
 | |
| 			_exit(0);
 | |
| 		}
 | |
| 		else if (child < 0)
 | |
| 			/* fork failed */
 | |
| 			pg_fatal("could not create worker process: %s\n", strerror(errno));
 | |
| #else
 | |
| 		/* empty array element are always at the end */
 | |
| 		new_arg = transfer_thread_args[parallel_jobs - 1];
 | |
| 
 | |
| 		/* Can only pass one pointer into the function, so use a struct */
 | |
| 		new_arg->old_db_arr = old_db_arr;
 | |
| 		new_arg->new_db_arr = new_db_arr;
 | |
| 		if (new_arg->old_pgdata)
 | |
| 			pg_free(new_arg->old_pgdata);
 | |
| 		new_arg->old_pgdata = pg_strdup(old_pgdata);
 | |
| 		if (new_arg->new_pgdata)
 | |
| 			pg_free(new_arg->new_pgdata);
 | |
| 		new_arg->new_pgdata = pg_strdup(new_pgdata);
 | |
| 		if (new_arg->old_tablespace)
 | |
| 			pg_free(new_arg->old_tablespace);
 | |
| 		new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
 | |
| 
 | |
| 		child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
 | |
| 										new_arg, 0, NULL);
 | |
| 		if (child == 0)
 | |
| 			pg_fatal("could not create worker thread: %s\n", strerror(errno));
 | |
| 
 | |
| 		thread_handles[parallel_jobs - 1] = child;
 | |
| #endif
 | |
| 	}
 | |
| 
 | |
| 	return;
 | |
| }
 | |
| 
 | |
| 
 | |
| #ifdef WIN32
 | |
| DWORD
 | |
| win32_transfer_all_new_dbs(transfer_thread_arg *args)
 | |
| {
 | |
| 	transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
 | |
| 						 args->new_pgdata, args->old_tablespace);
 | |
| 
 | |
| 	/* terminates thread */
 | |
| 	return 0;
 | |
| }
 | |
| #endif
 | |
| 
 | |
| 
 | |
| /*
 | |
|  *	collect status from a completed worker child
 | |
|  */
 | |
| bool
 | |
| reap_child(bool wait_for_child)
 | |
| {
 | |
| #ifndef WIN32
 | |
| 	int			work_status;
 | |
| 	int			ret;
 | |
| #else
 | |
| 	int			thread_num;
 | |
| 	DWORD		res;
 | |
| #endif
 | |
| 
 | |
| 	if (user_opts.jobs <= 1 || parallel_jobs == 0)
 | |
| 		return false;
 | |
| 
 | |
| #ifndef WIN32
 | |
| 	ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
 | |
| 
 | |
| 	/* no children or, for WNOHANG, no dead children */
 | |
| 	if (ret <= 0 || !WIFEXITED(work_status))
 | |
| 		return false;
 | |
| 
 | |
| 	if (WEXITSTATUS(work_status) != 0)
 | |
| 		pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
 | |
| #else
 | |
| 	/* wait for one to finish */
 | |
| 	thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
 | |
| 										false, wait_for_child ? INFINITE : 0);
 | |
| 
 | |
| 	if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
 | |
| 		return false;
 | |
| 
 | |
| 	/* compute thread index in active_threads */
 | |
| 	thread_num -= WAIT_OBJECT_0;
 | |
| 
 | |
| 	/* get the result */
 | |
| 	GetExitCodeThread(thread_handles[thread_num], &res);
 | |
| 	if (res != 0)
 | |
| 		pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
 | |
| 
 | |
| 	/* dispose of handle to stop leaks */
 | |
| 	CloseHandle(thread_handles[thread_num]);
 | |
| 
 | |
| 	/* Move last slot into dead child's position */
 | |
| 	if (thread_num != parallel_jobs - 1)
 | |
| 	{
 | |
| 		void	   *tmp_args;
 | |
| 
 | |
| 		thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
 | |
| 
 | |
| 		/*
 | |
| 		 * Move last active thead arg struct into the now-dead slot, and the
 | |
| 		 * now-dead slot to the end for reuse by the next thread. Though the
 | |
| 		 * thread struct is in use by another thread, we can safely swap the
 | |
| 		 * struct pointers within the array.
 | |
| 		 */
 | |
| 		tmp_args = cur_thread_args[thread_num];
 | |
| 		cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
 | |
| 		cur_thread_args[parallel_jobs - 1] = tmp_args;
 | |
| 	}
 | |
| #endif
 | |
| 
 | |
| 	/* do this after job has been removed */
 | |
| 	parallel_jobs--;
 | |
| 
 | |
| 	return true;
 | |
| }
 |