diff --git a/doc/xml/release.xml b/doc/xml/release.xml index 35b448bc9..63c5ef5f0 100644 --- a/doc/xml/release.xml +++ b/doc/xml/release.xml @@ -13,6 +13,10 @@ + +

The archive-push command is now partially coded in C which allows the archive_command to run significantly faster when processing status messages from the asynchronous archive process.

+
+ diff --git a/lib/pgBackRest/Archive/Push/Async.pm b/lib/pgBackRest/Archive/Push/Async.pm index 68364952b..e22f9674b 100644 --- a/lib/pgBackRest/Archive/Push/Async.pm +++ b/lib/pgBackRest/Archive/Push/Async.pm @@ -286,8 +286,8 @@ sub processQueue my $iCode = exceptionCode($EVAL_ERROR); my $strMessage = exceptionMessage($EVAL_ERROR); - # Error all ready jobs - foreach my $strWalFile (@{$self->readyList()}) + # Error all queued jobs + foreach my $strWalFile (@{$stryWalFile}) { $self->walStatusWrite( WAL_STATUS_ERROR, $strWalFile, $iCode, $strMessage); diff --git a/lib/pgBackRest/Archive/Push/Push.pm b/lib/pgBackRest/Archive/Push/Push.pm index f8a3f4dea..faa332ddf 100644 --- a/lib/pgBackRest/Archive/Push/Push.pm +++ b/lib/pgBackRest/Archive/Push/Push.pm @@ -71,44 +71,13 @@ sub process my $strWalPath = dirname(walPath($strWalPathFile, cfgOption(CFGOPT_DB_PATH, false), cfgCommandName(cfgCommandGet()))); my $strWalFile = basename($strWalPathFile); - # Is the async client or server? - my $bClient = true; - # Start the async process and wait for WAL to complete if (cfgOption(CFGOPT_ARCHIVE_ASYNC)) { - # Get the spool path - $self->{strSpoolPath} = storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_OUT); - - # Loop to check for status files and launch async process - my $bPushed = false; - my $oWait = waitInit(cfgOption(CFGOPT_ARCHIVE_TIMEOUT)); - $self->{bConfessOnError} = false; - - do - { - # Check WAL status - $bPushed = $self->walStatus($self->{strSpoolPath}, $strWalFile, $self->{bConfessOnError}); - - # If not found then launch async process - if (!$bPushed) - { - # Load module dynamically - require pgBackRest::Archive::Push::Async; - $bClient = (new pgBackRest::Archive::Push::Async( - $strWalPath, $self->{strSpoolPath}, $self->{strBackRestBin}))->process(); - } - - $self->{bConfessOnError} = true; - } - while ($bClient && !$bPushed && waitMore($oWait)); - - if (!$bPushed && $bClient) - { - confess &log(ERROR, - "unable to push WAL ${strWalFile} asynchronously after " . cfgOption(CFGOPT_ARCHIVE_TIMEOUT) . " second(s)", - ERROR_ARCHIVE_TIMEOUT); - } + # Load module dynamically + require pgBackRest::Archive::Push::Async; + (new pgBackRest::Archive::Push::Async( + $strWalPath, storageSpool()->pathGet(STORAGE_SPOOL_ARCHIVE_OUT), $self->{strBackRestBin}))->process(); } # Else push synchronously else @@ -129,15 +98,10 @@ sub process else { archivePushFile($strWalPath, $strWalFile, cfgOption(CFGOPT_COMPRESS), cfgOption(CFGOPT_COMPRESS_LEVEL)); + &log(INFO, "pushed WAL segment ${strWalFile}"); } } - # Only print the message if this is the async client or the WAL file was pushed synchronously - if ($bClient) - { - &log(INFO, "pushed WAL segment ${strWalFile}" . (cfgOption(CFGOPT_ARCHIVE_ASYNC) ? ' asynchronously' : '')); - } - # Return from function and log return values if any return logDebugReturn ( @@ -146,106 +110,6 @@ sub process ); } -#################################################################################################################################### -# walStatus -# -# Read a WAL status file and return success or raise a warning or error. -#################################################################################################################################### -sub walStatus -{ - my $self = shift; - - # Assign function parameters, defaults, and log debug info - my - ( - $strOperation, - $strSpoolPath, - $strWalFile, - $bConfessOnError, - ) = - logDebugParam - ( - __PACKAGE__ . '->walStatus', \@_, - {name => 'strSpoolPath'}, - {name => 'strWalFile'}, - {name => 'bConfessOnError', default => true}, - ); - - # Default result is false - my $bResult = false; - - # Find matching status files - my @stryStatusFile = storageSpool()->list( - $strSpoolPath, {strExpression => '^' . $strWalFile . '\.(ok|error)$', bIgnoreMissing => true}); - - if (@stryStatusFile > 0) - { - # If more than one status file was found then assert - this could be a bug in the async process - if (@stryStatusFile > 1) - { - confess &log(ASSERT, - "multiple status files found in ${strSpoolPath} for ${strWalFile}: " . join(', ', @stryStatusFile)); - } - - # Read the status file - my $rstrWalStatus = storageSpool()->get("${strSpoolPath}/$stryStatusFile[0]"); - my @stryWalStatus = split("\n", defined($$rstrWalStatus) ? $$rstrWalStatus : ''); - - # Status file must have at least two lines if it has content - my $iCode; - my $strMessage; - - # Parse status content - if (@stryWalStatus != 0) - { - if (@stryWalStatus < 2) - { - confess &log(ASSERT, "$stryStatusFile[0] content must have at least two lines:\n" . join("\n", @stryWalStatus)); - } - - $iCode = shift(@stryWalStatus); - $strMessage = join("\n", @stryWalStatus); - } - - # Process ok files - if ($stryStatusFile[0] =~ /\.ok$/) - { - # If there is content in the status file it is a warning - if (@stryWalStatus != 0) - { - # If error code is not success, then this was a renamed .error file - if ($iCode != 0) - { - $strMessage = - "WAL segment ${strWalFile} was not pushed due to error and was manually skipped:\n" . $strMessage; - } - - &log(WARN, $strMessage); - } - - $bResult = true; - } - # Process error files - elsif ($bConfessOnError) - { - # Error files must have content - if (@stryWalStatus == 0) - { - confess &log(ASSERT, "$stryStatusFile[0] has no content"); - } - - confess &log(ERROR, $strMessage, $iCode); - } - } - - # Return from function and log return values if any - return logDebugReturn - ( - $strOperation, - {name => 'bResult', value => $bResult} - ); -} - #################################################################################################################################### # readyList # diff --git a/src/Makefile b/src/Makefile index 5f9a52466..f7b87b534 100644 --- a/src/Makefile +++ b/src/Makefile @@ -3,6 +3,7 @@ CFLAGS=-I. -Wfatal-errors -Wall -Wextra -Wwrite-strings -Wno-clobbered -std=c99 DESTDIR= pgbackrest: \ + command/archive/push/push.o \ command/command.o \ common/error.o \ common/errorType.o \ @@ -29,6 +30,7 @@ pgbackrest: \ storage/storage.o \ main.o $(CC) $(CFLAGS) -o pgbackrest \ + command/archive/push/push.o \ command/command.o \ common/error.o \ common/errorType.o \ diff --git a/src/command/archive/push/push.c b/src/command/archive/push/push.c new file mode 100644 index 000000000..d823a50e4 --- /dev/null +++ b/src/command/archive/push/push.c @@ -0,0 +1,186 @@ +/*********************************************************************************************************************************** +Archive Push Command +***********************************************************************************************************************************/ +#include +#include +#include +#include +#include +#include + +#include "common/error.h" +#include "common/log.h" +#include "common/memContext.h" +#include "common/regExp.h" +#include "common/type.h" +#include "common/wait.h" +#include "config/config.h" +#include "perl/exec.h" +#include "storage/helper.h" + +/*********************************************************************************************************************************** +Check for ok/error status files in the spool out directory +***********************************************************************************************************************************/ +static bool +walStatus(const String *walSegment, bool confessOnError) +{ + bool result = false; + + MEM_CONTEXT_TEMP_BEGIN() + { + StringList *fileList = storageList( + storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT), strNewFmt("^%s\\.(ok|error)$", strPtr(walSegment)), true); + + if (fileList != NULL && strLstSize(fileList) > 0) + { + // If more than one status file was found then assert - this could be a bug in the async process + if (strLstSize(fileList) != 1) + { + THROW( + AssertError, "multiple status files found in '%s' for WAL segment '%s'", + strPtr(storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT))), strPtr(walSegment)); + } + + // Get the status file content + const String *statusFile = strLstGet(fileList, 0); + + String *content = strNewBuf( + storageGet(storageSpool(), strNewFmt("%s/%s", STORAGE_SPOOL_ARCHIVE_OUT, strPtr(statusFile)), false)); + + // Get the code and message if the file has content + int code = 0; + const String *message = NULL; + + if (strSize(content) != 0) + { + // Find the line feed after the error code -- should be the first one + const char *linefeedPtr = strchr(strPtr(content), '\n'); + + // Error if linefeed not found + if (linefeedPtr == NULL) + THROW(FormatError, "%s content must have at least two lines", strPtr(statusFile)); + + // Error if message is zero-length + if (strlen(linefeedPtr + 1) == 0) + THROW(FormatError, "%s message must be > 0", strPtr(statusFile)); + + // Get contents + code = varIntForce(varNewStr(strNewN(strPtr(content), linefeedPtr - strPtr(content)))); + message = strTrim(strNew(linefeedPtr + 1)); + } + + // Process OK files + if (strEndsWithZ(statusFile, ".ok")) + { + // If there is content in the status file it is a warning + if (strSize(content) != 0) + { + // If error code is not success, then this was a renamed .error file + if (code != 0) + { + message = strNewFmt( + "WAL segment '%s' was not pushed due to error [%d] and was manually skipped: %s", strPtr(walSegment), + code, strPtr(message)); + } + + LOG_WARN(strPtr(message)); + } + + result = true; + } + else if (confessOnError) + { + // Error status files must have content + if (strSize(content) == 0) + THROW(AssertError, "status file '%s' has no content", strPtr(statusFile)); + + // Throw error using the code passed in the file + THROW_CODE(code, strPtr(message)); + } + } + } + MEM_CONTEXT_TEMP_END(); + + return result; +} + +/*********************************************************************************************************************************** +Push a WAL segment to the repository +***********************************************************************************************************************************/ +void +cmdArchivePush() +{ + MEM_CONTEXT_TEMP_BEGIN() + { + // Make sure there is a parameter to retrieve the WAL segment from + const StringList *commandParam = cfgCommandParam(); + + if (strLstSize(commandParam) != 1) + THROW(ParamRequiredError, "WAL segment to push required"); + + // Get the segment name + String *walSegment = strBase(strLstGet(commandParam, 0)); + + if (cfgOptionBool(cfgOptArchiveAsync)) + { + bool pushed = false; // Has the WAL segment been pushed yet? + bool confessOnError = false; // Should we confess errors? + + // Loop and wait for the WAL segment to be pushed + Wait *wait = waitNew(cfgOptionDbl(cfgOptArchiveTimeout)); + + do + { + // Check if the WAL segment has been pushed. Errors will not be confessed on the first try to allow the async + // process a chance to fix them. + pushed = walStatus(walSegment, confessOnError); + + // If the WAL segment has not already been pushed then start the async process to push it + if (!pushed) + { + // Only want to see warnings and errors from async process + cfgOptionSet(cfgOptLogLevelConsole, cfgSourceParam, varNewStrZ("warn")); + + // Async process is currently implemented in Perl + int processId = 0; + + if ((processId = fork()) == 0) + { + perlExec(perlCommand()); + } + // Wait for async process to exit (this should happen quickly) and report any errors + else + { + int processStatus; + + THROW_ON_SYS_ERROR( + waitpid(processId, &processStatus, 0) != processId, AssertError, "unable to find perl child process"); + + if (WEXITSTATUS(processStatus) != 0) + THROW(AssertError, "perl exited with error %d", WEXITSTATUS(processStatus)); + } + } + + // Now that the async process has been launched, confess any errors that are found + confessOnError = true; + } + while (!pushed && waitMore(wait)); + + waitFree(wait); + + // If the WAL segment was not pushed then error + if (!pushed) + { + THROW( + ArchiveTimeoutError, "unable to push WAL segment '%s' asynchronously after %lg second(s)", strPtr(walSegment), + cfgOptionDbl(cfgOptArchiveTimeout)); + } + + // Log success + LOG_INFO("pushed WAL segment %s asynchronously", strPtr(walSegment)); + } + else + THROW(AssertError, "archive-push in C does not support synchronous mode"); + } + MEM_CONTEXT_TEMP_END(); +} diff --git a/src/command/archive/push/push.h b/src/command/archive/push/push.h new file mode 100644 index 000000000..c88ba722b --- /dev/null +++ b/src/command/archive/push/push.h @@ -0,0 +1,12 @@ +/*********************************************************************************************************************************** +Archive Push Command +***********************************************************************************************************************************/ +#ifndef COMMAND_ARCHIVE_PUSH_H +#define COMMAND_ARCHIVE_PUSH_H + +/*********************************************************************************************************************************** +Functions +***********************************************************************************************************************************/ +void cmdArchivePush(); + +#endif diff --git a/src/common/errorType.c b/src/common/errorType.c index 5423d69eb..16421aa41 100644 --- a/src/common/errorType.c +++ b/src/common/errorType.c @@ -37,9 +37,12 @@ ERROR_DEFINE(ERROR_CODE_MIN + 07, OptionInvalidValueError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 12, OptionRequiredError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 16, FileOpenError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 17, FileReadError, RuntimeError); +ERROR_DEFINE(ERROR_CODE_MIN + 18, ParamRequiredError, RuntimeError); +ERROR_DEFINE(ERROR_CODE_MIN + 19, ArchiveMismatchError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 23, CommandInvalidError, FormatError); ERROR_DEFINE(ERROR_CODE_MIN + 28, PathOpenError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 39, FileWriteError, RuntimeError); +ERROR_DEFINE(ERROR_CODE_MIN + 57, ArchiveTimeoutError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 69, MemoryError, RuntimeError); ERROR_DEFINE(ERROR_CODE_MIN + 70, CipherError, FormatError); @@ -59,9 +62,12 @@ static const ErrorType *errorTypeList[] = &OptionRequiredError, &FileOpenError, &FileReadError, + &ParamRequiredError, + &ArchiveMismatchError, &CommandInvalidError, &PathOpenError, &FileWriteError, + &ArchiveTimeoutError, &MemoryError, &CipherError, diff --git a/src/common/errorType.h b/src/common/errorType.h index 0b7057994..18414bfd6 100644 --- a/src/common/errorType.h +++ b/src/common/errorType.h @@ -23,9 +23,12 @@ ERROR_DECLARE(OptionInvalidValueError); ERROR_DECLARE(OptionRequiredError); ERROR_DECLARE(FileOpenError); ERROR_DECLARE(FileReadError); +ERROR_DECLARE(ParamRequiredError); +ERROR_DECLARE(ArchiveMismatchError); ERROR_DECLARE(CommandInvalidError); ERROR_DECLARE(PathOpenError); ERROR_DECLARE(FileWriteError); +ERROR_DECLARE(ArchiveTimeoutError); ERROR_DECLARE(MemoryError); ERROR_DECLARE(CipherError); diff --git a/src/main.c b/src/main.c index 6267192e6..67b8427fc 100644 --- a/src/main.c +++ b/src/main.c @@ -4,6 +4,8 @@ Main #include #include +#include "command/archive/push/push.h" +#include "command/command.h" #include "common/error.h" #include "common/exit.h" #include "config/config.h" @@ -28,6 +30,15 @@ int main(int argListSize, const char *argList[]) exit(0); } + // Archive push command. Currently only implements to local operations of async archive push. + // ------------------------------------------------------------------------------------------------------------------------- + if (cfgCommand() == cfgCmdArchivePush && cfgOptionBool(cfgOptArchiveAsync)) + { + cmdBegin(); + cmdArchivePush(); + exit(exitSafe(false)); + } + // Execute Perl for commands not implemented in C // ------------------------------------------------------------------------------------------------------------------------- perlExec(perlCommand()); diff --git a/test/lib/pgBackRestTest/Common/DefineTest.pm b/test/lib/pgBackRestTest/Common/DefineTest.pm index 5266b009f..d6037a7fb 100644 --- a/test/lib/pgBackRestTest/Common/DefineTest.pm +++ b/test/lib/pgBackRestTest/Common/DefineTest.pm @@ -691,6 +691,16 @@ my $oTestDef = 'Protocol/Local/Minion' => TESTDEF_COVERAGE_PARTIAL, }, }, + { + &TESTDEF_NAME => 'push', + &TESTDEF_TOTAL => 2, + &TESTDEF_C => true, + + &TESTDEF_COVERAGE => + { + 'command/archive/push/push' => TESTDEF_COVERAGE_FULL, + }, + }, { &TESTDEF_NAME => 'stop', &TESTDEF_TOTAL => 7, diff --git a/test/lib/pgBackRestTest/Module/Archive/ArchivePushPerlTest.pm b/test/lib/pgBackRestTest/Module/Archive/ArchivePushPerlTest.pm index d9e390417..d9f5b6082 100644 --- a/test/lib/pgBackRestTest/Module/Archive/ArchivePushPerlTest.pm +++ b/test/lib/pgBackRestTest/Module/Archive/ArchivePushPerlTest.pm @@ -22,6 +22,7 @@ use pgBackRest::Archive::Push::File; use pgBackRest::Common::Exception; use pgBackRest::Common::Lock; use pgBackRest::Common::Log; +use pgBackRest::Common::Wait; use pgBackRest::Config::Config; use pgBackRest::DbVersion; use pgBackRest::Protocol::Helper; @@ -323,7 +324,7 @@ sub run } ################################################################################################################################ - if ($self->begin("ArchivePushAsync->walStatusWrite() & ArchivePush->walStatus()")) + if ($self->begin("ArchivePushAsync->walStatusWrite()")) { my $oPush = new pgBackRest::Archive::Push::Push(); @@ -338,32 +339,13 @@ sub run #--------------------------------------------------------------------------------------------------------------------------- my $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - $self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 0, "${strSegment} WAL no status"); - - #--------------------------------------------------------------------------------------------------------------------------- # Generate a normal ok $oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment); - # Check status - $self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 1, "${strSegment} WAL ok"); - - #--------------------------------------------------------------------------------------------------------------------------- - # Generate a bogus warning ok (if content is present there must be two lines) - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - storageTest()->put("$self->{strSpoolPath}/${strSegment}.ok", "Test Warning"); - - # Check status - $self->testException( - sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, ERROR_ASSERT, - "${strSegment}.ok content must have at least two lines:\nTest Warning"); - #--------------------------------------------------------------------------------------------------------------------------- # Generate a valid warning ok $oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment, 0, 'Test Warning'); - # Check status - $self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 1, "${strSegment} WAL warning ok"); - #--------------------------------------------------------------------------------------------------------------------------- # Generate an invalid error $self->testException( @@ -376,53 +358,10 @@ sub run sub {$oPushAsync->walStatusWrite(WAL_STATUS_ERROR, $strSegment, ERROR_ASSERT)}, ERROR_ASSERT, "strMessage must be set when iCode is set"); - #--------------------------------------------------------------------------------------------------------------------------- - # Generate an invalid error - storageTest()->put("$self->{strSpoolPath}/${strSegment}.error"); - - # Check status (will error because there are now two status files) - $self->testException( - sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, ERROR_ASSERT, - "multiple status files found in " . $self->testPath() . "/repo/archive/db/out for ${strSegment}:" . - " ${strSegment}.error, ${strSegment}.ok"); - - #--------------------------------------------------------------------------------------------------------------------------- - # Remove the ok file - storageTest()->remove("$self->{strSpoolPath}/${strSegment}.ok"); - - # Check status - $self->testException( - sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, ERROR_ASSERT, "${strSegment}.error has no content"); - #--------------------------------------------------------------------------------------------------------------------------- # Generate a valid error $oPushAsync->walStatusWrite( WAL_STATUS_ERROR, $strSegment, ERROR_ARCHIVE_DUPLICATE, "WAL segment ${strSegment} already exists in the archive"); - - # Check status - $self->testException(sub { - $oPush->walStatus($self->{strSpoolPath}, $strSegment)}, ERROR_ARCHIVE_DUPLICATE, - "WAL segment ${strSegment} already exists in the archive"); - - #--------------------------------------------------------------------------------------------------------------------------- - # Change the error file to an ok file - storageTest()->move("$self->{strSpoolPath}/${strSegment}.error", "$self->{strSpoolPath}/${strSegment}.ok"); - - # Check status - $self->testResult( - sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment);}, 1, - "${strSegment} WAL warning ok (converted from .error)"); - - #--------------------------------------------------------------------------------------------------------------------------- - # Generate a normal ok - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - $oPushAsync->walStatusWrite(WAL_STATUS_OK, $strSegment); - - #--------------------------------------------------------------------------------------------------------------------------- - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - - # Check status - $self->testResult(sub {$oPush->walStatus($self->{strSpoolPath}, $strSegment)}, 0, "${strSegment} WAL no status"); } ################################################################################################################################ @@ -693,57 +632,27 @@ sub run $self->optionTestSet(CFGOPT_SPOOL_PATH, $self->{strRepoPath}); $self->configTestLoad(CFGCMD_ARCHIVE_PUSH); - # Write an error file and verify that it doesn't error the first time around - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - storageTest()->pathCreate($self->{strSpoolPath}, {bCreateParent => true}); - storageTest()->put("$self->{strSpoolPath}/${strSegment}.error", ERROR_ARCHIVE_TIMEOUT . "\ntest error"); - - $self->testException( - sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_ARCHIVE_TIMEOUT, - "test error"); - - $self->testResult($oPush->{bConfessOnError}, true, "went through error loop"); - - $self->testResult( - sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, '[undef]', - "${strSegment} WAL not in archive"); - - #--------------------------------------------------------------------------------------------------------------------------- - # Write an OK file so the async process is not actually started - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - storageTest()->put("$self->{strSpoolPath}/${strSegment}.ok"); - - $self->testResult( - sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, - "${strSegment} WAL pushed async from synthetic ok file"); - - $self->testResult( - sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, '[undef]', - "${strSegment} WAL not in archive"); - - #--------------------------------------------------------------------------------------------------------------------------- $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); $self->walGenerate($self->{strWalPath}, PG_VERSION_94, 1, $strSegment); $self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, "${strSegment} WAL pushed async"); - exit if ($iProcessId != $PID); + $self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, "hit lock - already running"); + + # Wait for child process to exit + if ($iProcessId == $PID) + { + waitpid(-1, 0); + } + else + { + exit 0; + } $self->testResult( - sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment)}, "${strSegment}-$self->{strWalHash}", + sub {walSegmentFind(storageRepo(), $self->{strArchiveId}, $strSegment, 5)}, "${strSegment}-$self->{strWalHash}", "${strSegment} WAL in archive"); $self->walRemove($self->{strWalPath}, $strSegment); - #--------------------------------------------------------------------------------------------------------------------------- - $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); - - $self->optionTestSet(CFGOPT_ARCHIVE_TIMEOUT, 1); - $self->configTestLoad(CFGCMD_ARCHIVE_PUSH); - - $self->testException( - sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_ARCHIVE_TIMEOUT, - "unable to push WAL ${strSegment} asynchronously after 1 second(s)"); - exit if ($iProcessId != $PID); - #--------------------------------------------------------------------------------------------------------------------------- $strSegment = $self->walSegment($iWalTimeline, $iWalMajor, $iWalMinor++); $self->walGenerate($self->{strWalPath}, PG_VERSION_94, 1, $strSegment); @@ -753,10 +662,32 @@ sub run $self->optionTestSet(CFGOPT_ARCHIVE_TIMEOUT, 5); $self->configTestLoad(CFGCMD_ARCHIVE_PUSH); - $self->testException( - sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, ERROR_FILE_READ, - "remote process on '" . BOGUS . "' terminated.*"); - exit if ($iProcessId != $PID); + + # Wait for error file to appear + my $oWait = waitInit(10); + my $strErrorFile = STORAGE_SPOOL_ARCHIVE_OUT . "/${strSegment}.error"; + + do + { + $self->testResult(sub {$oPush->process("$self->{strWalPath}/${strSegment}")}, 0, 'process connect error'); + + # Wait for child process to exit + if ($iProcessId == $PID) + { + waitpid(-1, 0); + } + else + { + exit 0; + } + } + while (!storageSpool()->exists($strErrorFile) && waitMore($oWait)); + + # Check contents of error file + my $strErrorFileContents = ${storageSpool()->get($strErrorFile)}; + + $self->testResult( + $strErrorFileContents =~ ("42\nremote process on '" . BOGUS . "' terminated.*"), true, "check error file contents"); # Disable async archiving $self->optionTestClear(CFGOPT_BACKUP_HOST); diff --git a/test/src/module/archive/pushTest.c b/test/src/module/archive/pushTest.c new file mode 100644 index 000000000..02ed5294d --- /dev/null +++ b/test/src/module/archive/pushTest.c @@ -0,0 +1,169 @@ +/*********************************************************************************************************************************** +Test Archive Push Command +***********************************************************************************************************************************/ +#include + +#include "config/parse.h" + +/*********************************************************************************************************************************** +Test Run +***********************************************************************************************************************************/ +void testRun() +{ + // ***************************************************************************************************************************** + if (testBegin("walStatus()")) + { + StringList *argList = strLstNew(); + strLstAddZ(argList, "pgbackrest"); + strLstAdd(argList, strNewFmt("--spool-path=%s", testPath())); + strLstAddZ(argList, "--archive-async"); + strLstAddZ(argList, "--archive-timeout=1"); + strLstAddZ(argList, "--stanza=db"); + strLstAddZ(argList, "archive-push"); + configParse(strLstSize(argList), strLstPtr(argList)); + + // ------------------------------------------------------------------------------------------------------------------------- + String *segment = strNew("000000010000000100000001"); + + TEST_RESULT_BOOL(walStatus(segment, false), false, "directory and status file not present"); + + // ------------------------------------------------------------------------------------------------------------------------- + mkdir(strPtr(strNewFmt("%s/archive", testPath())), 0750); + mkdir(strPtr(strNewFmt("%s/archive/db", testPath())), 0750); + mkdir(strPtr(strNewFmt("%s/archive/db/out", testPath())), 0750); + + TEST_RESULT_BOOL(walStatus(segment, false), false, "status file not present"); + + // ------------------------------------------------------------------------------------------------------------------------- + storagePut(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR))); + TEST_ERROR(walStatus(segment, false), FormatError, "000000010000000100000001.ok content must have at least two lines"); + + storagePut( + storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR "\n"))); + TEST_ERROR(walStatus(segment, false), FormatError, "000000010000000100000001.ok message must be > 0"); + + storagePut( + storageSpool(), + strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew(BOGUS_STR "\nmessage"))); + TEST_ERROR(walStatus(segment, false), FormatError, "unable to convert str 'BOGUS' to int"); + + storagePut( + storageSpool(), + strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew("0\nwarning"))); + TEST_RESULT_BOOL(walStatus(segment, false), true, "ok file with warning"); + testLogResult("P00 WARN: warning"); + + storagePut( + storageSpool(), + strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment)), bufNewStr(strNew("25\nerror"))); + TEST_RESULT_BOOL(walStatus(segment, false), true, "error status renamed to ok"); + testLogResult( + "P00 WARN: WAL segment '000000010000000100000001' was not pushed due to error [25] and was manually skipped: error"); + + // ------------------------------------------------------------------------------------------------------------------------- + storagePut(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)), bufNewStr(strNew(""))); + TEST_ERROR( + walStatus(segment, false), AssertError, + strPtr( + strNewFmt( + "multiple status files found in '%s/archive/db/out' for WAL segment '000000010000000100000001'", testPath()))); + + unlink(strPtr(storagePath(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.ok", strPtr(segment))))); + TEST_ERROR(walStatus(segment, true), AssertError, "status file '000000010000000100000001.error' has no content"); + + storagePut( + storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment)), bufNewStr(strNew("25\nmessage"))); + TEST_ERROR(walStatus(segment, true), AssertError, "message"); + + TEST_RESULT_BOOL(walStatus(segment, false), false, "suppress error"); + + unlink(strPtr(storagePath(storageSpool(), strNewFmt(STORAGE_SPOOL_ARCHIVE_OUT "/%s.error", strPtr(segment))))); + } + + // ***************************************************************************************************************************** + if (testBegin("cmdArchivePush()")) + { + int processId = getpid(); + + StringList *argList = strLstNew(); + strLstAddZ(argList, "pgbackrest"); + strLstAddZ(argList, "--archive-timeout=1"); + strLstAddZ(argList, "--stanza=db"); + strLstAddZ(argList, "archive-push"); + configParse(strLstSize(argList), strLstPtr(argList)); + + TEST_ERROR(cmdArchivePush(), ParamRequiredError, "WAL segment to push required"); + + // ------------------------------------------------------------------------------------------------------------------------- + strLstAddZ(argList, "000000010000000100000001"); + configParse(strLstSize(argList), strLstPtr(argList)); + + TEST_ERROR(cmdArchivePush(), AssertError, "archive-push in C does not support synchronous mode"); + + // Test that a bogus perl bin generates the correct errors + // ------------------------------------------------------------------------------------------------------------------------- + String *perlBin = strNewFmt("%s/perl-test.sh", testPath()); + + strLstAdd(argList, strNewFmt("--perl-bin=%s", strPtr(perlBin))); + strLstAdd(argList, strNewFmt("--spool-path=%s", testPath())); + strLstAddZ(argList, "--archive-async"); + configParse(strLstSize(argList), strLstPtr(argList)); + + TRY_BEGIN() + { + cmdArchivePush(); + + THROW(AssertError, "error should have been thrown"); // {uncoverable - test should not get here} + } + CATCH_ANY() + { + // Exit with error if this is the child process + if (getpid() != processId) + exit(errorCode()); + + // Check expected error on the parent process + TEST_RESULT_INT(errorCode(), errorTypeCode(&AssertError), "error code matches after failed Perl exec"); + TEST_RESULT_STR(errorMessage(), "perl exited with error 25", "error message matches after failed Perl exec"); + } + TRY_END(); + + // Write a blank script for the perl bin and make sure the process times out + // ------------------------------------------------------------------------------------------------------------------------- + Storage *storage = storageNew(strNew(testPath()), 0750, 65536, NULL); + storagePut(storage, perlBin, bufNewStr(strNew(""))); + + TEST_ERROR( + cmdArchivePush(), ArchiveTimeoutError, + "unable to push WAL segment '000000010000000100000001' asynchronously after 1 second(s)"); + + // Write out a bogus .error file to make sure it is ignored on the first loop. The perl bin will write the real one when it + // executes. + // ------------------------------------------------------------------------------------------------------------------------- + String *errorFile = storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT "/000000010000000100000001.error")); + + mkdir(strPtr(strNewFmt("%s/archive", testPath())), 0750); + mkdir(strPtr(strNewFmt("%s/archive/db", testPath())), 0750); + mkdir(strPtr(strNewFmt("%s/archive/db/out", testPath())), 0750); + storagePut(storageSpool(), errorFile, bufNewStr(strNew(""))); + + storagePut(storage, perlBin, bufNewStr(strNewFmt( + "set -e\n" + "echo '25' > %s\n" + "echo 'generic error message' >> %s\n", + strPtr(errorFile), strPtr(errorFile)))); + + TEST_ERROR(cmdArchivePush(), AssertError, "generic error message"); + + unlink(strPtr(errorFile)); + + // Modify script to write out a valid ok file + // ------------------------------------------------------------------------------------------------------------------------- + storagePut(storage, perlBin, bufNewStr(strNewFmt( + "set -e\n" + "touch %s\n", + strPtr(storagePath(storageSpool(), strNew(STORAGE_SPOOL_ARCHIVE_OUT "/000000010000000100000001.ok")))))); + + TEST_RESULT_VOID(cmdArchivePush(), "successful push"); + testLogResult("P00 INFO: pushed WAL segment 000000010000000100000001 asynchronously"); + } +}