mirror of
				https://github.com/postgres/postgres.git
				synced 2025-10-29 22:49:41 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			762 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
			
		
		
	
	
			762 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Perl
		
	
	
	
	
	
| # -*- perl -*-
 | |
| # RServ.pm
 | |
| # Vadim Mikheev, (c) 2000, PostgreSQL Inc.
 | |
| 
 | |
| package RServ;
 | |
| 
 | |
| require Exporter;
 | |
| @ISA = qw(Exporter);
 | |
| @EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog);
 | |
| @EXPORT_OK = qw();
 | |
| 
 | |
| use Pg;
 | |
| 
 | |
| $debug = 0;
 | |
| $quiet = 1;
 | |
| 
 | |
| my %Mtables = ();
 | |
| my %Stables = ();
 | |
| 
 | |
| sub PrepareSnapshot
 | |
| {
 | |
| 	my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]);
 | |
| 
 | |
| 	my $result = $conn->exec("BEGIN");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	$result = $conn->exec("set transaction isolation level serializable");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	# MAP oid --> tabname, keyname
 | |
| 	$result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" . 
 | |
| 						  " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . 
 | |
| 						  " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
 | |
| 						  " and pga.attnum = rt.key");
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	my @row;
 | |
| 	while (@row = $result->fetchrow)
 | |
| 	{
 | |
| 	#	printf "$row[0], $row[1], $row[2]\n";
 | |
| 		push @{$Mtables{$row[0]}}, $row[1], $row[2];
 | |
| 	}
 | |
| 
 | |
| 	# Read last succeeded sync
 | |
| 	$sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . 
 | |
| 		" where server = $server and syncid = (select max(syncid) from" . 
 | |
| 			" _RSERV_SYNC_ where server = $server and status > 0)";
 | |
| 
 | |
| 	printf "$sql\n" if $debug;
 | |
| 
 | |
| 	$result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	my @lastsync = $result->fetchrow;
 | |
| 
 | |
| 	my $sinfo = "";
 | |
| 	if ($lastsync[3] ne '')	# sync info
 | |
| 	{
 | |
| 		$sinfo = "and (l.logid >= $lastsync[3]";
 | |
| 		$sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne '';
 | |
| 		$sinfo .= ")";
 | |
| 	}
 | |
| 
 | |
| 	my $havedeal = 0;
 | |
| 
 | |
| 	# DELETED rows
 | |
| 	$sql = "select l.reloid, l.key from _RSERV_LOG_ l" .
 | |
| 		" where l.deleted = 1 $sinfo order by l.reloid";
 | |
| 
 | |
| 	printf "$sql\n" if $debug;
 | |
| 
 | |
| 	$result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	$lastoid = '';
 | |
| 	while (@row = $result->fetchrow)
 | |
| 	{
 | |
| 		next unless exists $Mtables{$row[0]};
 | |
| 		if ($lastoid != $row[0])
 | |
| 		{
 | |
| 			if ($lastoid eq '')
 | |
| 			{
 | |
| 				my $syncid = GetSYNCID($conn, $outf);
 | |
| 				return($syncid) if $syncid < 0;
 | |
| 				$havedeal = 1;
 | |
| 			}
 | |
| 			else
 | |
| 			{
 | |
| 				printf $outf "\\.\n";
 | |
| 			}
 | |
| 			printf $outf "-- DELETE $Mtables{$row[0]}[0]\n";
 | |
| 			$lastoid = $row[0];
 | |
| 		}
 | |
| 		if (! defined $row[1])
 | |
| 		{
 | |
| 			print STDERR "NULL key\n" unless ($quiet);
 | |
| 			$conn->exec("ROLLBACK");
 | |
| 			return(-2);
 | |
| 		}
 | |
| 		printf $outf "%s\n", OutputValue($row[1]);
 | |
| 	}
 | |
| 	printf $outf "\\.\n" if $lastoid ne '';
 | |
| 
 | |
| 	# UPDATED rows
 | |
| 
 | |
| 	my ($taboid, $tabname, $tabkey);
 | |
| 	foreach $taboid (keys %Mtables)
 | |
