1
0
mirror of https://github.com/postgres/postgres.git synced 2025-08-30 06:01:21 +03:00

Clean up parsing of synchronous_standby_names GUC variable.

Commit 989be0810d added a flex/bison lexer/parser to interpret
synchronous_standby_names.  It was done in a pretty crufty way, though,
making assorted end-use sites responsible for calling the parser at the
right times.  That was not only vulnerable to errors of omission, but made
it possible that lexer/parser errors occur at very undesirable times,
and created memory leakages even if there was no error.

Instead, perform the parsing once during check_synchronous_standby_names
and let guc.c manage the resulting data.  To do that, we have to flatten
the parsed representation into a single hunk of malloc'd memory, but that
is not very hard.

While at it, work a little harder on making useful error reports for
parsing problems; the previous code felt that "synchronous_standby_names
parser returned 1" was an appropriate user-facing error message.  (To
be fair, it did also log a syntax error message, but separately from the
GUC problem report, which is at best confusing.)  It had some outright
bugs in the face of invalid input, too.

I (tgl) also concluded that we need to restrict unquoted names in
synchronous_standby_names to be just SQL identifiers.  The previous coding
would accept darn near anything, which (1) makes the quoting convention
both nearly-unnecessary and formally ambiguous, (2) makes it very hard to
understand what is a syntax error and what is a creative interpretation of
the input as a standby name, and (3) makes it impossible to further extend
the syntax in future without a compatibility break.  I presume that we're
intending future extensions of the syntax, else this parsing infrastructure
is massive overkill, so (3) is an important objection.  Since we've taken
a compatibility hit for non-identifier names with this change anyway, we
might as well lock things down now and insist that users use double quotes
for standby names that aren't identifiers.

Kyotaro Horiguchi and Tom Lane
This commit is contained in:
Tom Lane
2016-04-27 17:55:19 -04:00
parent 372ff7cae2
commit 4c804fbdfb
7 changed files with 211 additions and 197 deletions

View File

