mirror of
https://github.com/postgres/postgres.git
synced 2025-04-27 22:56:53 +03:00
Fix syslogger to not lose log coherency under high load.
The original coding of the syslogger had an arbitrary limit of 20 large messages concurrently in progress, after which it would just punt and dump message fragments to the output file separately. Our ambitions are a bit higher than that now, so allow the data structure to expand as necessary. Reported and patched by Andrew Dunstan; some editing by Tom
This commit is contained in:
parent
d843ed2116
commit
c17e863bc7
@ -34,6 +34,7 @@
|
|||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
#include "libpq/pqsignal.h"
|
#include "libpq/pqsignal.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
#include "pgtime.h"
|
#include "pgtime.h"
|
||||||
#include "postmaster/fork_process.h"
|
#include "postmaster/fork_process.h"
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
@ -93,11 +94,14 @@ static char *last_file_name = NULL;
|
|||||||
static char *last_csv_file_name = NULL;
|
static char *last_csv_file_name = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Buffers for saving partial messages from different backends. We don't expect
|
* Buffers for saving partial messages from different backends.
|
||||||
* that there will be very many outstanding at one time, so 20 seems plenty of
|
|
||||||
* leeway. If this array gets full we won't lose messages, but we will lose
|
|
||||||
* the protocol protection against them being partially written or interleaved.
|
|
||||||
*
|
*
|
||||||
|
* Keep NBUFFER_LISTS lists of these, with the entry for a given source pid
|
||||||
|
* being in the list numbered (pid % NBUFFER_LISTS), so as to cut down on
|
||||||
|
* the number of entries we have to examine for any one incoming message.
|
||||||
|
* There must never be more than one entry for the same source pid.
|
||||||
|
*
|
||||||
|
* An inactive buffer is not removed from its list, just held for re-use.
|
||||||
* An inactive buffer has pid == 0 and undefined contents of data.
|
* An inactive buffer has pid == 0 and undefined contents of data.
|
||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
@ -106,8 +110,8 @@ typedef struct
|
|||||||
StringInfoData data; /* accumulated data, as a StringInfo */
|
StringInfoData data; /* accumulated data, as a StringInfo */
|
||||||
} save_buffer;
|
} save_buffer;
|
||||||
|
|
||||||
#define CHUNK_SLOTS 20
|
#define NBUFFER_LISTS 256
|
||||||
static save_buffer saved_chunks[CHUNK_SLOTS];
|
static List *buffer_lists[NBUFFER_LISTS];
|
||||||
|
|
||||||
/* These must be exported for EXEC_BACKEND case ... annoying */
|
/* These must be exported for EXEC_BACKEND case ... annoying */
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
@ -734,6 +738,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
|||||||
(p.is_last == 't' || p.is_last == 'f' ||
|
(p.is_last == 't' || p.is_last == 'f' ||
|
||||||
p.is_last == 'T' || p.is_last == 'F'))
|
p.is_last == 'T' || p.is_last == 'F'))
|
||||||
{
|
{
|
||||||
|
List *buffer_list;
|
||||||
|
ListCell *cell;
|
||||||
|
save_buffer *existing_slot = NULL,
|
||||||
|
*free_slot = NULL;
|
||||||
|
StringInfo str;
|
||||||
|
|
||||||
chunklen = PIPE_HEADER_SIZE + p.len;
|
chunklen = PIPE_HEADER_SIZE + p.len;
|
||||||
|
|
||||||
/* Fall out of loop if we don't have the whole chunk yet */
|
/* Fall out of loop if we don't have the whole chunk yet */
|
||||||
@ -743,52 +753,53 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
|||||||
dest = (p.is_last == 'T' || p.is_last == 'F') ?
|
dest = (p.is_last == 'T' || p.is_last == 'F') ?
|
||||||
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
|
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
|
||||||
|
|
||||||
|
/* Locate any existing buffer for this source pid */
|
||||||
|
buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
|
||||||
|
foreach(cell, buffer_list)
|
||||||
|
{
|
||||||
|
save_buffer *buf = (save_buffer *) lfirst(cell);
|
||||||
|
|
||||||
|
if (buf->pid == p.pid)
|
||||||
|
{
|
||||||
|
existing_slot = buf;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (buf->pid == 0 && free_slot == NULL)
|
||||||
|
free_slot = buf;
|
||||||
|
}
|
||||||
|
|
||||||
if (p.is_last == 'f' || p.is_last == 'F')
|
if (p.is_last == 'f' || p.is_last == 'F')
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Save a complete non-final chunk in the per-pid buffer if
|
* Save a complete non-final chunk in a per-pid buffer
|
||||||
* possible - if not just write it out.
|
|
||||||
*/
|
*/
|
||||||
int free_slot = -1,
|
if (existing_slot != NULL)
|
||||||
existing_slot = -1;
|
|
||||||
int i;
|
|
||||||
StringInfo str;
|
|
||||||
|
|
||||||
for (i = 0; i < CHUNK_SLOTS; i++)
|
|
||||||
{
|
{
|
||||||
if (saved_chunks[i].pid == p.pid)
|
/* Add chunk to data from preceding chunks */
|
||||||
{
|
str = &(existing_slot->data);
|
||||||
existing_slot = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (free_slot < 0 && saved_chunks[i].pid == 0)
|
|
||||||
free_slot = i;
|
|
||||||
}
|
|
||||||
if (existing_slot >= 0)
|
|
||||||
{
|
|
||||||
str = &(saved_chunks[existing_slot].data);
|
|
||||||
appendBinaryStringInfo(str,
|
|
||||||
cursor + PIPE_HEADER_SIZE,
|
|
||||||
p.len);
|
|
||||||
}
|
|
||||||
else if (free_slot >= 0)
|
|
||||||
{
|
|
||||||
saved_chunks[free_slot].pid = p.pid;
|
|
||||||
str = &(saved_chunks[free_slot].data);
|
|
||||||
initStringInfo(str);
|
|
||||||
appendBinaryStringInfo(str,
|
appendBinaryStringInfo(str,
|
||||||
cursor + PIPE_HEADER_SIZE,
|
cursor + PIPE_HEADER_SIZE,
|
||||||
p.len);
|
p.len);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
/* First chunk of message, save in a new buffer */
|
||||||
|
if (free_slot == NULL)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* If there is no free slot we'll just have to take our
|
* Need a free slot, but there isn't one in the list,
|
||||||
* chances and write out a partial message and hope that
|
* so create a new one and extend the list with it.
|
||||||
* it's not followed by something from another pid.
|
|
||||||
*/
|
*/
|
||||||
write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
|
free_slot = palloc(sizeof(save_buffer));
|
||||||
dest);
|
buffer_list = lappend(buffer_list, free_slot);
|
||||||
|
buffer_lists[p.pid % NBUFFER_LISTS] = buffer_list;
|
||||||
|
}
|
||||||
|
free_slot->pid = p.pid;
|
||||||
|
str = &(free_slot->data);
|
||||||
|
initStringInfo(str);
|
||||||
|
appendBinaryStringInfo(str,
|
||||||
|
cursor + PIPE_HEADER_SIZE,
|
||||||
|
p.len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -797,26 +808,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
|||||||
* Final chunk --- add it to anything saved for that pid, and
|
* Final chunk --- add it to anything saved for that pid, and
|
||||||
* either way write the whole thing out.
|
* either way write the whole thing out.
|
||||||
*/
|
*/
|
||||||
int existing_slot = -1;
|
if (existing_slot != NULL)
|
||||||
int i;
|
|
||||||
StringInfo str;
|
|
||||||
|
|
||||||
for (i = 0; i < CHUNK_SLOTS; i++)
|
|
||||||
{
|
{
|
||||||
if (saved_chunks[i].pid == p.pid)
|
str = &(existing_slot->data);
|
||||||
{
|
|
||||||
existing_slot = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (existing_slot >= 0)
|
|
||||||
{
|
|
||||||
str = &(saved_chunks[existing_slot].data);
|
|
||||||
appendBinaryStringInfo(str,
|
appendBinaryStringInfo(str,
|
||||||
cursor + PIPE_HEADER_SIZE,
|
cursor + PIPE_HEADER_SIZE,
|
||||||
p.len);
|
p.len);
|
||||||
write_syslogger_file(str->data, str->len, dest);
|
write_syslogger_file(str->data, str->len, dest);
|
||||||
saved_chunks[existing_slot].pid = 0;
|
/* Mark the buffer unused, and reclaim string storage */
|
||||||
|
existing_slot->pid = 0;
|
||||||
pfree(str->data);
|
pfree(str->data);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -872,19 +872,29 @@ static void
|
|||||||
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
StringInfo str;
|
|
||||||
|
|
||||||
/* Dump any incomplete protocol messages */
|
/* Dump any incomplete protocol messages */
|
||||||
for (i = 0; i < CHUNK_SLOTS; i++)
|
for (i = 0; i < NBUFFER_LISTS; i++)
|
||||||
{
|
{
|
||||||
if (saved_chunks[i].pid != 0)
|
List *list = buffer_lists[i];
|
||||||
|
ListCell *cell;
|
||||||
|
|
||||||
|
foreach(cell, list)
|
||||||
{
|
{
|
||||||
str = &(saved_chunks[i].data);
|
save_buffer *buf = (save_buffer *) lfirst(cell);
|
||||||
write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
|
|
||||||
saved_chunks[i].pid = 0;
|
if (buf->pid != 0)
|
||||||
|
{
|
||||||
|
StringInfo str = &(buf->data);
|
||||||
|
|
||||||
|
write_syslogger_file(str->data, str->len,
|
||||||
|
LOG_DESTINATION_STDERR);
|
||||||
|
/* Mark the buffer unused, and reclaim string storage */
|
||||||
|
buf->pid = 0;
|
||||||
pfree(str->data);
|
pfree(str->data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Force out any remaining pipe data as-is; we don't bother trying to
|
* Force out any remaining pipe data as-is; we don't bother trying to
|
||||||
|
Loading…
x
Reference in New Issue
Block a user