| 	{
 | |
| 		($tabname, $tabkey) = @{$Mtables{$taboid}};
 | |
| 		my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : '';
 | |
| 		$sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," .
 | |
| 			" $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" .
 | |
| 				" and l.key = _$tabname.${tabkey}::text";
 | |
| 
 | |
| 		printf "$sql\n" if $debug;
 | |
| 
 | |
| 		$result = $conn->exec($sql);
 | |
| 		if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 		{
 | |
| 			printf $outf "-- ERROR\n" if $havedeal;
 | |
| 			print STDERR $conn->errorMessage unless ($quiet);
 | |
| 			$conn->exec("ROLLBACK");
 | |
| 			return(-1);
 | |
| 		}
 | |
| 		next if $result->ntuples <= 0;
 | |
| 		if (! $havedeal)
 | |
| 		{
 | |
| 			my $syncid = GetSYNCID($conn, $outf);
 | |
| 			return($syncid) if $syncid < 0;
 | |
| 			$havedeal = 1;
 | |
| 		}
 | |
| 		printf $outf "-- UPDATE $tabname\n";
 | |
| 		while (@row = $result->fetchrow)
 | |
| 		{
 | |
| 			for ($i = 0; $i <= $#row; $i++)
 | |
| 			{
 | |
| 				printf $outf "	" if $i;
 | |
| 				printf $outf "%s", OutputValue($row[$i]);
 | |
| 			}
 | |
| 			printf $outf "\n";
 | |
| 		}
 | |
| 		printf $outf "\\.\n";
 | |
| 	}
 | |
| 
 | |
| 	unless ($havedeal)
 | |
| 	{
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(0);
 | |
| 	}
 | |
| 
 | |
| 	# Remember this snapshot info
 | |
| 	$result = $conn->exec("select _rserv_sync_($server)");
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		printf $outf "-- ERROR\n";
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	$result = $conn->exec("COMMIT");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		printf $outf "-- ERROR\n";
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	printf $outf "-- OK\n";
 | |
| 
 | |
| 	return(1);
 | |
| 
 | |
| }
 | |
| 
 | |
| sub OutputValue
 | |
| {
 | |
| 	my ($val) = @_; # @_[0];
 | |
| 
 | |
| 	return("\\N") unless defined $val;
 | |
| 
 | |
| 	$val =~ s/\\/\\\\/g;
 | |
| 	$val =~ s/	/\\011/g;
 | |
| 	$val =~ s/\n/\\012/g;
 | |
| 	$val =~ s/\'/\\047/g;
 | |
| 
 | |
| 	return($val);
 | |
| }
 | |
| 
 | |
| # Get syncid for new snapshot
 | |
| sub GetSYNCID
 | |
| {
 | |
| 	my ($conn, $outf) = @_; # (@_[0], @_[1]);
 | |
| 
 | |
| 	my $result = $conn->exec("select nextval('_rserv_sync_seq_')");
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	my @row = $result->fetchrow;
 | |
| 
 | |
| 	printf $outf "-- SYNCID $row[0]\n";
 | |
| 	return($row[0]);
 | |
| }
 | |
| 
 | |
| 
 | |
| sub CleanLog
 | |
| {
 | |
| 	my ($conn, $howold) = @_; # (@_[0], @_[1]);
 | |
| 
 | |
| 	my $result = $conn->exec("BEGIN");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" .
 | |
| 		" where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" .
 | |
| 			" where rs2.server = rs.server and rs2.status > 0) order by rs.maxid";
 | |
| 
 | |
| 	printf "$sql\n" if $debug;
 | |
| 
 | |
| 	$result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	my $maxid = '';
 | |
| 	my %active = ();
 | |
| 	while (my @row = $result->fetchrow)
 | |
| 	{
 | |
| 		$maxid = $row[0] if $maxid eq '';
 | |
| 		last if $row[0] > $maxid;
 | |
| 		my @ids = split(/[ 	]+,[ 	]+/, $row[1]);
 | |
| 		foreach $aid (@ids)
 | |
| 		{
 | |
| 			$active{$aid} = 1 unless exists $active{$aid};
 | |
| 		}
 | |
| 	}
 | |
| 	if ($maxid eq '')
 | |
| 	{
 | |
| 		print STDERR "No Sync IDs\n" unless ($quiet);
 | |
| 		return(0);
 | |
| 	}
 | |
| 	my $alist = join(',', keys %active);
 | |
| 	my $sinfo = "logid < $maxid";
 | |
| 	$sinfo .= " and logid not in ($alist)" if $alist ne '';
 | |
| 	
 | |
| 	$sql = "delete from _RSERV_LOG_ where " . 
 | |
| 		"logtime < now() - '$howold second'::interval and $sinfo";
 | |
| 
 | |
| 	printf "$sql\n" if $debug;
 | |
| 
 | |
| 	$result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	$maxid = $result->cmdTuples;
 | |
| 
 | |
| 	$result = $conn->exec("COMMIT");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	return($maxid);
 | |
| }
 | |