@@ -78,7 +78,7 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true;
SyncRepConfigData *SyncRepConfig;
static SyncRepConfigData *SyncRepConfig = NULL;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
static void SyncRepQueueInsert(int mode);
@@ -361,11 +361,6 @@ SyncRepInitConfig(void)
{
int priority;
/* Update the config data of synchronous replication */
SyncRepFreeConfig(SyncRepConfig);
SyncRepConfig = NULL;
SyncRepUpdateConfig();
/*
* Determine if we are a potential sync standby and remember the result
* for handling replies from standby.
@@ -509,7 +504,9 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
* Quick exit if we are not managing a sync standby or there are not
* enough synchronous standbys.
*/
if (!(*am_sync) || list_length(sync_standbys) < SyncRepConfig->num_sync)
if (!(*am_sync) ||
SyncRepConfig == NULL ||
list_length(sync_standbys) < SyncRepConfig->num_sync)
{
list_free(sync_standbys);
return false;
@@ -568,14 +565,15 @@ SyncRepGetSyncStandbys(bool *am_sync)
volatile WalSnd *walsnd; /* Use volatile pointer to prevent
* code rearrangement */
/* Set default result */
if (am_sync != NULL)
*am_sync = false;
/* Quick exit if sync replication is not requested */
if (SyncRepConfig == NULL)
return NIL;
if (am_sync != NULL)
*am_sync = false;
lowest_priority = list_length(SyncRepConfig->members);
lowest_priority = SyncRepConfig->nmembers;
next_highest_priority = lowest_priority + 1;
/*
@@ -730,9 +728,8 @@ SyncRepGetSyncStandbys(bool *am_sync)
static int
SyncRepGetStandbyPriority(void)
{
List *members;
ListCell *l;
int priority = 0;
const char *standby_name;
int priority;
bool found = false;
/*
@@ -742,22 +739,19 @@ SyncRepGetStandbyPriority(void)
if (am_cascading_walsender)
return 0;
if (!SyncStandbysDefined())
if (!SyncStandbysDefined() || SyncRepConfig == NULL)
return 0;
members = SyncRepConfig->members;
foreach(l, members)
standby_name = SyncRepConfig->member_names;
for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
{
char *standby_name = (char *) lfirst(l);
priority++;
if (pg_strcasecmp(standby_name, application_name) == 0 ||
pg_strcasecmp(standby_name, "*") == 0)
strcmp(standby_name, "*") == 0)
{
found = true;
break;
}
standby_name += strlen(standby_name) + 1;
}
return (found ? priority : 0);
@@ -867,50 +861,6 @@ SyncRepUpdateSyncStandbysDefined(void)
}
}
/*
* Parse synchronous_standby_names and update the config data
* of synchronous standbys.
*/
void
SyncRepUpdateConfig(void)
{
int parse_rc;
if (!SyncStandbysDefined())
return;
/*
* check_synchronous_standby_names() verifies the setting value of
* synchronous_standby_names before this function is called. So
* syncrep_yyparse() must not cause an error here.
*/
syncrep_scanner_init(SyncRepStandbyNames);
parse_rc = syncrep_yyparse();
syncrep_scanner_finish();
if (parse_rc != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg_internal("synchronous_standby_names parser returned %d",
parse_rc)));
SyncRepConfig = syncrep_parse_result;
syncrep_parse_result = NULL;
}
/*
* Free a previously-allocated config data of synchronous replication.
*/
void
SyncRepFreeConfig(SyncRepConfigData *config)
{
if (!config)
return;
list_free_deep(config->members);
pfree(config);
}
#ifdef USE_ASSERT_CHECKING
static bool
SyncRepQueueIsOrderedByLSN(int mode)
@@ -955,78 +905,104 @@ SyncRepQueueIsOrderedByLSN(int mode)
bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{
int parse_rc;
if (*newval != NULL && (*newval)[0] != '\0')
{
int parse_rc;
SyncRepConfigData *pconf;
/* Reset communication variables to ensure a fresh start */
syncrep_parse_result = NULL;
syncrep_parse_error_msg = NULL;
/* Parse the synchronous_standby_names string */
syncrep_scanner_init(*newval);
parse_rc = syncrep_yyparse();
syncrep_scanner_finish();
if (parse_rc != 0)
if (parse_rc != 0 || syncrep_parse_result == NULL)
{
GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
GUC_check_errdetail("synchronous_standby_names parser returned %d",
parse_rc);
if (syncrep_parse_error_msg)
GUC_check_errdetail("%s", syncrep_parse_error_msg);
else
GUC_check_errdetail("synchronous_standby_names parser failed");
return false;
}
/*
* Warn if num_sync exceeds the number of names of potential sync
* standbys. This setting doesn't make sense in most cases because
* it implies that enough number of sync standbys will not appear,
* which makes transaction commits wait for sync replication
* infinitely.
* standbys. This setting doesn't make sense in most cases because it
* implies that enough number of sync standbys will not appear, which
* makes transaction commits wait for sync replication infinitely.
*
* If there are more than one standbys having the same name and
* priority, we can see enough sync standbys to complete transaction
* commits. However it's not recommended to run multiple standbys
* with the same priority because we cannot gain full control of
* the selection of sync standbys from them.
* commits. However it's not recommended to run multiple standbys with
* the same priority because we cannot gain full control of the
* selection of sync standbys from them.
*
* OTOH, that setting is OK if we understand the above problem
* regarding the selection of sync standbys and intentionally
* specify * to match all the standbys.
* regarding the selection of sync standbys and intentionally specify *
* to match all the standbys.
*/
if (syncrep_parse_result->num_sync >
list_length(syncrep_parse_result->members))
if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers)
{
ListCell *l;
bool has_asterisk = false;
const char *standby_name;
int i;
bool has_asterisk = false;
foreach(l, syncrep_parse_result->members)
standby_name = syncrep_parse_result->member_names;
for (i = 1; i <= syncrep_parse_result->nmembers; i++)
{
char *standby_name = (char *) lfirst(l);
if (pg_strcasecmp(standby_name, "*") == 0)
if (strcmp(standby_name, "*") == 0)
{
has_asterisk = true;
break;
}
standby_name += strlen(standby_name) + 1;
}
/*
* Only the postmaster warns this inappropriate setting
* to avoid cluttering the log.
* Only the postmaster warns about this inappropriate setting to
* avoid cluttering the log.
*/
if (!has_asterisk && !IsUnderPostmaster)
ereport(WARNING,
(errmsg("The configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)",
syncrep_parse_result->num_sync, list_length(syncrep_parse_result->members)),
(errmsg("configured number of synchronous standbys (%d) exceeds the number of names of potential synchronous ones (%d)",
syncrep_parse_result->num_sync,
syncrep_parse_result->nmembers),
errhint("Specify more names of potential synchronous standbys in synchronous_standby_names.")));
}
/* GUC extra value must be malloc'd, not palloc'd */
pconf = (SyncRepConfigData *)
malloc(syncrep_parse_result->config_size);
if (pconf == NULL)
return false;
memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
*extra = (void *) pconf;
/*
* syncrep_yyparse sets the global syncrep_parse_result as side effect.
* But this function is required to just check, so frees it
* after parsing the parameter.
* We need not explicitly clean up syncrep_parse_result. It, and any
* other cruft generated during parsing, will be freed when the
* current memory context is deleted. (This code is generally run in
* a short-lived context used for config file processing, so that will
* not be very long.)
*/
SyncRepFreeConfig(syncrep_parse_result);
}
else
*extra = NULL;
return true;
}
void
assign_synchronous_standby_names(const char *newval, void *extra)
{
SyncRepConfig = (SyncRepConfigData *) extra;
}
void
assign_synchronous_commit(int newval, void *extra)
{