| 
 | |
| sub ApplySnapshot
 | |
| {
 | |
| 	my ($conn, $inpf) = @_; # (@_[0], @_[1]);
 | |
| 
 | |
| 	my $result = $conn->exec("BEGIN");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	$result = $conn->exec("SET CONSTRAINTS ALL DEFERRED");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	# MAP name --> oid, keyname, keynum
 | |
| 	my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" . 
 | |
| 		" from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" . 
 | |
| 			" where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" .
 | |
| 				" and pga.attnum = rt.key";
 | |
| 	$result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	while (@row = $result->fetchrow)
 | |
| 	{
 | |
| 	#	printf "	%s	%s\n", $row[1], $row[0];
 | |
| 		push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3];
 | |
| 	}
 | |
| 
 | |
| 	my $ok = 0;
 | |
| 	my $syncid = '';
 | |
| 	while(<$inpf>)
 | |
| 	{
 | |
| 		$_ =~ s/\n//;
 | |
| 		my ($cmt, $cmd, $prm) = split (/[ 	]+/, $_, 3);
 | |
| 		if ($cmt ne '--')
 | |
| 		{
 | |
| 			printf STDERR "Invalid format\n" unless ($quiet);
 | |
| 			$conn->exec("ROLLBACK");
 | |
| 			return(-2);
 | |
| 		}
 | |
| 		if ($cmd eq 'DELETE')
 | |
| 		{
 | |
| 			if ($syncid eq '')
 | |
| 			{
 | |
| 				printf STDERR "Sync ID unspecified\n" unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-2);
 | |
| 			}
 | |
| 			$result = DoDelete($conn, $inpf, $prm);
 | |
| 			if ($result)
 | |
| 			{
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return($result);
 | |
| 			}
 | |
| 		}
 | |
| 		elsif ($cmd eq 'UPDATE')
 | |
| 		{
 | |
| 			if ($syncid eq '')
 | |
| 			{
 | |
| 				printf STDERR "Sync ID unspecified\n" unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-2);
 | |
| 			}
 | |
| 			$result = DoUpdate($conn, $inpf, $prm);
 | |
| 			if ($result)
 | |
| 			{
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return($result);
 | |
| 			}
 | |
| 		}
 | |
| 		elsif ($cmd eq 'SYNCID')
 | |
| 		{
 | |
| 			if ($syncid ne '')
 | |
| 			{
 | |
| 				printf STDERR "Second Sync ID ?!\n" unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-2);
 | |
| 			}
 | |
| 			if ($prm !~ /^\d+$/)
 | |
| 			{
 | |
| 				printf STDERR "Invalid Sync ID $prm\n" unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-2);
 | |
| 			}
 | |
| 			$syncid = $prm;
 | |
| 
 | |
| 			printf STDERR "Sync ID $syncid\n" unless ($quiet);
 | |
| 
 | |
| 			$result = $conn->exec("select syncid, synctime from " . 
 | |
| 								  "_RSERV_SLAVE_SYNC_ where syncid = " . 
 | |
| 								  "(select max(syncid) from _RSERV_SLAVE_SYNC_)");
 | |
| 			if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 			{
 | |
| 				print STDERR $conn->errorMessage unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-1);
 | |
| 			}
 | |
| 			my @row = $result->fetchrow;
 | |
| 			if (! defined $row[0])
 | |
| 			{
 | |
| 				$result = $conn->exec("insert into" .
 | |
| 									  " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())");
 | |
| 			}
 | |
| 			elsif ($row[0] >= $prm)
 | |
| 			{
 | |
| 				printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(0);
 | |
| 			}
 | |
| 			else
 | |
| 			{
 | |
| 				$result = $conn->exec("update _RSERV_SLAVE_SYNC_" .
 | |
| 									  " set syncid = $syncid, synctime = now()");
 | |
| 			}
 | |
| 			if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 			{
 | |
| 				print STDERR $conn->errorMessage unless ($quiet);
 | |
| 				$conn->exec("ROLLBACK");
 | |
| 				return(-1);
 | |
| 			}
 | |
| 		}
 | |
| 		elsif ($cmd eq 'OK')
 | |
| 		{
 | |
| 			$ok = 1;
 | |
| 			last;
 | |
| 		}
 | |
| 		elsif ($cmd eq 'ERROR')
 | |
| 		{
 | |
| 			printf STDERR "ERROR signaled\n" unless ($quiet);
 | |
| 			$conn->exec("ROLLBACK");
 | |
| 			return(-2);
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			printf STDERR "Unknown command $cmd\n" unless ($quiet);
 | |
| 			$conn->exec("ROLLBACK");
 | |
| 			return(-2);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (! $ok)
 | |
| 	{
 | |
| 		printf STDERR "No OK flag in input\n" unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-2);
 | |
| 	}
 | |
| 
 | |
| 	$result = $conn->exec("COMMIT");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	return(1);
 | |
| }
 | |
| 
 | |
| sub DoDelete
 | |
| {
 | |
| 	my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
 | |
| 
 | |
| 	my $ok = 0;
 | |
| 	while(<$inpf>)
 | |
| 	{
 | |
| 		if ($_ !~ /\n$/)
 | |
| 		{
 | |
| 			printf STDERR "Invalid format\n" unless ($quiet);
 | |
| 			return(-2);
 | |
| 		}
 | |
| 		my $key = $_;
 | |
| 		$key =~ s/\n//;
 | |
| 		if ($key eq '\.')
 | |
| 		{
 | |
| 			$ok = 1;
 | |
| 			last;
 | |
| 		}
 | |
| 
 | |
| 		my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'";
 | |
| 
 | |
| 		printf "$sql\n" if $debug;
 | |
| 
 | |
| 		my $result = $conn->exec($sql);
 | |
| 		if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 		{
 | |
| 			print STDERR $conn->errorMessage unless ($quiet);
 | |
| 			return(-1);
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (! $ok)
 | |
| 	{
 | |
| 		printf STDERR "No end of input in DELETE section\n" unless ($quiet);
 | |
| 		return(-2);
 | |
| 	}
 | |
| 
 | |
| 	return(0);
 | |
| }
 | |
| 
 | |
| 
 | |
| sub DoUpdate
 | |
| {
 | |
| 	my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]);
 | |
| 	my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0;
 | |
| 
 | |
| 	my @CopyBuf = ();
 | |
| 	my $CBufLen = 0;
 | |
| 	my $CBufMax = 16 * 1024 * 1024;	# max size of buf for copy
 | |
| 
 | |
| 	my $sql = "select attnum, attname from pg_attribute" .
 | |
| 		" where attrelid = $Stables{$tabname}->[0] and attnum > 0";
 | |
| 
 | |
| 	my $result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	my @anames = ();
 | |
| 	while (@row = $result->fetchrow)
 | |
| 	{
 | |
| 		$anames[$row[0]] = $row[1];
 | |
| 	}
 | |
| 
 | |
| 	my $istring;
 | |
| 	my $ok = 0;
 | |
| 	while(<$inpf>)
 | |
| 	{
 | |
| 		if ($_ !~ /\n$/)
 | |
| 		{
 | |
| 			printf STDERR "Invalid format\n" unless ($quiet);
 | |
| 			return(-2);
 | |
| 		}
 | |
| 		$istring = $_;
 | |
| 		$istring =~ s/\n//;
 | |
| 		if ($istring eq '\.')
 | |
| 		{
 | |
| 			$ok = 1;
 | |
| 			last;
 | |
| 		}
 | |
| 		my @vals = split(/	/, $istring);
 | |
| 		if ($oidkey)
 | |
| 		{
 | |
| 			if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0)
 | |
| 			{
 | |
| 				printf STDERR "Invalid OID\n" unless ($quiet);
 | |
| 				return(-2);
 | |
| 			}
 | |
| 			$oidkey = $vals[0];
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			unshift @vals, '';
 | |
| 		}
 | |
| 
 | |
| 		$sql = "update $tabname set ";
 | |
| 		my $ocnt = 0;
 | |
| 		for (my $i = 1; $i <= $#anames; $i++)
 | |
| 		{
 | |
| 			if ($vals[$i] eq '\N')
 | |
| 			{
 | |
| 				if ($i == $Stables{$tabname}->[2])
 | |
| 				{
 | |
| 					printf STDERR "NULL key\n" unless ($quiet);
 | |
| 					return(-2);
 | |
| 				}
 | |
| 				$vals[$i] = 'null';
 | |
| 			}
 | |
| 			else
 | |
| 			{
 | |
| 				$vals[$i] = "'" . $vals[$i] . "'";
 | |
| 				next if $i == $Stables{$tabname}->[2];
 | |
| 			}
 | |
| 			$ocnt++;
 | |
| 			$sql .= ', ' if $ocnt > 1;
 | |
| 			$sql .= "$anames[$i] = $vals[$i]";
 | |
| 		}
 | |
| 		if ($oidkey)
 | |
| 		{
 | |
| 			$sql .= " where $Stables{$tabname}->[1] = $oidkey";
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			$sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]";
 | |
| 		}
 | |
| 
 | |
| 		printf "$sql\n" if $debug;
 | |
| 
 | |
| 		$result = $conn->exec($sql);
 | |
| 		if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 		{
 | |
| 			print STDERR $conn->errorMessage unless ($quiet);
 | |
| 			return(-1);
 | |
| 		}
 | |
| 		next if $result->cmdTuples == 1;	# updated
 | |
| 
 | |
| 		if ($result->cmdTuples > 1)
 | |
| 		{
 | |
| 			printf STDERR "Duplicate keys\n" unless ($quiet);
 | |
| 			return(-2);
 | |
| 		}
 | |
| 
 | |
| 		# no key - copy
 | |
| 		push @CopyBuf, "$istring\n";
 | |
| 		$CBufLen += length($istring);
 | |
| 
 | |
| 		if ($CBufLen >= $CBufMax)
 | |
| 		{
 | |
| 			$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
 | |
| 			return($result) if $result;
 | |
| 			@CopyBuf = ();
 | |
| 			$CBufLen = 0;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if (! $ok)
 | |
| 	{
 | |
| 		printf STDERR "No end of input in UPDATE section\n" unless ($quiet);
 | |
| 		return(-2);
 | |
| 	}
 | |
| 
 | |
| 	if ($CBufLen)
 | |
| 	{
 | |
| 		$result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf);
 | |
| 		return($result) if $result;
 | |
| 	}
 | |
| 
 | |
| 	return(0);
 | |
| }
 | |
| 
 | |
| 
 | |
| sub DoCopy
 | |
| {
 | |
| 	my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]);
 | |
| 
 | |
| 	my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') . 
 | |
| "FROM STDIN";
 | |
| 	my $result = $conn->exec($sql);
 | |
| 	if ($result->resultStatus ne PGRES_COPY_IN)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	foreach $str (@{$CBuf})
 | |
| 	{
 | |
| 		$conn->putline($str);
 | |
| 	}
 | |
| 
 | |
| 	$conn->putline("\\.\n");
 | |
| 
 | |
| 	if ($conn->endcopy)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	return(0);
 | |
| 
 | |
| }
 | |
| 
 | |
| 
 | |
| #
 | |
| # Returns last SyncID applied on Slave
 | |
| #
 | |
| sub GetSyncID
 | |
| {
 | |
| 	my ($conn) = @_; # (@_[0]);
 | |
| 	
 | |
| 	my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_");
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	my @row = $result->fetchrow;
 | |
| 	return(undef) unless defined $row[0];	# null
 | |
| 	return($row[0]);
 | |
| }
 | |
| 
 | |
| #
 | |
| # Updates _RSERV_SYNC_ on Master with Slave SyncID
 | |
| #
 | |
| sub SyncSyncID
 | |
| {
 | |
| 	my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]);
 | |
| 	
 | |
| 	my $result = $conn->exec("BEGIN");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	$result = $conn->exec("select synctime, status from _RSERV_SYNC_" .
 | |
| 						  " where server = $server and syncid = $syncid" .
 | |
| 						  " for update");
 | |
| 	if ($result->resultStatus ne PGRES_TUPLES_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	my @row = $result->fetchrow;
 | |
| 	if (! defined $row[0])
 | |
| 	{
 | |
| 		printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(0);
 | |
| 	}
 | |
| 	if ($row[1] > 0)
 | |
| 	{
 | |
| 		printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(0);
 | |
| 	}
 | |
| 	$result = $conn->exec("update _RSERV_SYNC_" .
 | |
| 						  " set synctime = now(), status = 1" .
 | |
| 						  " where server = $server and syncid = $syncid");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 	$result = $conn->exec("delete from _RSERV_SYNC_" .
 | |
| 						  " where server = $server and syncid < $syncid");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	$result = $conn->exec("COMMIT");
 | |
| 	if ($result->resultStatus ne PGRES_COMMAND_OK)
 | |
| 	{
 | |
| 		print STDERR $conn->errorMessage unless ($quiet);
 | |
| 		$conn->exec("ROLLBACK");
 | |
| 		return(-1);
 | |
| 	}
 | |
| 
 | |
| 	return(1);
 | |
| }
 | |
| 
 | |
| 1;
 |