mirror of
https://github.com/postgres/postgres.git
synced 2025-09-02 04:21:28 +03:00
Custom WAL Resource Managers.
Allow extensions to specify a new custom resource manager (rmgr), which allows specialized WAL. This is meant to be used by a Table Access Method or Index Access Method. Prior to this commit, only Generic WAL was available, which offers support for recovery and physical replication but not logical replication. Reviewed-by: Julien Rouhaud, Bharath Rupireddy, Andres Freund Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
This commit is contained in:
@@ -24,16 +24,138 @@
|
||||
#include "commands/dbcommands_xlog.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "commands/tablespace.h"
|
||||
#include "fmgr.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "replication/decode.h"
|
||||
#include "replication/message.h"
|
||||
#include "replication/origin.h"
|
||||
#include "storage/standby.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/relmapper.h"
|
||||
|
||||
/* must be kept in sync with RmgrData definition in xlog_internal.h */
|
||||
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
|
||||
{ name, redo, desc, identify, startup, cleanup, mask, decode },
|
||||
|
||||
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
|
||||
RmgrData RmgrTable[RM_MAX_ID + 1] = {
|
||||
#include "access/rmgrlist.h"
|
||||
};
|
||||
|
||||
/*
|
||||
* Start up all resource managers.
|
||||
*/
|
||||
void
|
||||
RmgrStartup(void)
|
||||
{
|
||||
for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (!RmgrIdExists(rmid))
|
||||
continue;
|
||||
|
||||
if (RmgrTable[rmid].rm_startup != NULL)
|
||||
RmgrTable[rmid].rm_startup();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Clean up all resource managers.
|
||||
*/
|
||||
void
|
||||
RmgrCleanup(void)
|
||||
{
|
||||
for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (!RmgrIdExists(rmid))
|
||||
continue;
|
||||
|
||||
if (RmgrTable[rmid].rm_cleanup != NULL)
|
||||
RmgrTable[rmid].rm_cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Emit ERROR when we encounter a record with an RmgrId we don't
|
||||
* recognize.
|
||||
*/
|
||||
void
|
||||
RmgrNotFound(RmgrId rmid)
|
||||
{
|
||||
ereport(ERROR, (errmsg("resource manager with ID %d not registered", rmid),
|
||||
errhint("Include the extension module that implements this resource manager in shared_preload_libraries.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Register a new custom WAL resource manager.
|
||||
*
|
||||
* Resource manager IDs must be globally unique across all extensions. Refer
|
||||
* to https://wiki.postgresql.org/wiki/CustomWALResourceManager to reserve a
|
||||
* unique RmgrId for your extension, to avoid conflicts with other extension
|
||||
* developers. During development, use RM_EXPERIMENTAL_ID to avoid needlessly
|
||||
* reserving a new ID.
|
||||
*/
|
||||
void
|
||||
RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
|
||||
{
|
||||
if (rmgr->rm_name == NULL || strlen(rmgr->rm_name) == 0)
|
||||
ereport(ERROR, (errmsg("custom resource manager name is invalid"),
|
||||
errhint("Provide a non-empty name for the custom resource manager.")));
|
||||
|
||||
if (!RMID_IS_CUSTOM(rmid))
|
||||
ereport(ERROR, (errmsg("custom resource manager ID %d is out of range", rmid),
|
||||
errhint("Provide a custom resource manager ID between %d and %d.",
|
||||
RM_MIN_CUSTOM_ID, RM_MAX_CUSTOM_ID)));
|
||||
|
||||
if (!process_shared_preload_libraries_in_progress)
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
|
||||
errdetail("Custom resource manager must be registered while initializing modules in shared_preload_libraries.")));
|
||||
|
||||
if (RmgrTable[rmid].rm_name != NULL)
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
|
||||
errdetail("Custom resource manager \"%s\" already registered with the same ID.",
|
||||
RmgrTable[rmid].rm_name)));
|
||||
|
||||
/* check for existing rmgr with the same name */
|
||||
for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++)
|
||||
{
|
||||
if (!RmgrIdExists(existing_rmid))
|
||||
continue;
|
||||
|
||||
if (!pg_strcasecmp(RmgrTable[existing_rmid].rm_name, rmgr->rm_name))
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid),
|
||||
errdetail("Existing resource manager with ID %d has the same name.", existing_rmid)));
|
||||
}
|
||||
|
||||
/* register it */
|
||||
RmgrTable[rmid] = *rmgr;
|
||||
ereport(LOG,
|
||||
(errmsg("registered custom resource manager \"%s\" with ID %d",
|
||||
rmgr->rm_name, rmid)));
|
||||
}
|
||||
|
||||
/* SQL SRF showing loaded resource managers */
|
||||
Datum
|
||||
pg_get_wal_resource_managers(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_GET_RESOURCE_MANAGERS_COLS 3
|
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
Datum values[PG_GET_RESOURCE_MANAGERS_COLS];
|
||||
bool nulls[PG_GET_RESOURCE_MANAGERS_COLS] = {0};
|
||||
|
||||
SetSingleFuncCall(fcinfo, 0);
|
||||
|
||||
for (int rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (!RmgrIdExists(rmid))
|
||||
continue;
|
||||
values[0] = Int32GetDatum(rmid);
|
||||
values[1] = CStringGetTextDatum(GetRmgr(rmid).rm_name);
|
||||
values[2] = BoolGetDatum(RMID_IS_BUILTIN(rmid));
|
||||
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
|
||||
}
|
||||
|
||||
return (Datum) 0;
|
||||
}
|
||||
|
@@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
|
||||
(uint32) SizeOfXLogRecord, record->xl_tot_len);
|
||||
return false;
|
||||
}
|
||||
if (record->xl_rmid > RM_MAX_ID)
|
||||
if (!RMID_IS_VALID(record->xl_rmid))
|
||||
{
|
||||
report_invalid_record(state,
|
||||
"invalid resource manager ID %u at %X/%X",
|
||||
|
@@ -1541,7 +1541,6 @@ ShutdownWalRecovery(void)
|
||||
void
|
||||
PerformWalRecovery(void)
|
||||
{
|
||||
int rmid;
|
||||
XLogRecord *record;
|
||||
bool reachedRecoveryTarget = false;
|
||||
TimeLineID replayTLI;
|
||||
@@ -1614,12 +1613,7 @@ PerformWalRecovery(void)
|
||||
|
||||
InRedo = true;
|
||||
|
||||
/* Initialize resource managers */
|
||||
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (RmgrTable[rmid].rm_startup != NULL)
|
||||
RmgrTable[rmid].rm_startup();
|
||||
}
|
||||
RmgrStartup();
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("redo starts at %X/%X",
|
||||
@@ -1756,12 +1750,7 @@ PerformWalRecovery(void)
|
||||
}
|
||||
}
|
||||
|
||||
/* Allow resource managers to do any required cleanup. */
|
||||
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (RmgrTable[rmid].rm_cleanup != NULL)
|
||||
RmgrTable[rmid].rm_cleanup();
|
||||
}
|
||||
RmgrCleanup();
|
||||
|
||||
ereport(LOG,
|
||||
(errmsg("redo done at %X/%X system usage: %s",
|
||||
@@ -1881,7 +1870,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
|
||||
xlogrecovery_redo(xlogreader, *replayTLI);
|
||||
|
||||
/* Now apply the WAL record itself */
|
||||
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
|
||||
GetRmgr(record->xl_rmid).rm_redo(xlogreader);
|
||||
|
||||
/*
|
||||
* After redo, check whether the backup pages associated with the WAL
|
||||
@@ -2111,20 +2100,20 @@ rm_redo_error_callback(void *arg)
|
||||
void
|
||||
xlog_outdesc(StringInfo buf, XLogReaderState *record)
|
||||
{
|
||||
RmgrId rmid = XLogRecGetRmid(record);
|
||||
RmgrData rmgr = GetRmgr(XLogRecGetRmid(record));
|
||||
uint8 info = XLogRecGetInfo(record);
|
||||
const char *id;
|
||||
|
||||
appendStringInfoString(buf, RmgrTable[rmid].rm_name);
|
||||
appendStringInfoString(buf, rmgr.rm_name);
|
||||
appendStringInfoChar(buf, '/');
|
||||
|
||||
id = RmgrTable[rmid].rm_identify(info);
|
||||
id = rmgr.rm_identify(info);
|
||||
if (id == NULL)
|
||||
appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
|
||||
else
|
||||
appendStringInfo(buf, "%s: ", id);
|
||||
|
||||
RmgrTable[rmid].rm_desc(buf, record);
|
||||
rmgr.rm_desc(buf, record);
|
||||
}
|
||||
|
||||
#ifdef WAL_DEBUG
|
||||
@@ -2273,7 +2262,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
|
||||
static void
|
||||
verifyBackupPageConsistency(XLogReaderState *record)
|
||||
{
|
||||
RmgrId rmid = XLogRecGetRmid(record);
|
||||
RmgrData rmgr = GetRmgr(XLogRecGetRmid(record));
|
||||
RelFileNode rnode;
|
||||
ForkNumber forknum;
|
||||
BlockNumber blkno;
|
||||
@@ -2353,10 +2342,10 @@ verifyBackupPageConsistency(XLogReaderState *record)
|
||||
* If masking function is defined, mask both the primary and replay
|
||||
* images
|
||||
*/
|
||||
if (RmgrTable[rmid].rm_mask != NULL)
|
||||
if (rmgr.rm_mask != NULL)
|
||||
{
|
||||
RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
|
||||
RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
|
||||
rmgr.rm_mask(replay_image_masked, blkno);
|
||||
rmgr.rm_mask(primary_image_masked, blkno);
|
||||
}
|
||||
|
||||
/* Time to compare the primary and replay images. */
|
||||
|
@@ -1039,6 +1039,12 @@ PostmasterMain(int argc, char *argv[])
|
||||
*/
|
||||
InitializeShmemGUCs();
|
||||
|
||||
/*
|
||||
* Now that modules have been loaded, we can process any custom resource
|
||||
* managers specified in the wal_consistency_checking GUC.
|
||||
*/
|
||||
InitializeWalConsistencyChecking();
|
||||
|
||||
/*
|
||||
* If -C was specified with a runtime-computed GUC, we held off printing
|
||||
* the value earlier, as the GUC was not yet initialized. We handle -C
|
||||
|
@@ -94,7 +94,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
|
||||
{
|
||||
XLogRecordBuffer buf;
|
||||
TransactionId txid;
|
||||
RmgrId rmid;
|
||||
RmgrData rmgr;
|
||||
|
||||
buf.origptr = ctx->reader->ReadRecPtr;
|
||||
buf.endptr = ctx->reader->EndRecPtr;
|
||||
@@ -115,10 +115,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
|
||||
buf.origptr);
|
||||
}
|
||||
|
||||
rmid = XLogRecGetRmid(record);
|
||||
rmgr = GetRmgr(XLogRecGetRmid(record));
|
||||
|
||||
if (RmgrTable[rmid].rm_decode != NULL)
|
||||
RmgrTable[rmid].rm_decode(ctx, &buf);
|
||||
if (rmgr.rm_decode != NULL)
|
||||
rmgr.rm_decode(ctx, &buf);
|
||||
else
|
||||
{
|
||||
/* just deal with xid, and done */
|
||||
|
@@ -1610,6 +1610,7 @@ char *local_preload_libraries_string = NULL;
|
||||
|
||||
/* Flag telling that we are loading shared_preload_libraries */
|
||||
bool process_shared_preload_libraries_in_progress = false;
|
||||
bool process_shared_preload_libraries_done = false;
|
||||
|
||||
/*
|
||||
* load the shared libraries listed in 'libraries'
|
||||
@@ -1677,6 +1678,7 @@ process_shared_preload_libraries(void)
|
||||
"shared_preload_libraries",
|
||||
false);
|
||||
process_shared_preload_libraries_in_progress = false;
|
||||
process_shared_preload_libraries_done = true;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@@ -245,6 +245,11 @@ static bool check_default_with_oids(bool *newval, void **extra, GucSource source
|
||||
static ConfigVariable *ProcessConfigFileInternal(GucContext context,
|
||||
bool applySettings, int elevel);
|
||||
|
||||
/*
|
||||
* Track whether there were any deferred checks for custom resource managers
|
||||
* specified in wal_consistency_checking.
|
||||
*/
|
||||
static bool check_wal_consistency_checking_deferred = false;
|
||||
|
||||
/*
|
||||
* Options for enum values defined in this module.
|
||||
@@ -5835,6 +5840,36 @@ InitializeGUCOptions(void)
|
||||
InitializeGUCOptionsFromEnvironment();
|
||||
}
|
||||
|
||||
/*
|
||||
* If any custom resource managers were specified in the
|
||||
* wal_consistency_checking GUC, processing was deferred. Now that
|
||||
* shared_preload_libraries have been loaded, process wal_consistency_checking
|
||||
* again.
|
||||
*/
|
||||
void
|
||||
InitializeWalConsistencyChecking(void)
|
||||
{
|
||||
Assert(process_shared_preload_libraries_done);
|
||||
|
||||
if (check_wal_consistency_checking_deferred)
|
||||
{
|
||||
struct config_generic *guc;
|
||||
|
||||
guc = find_option("wal_consistency_checking", false, false, ERROR);
|
||||
|
||||
check_wal_consistency_checking_deferred = false;
|
||||
|
||||
set_config_option("wal_consistency_checking",
|
||||
wal_consistency_checking_string,
|
||||
PGC_POSTMASTER, guc->source,
|
||||
GUC_ACTION_SET, true, ERROR, false);
|
||||
|
||||
/* checking should not be deferred again */
|
||||
Assert(!check_wal_consistency_checking_deferred);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Assign any GUC values that can come from the server's environment.
|
||||
*
|
||||
@@ -11882,13 +11917,13 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
char *tok = (char *) lfirst(l);
|
||||
bool found = false;
|
||||
RmgrId rmid;
|
||||
int rmid;
|
||||
|
||||
/* Check for 'all'. */
|
||||
if (pg_strcasecmp(tok, "all") == 0)
|
||||
{
|
||||
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
if (RmgrTable[rmid].rm_mask != NULL)
|
||||
if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL)
|
||||
newwalconsistency[rmid] = true;
|
||||
found = true;
|
||||
}
|
||||
@@ -11900,8 +11935,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
|
||||
*/
|
||||
for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
|
||||
{
|
||||
if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
|
||||
RmgrTable[rmid].rm_mask != NULL)
|
||||
if (RmgrIdExists(rmid) && GetRmgr(rmid).rm_mask != NULL &&
|
||||
pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0)
|
||||
{
|
||||
newwalconsistency[rmid] = true;
|
||||
found = true;
|
||||
@@ -11912,10 +11947,21 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
|
||||
/* If a valid resource manager is found, check for the next one. */
|
||||
if (!found)
|
||||
{
|
||||
GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
|
||||
pfree(rawstring);
|
||||
list_free(elemlist);
|
||||
return false;
|
||||
/*
|
||||
* Perhaps it's a custom resource manager. If so, defer checking
|
||||
* until InitializeWalConsistencyChecking().
|
||||
*/
|
||||
if (!process_shared_preload_libraries_done)
|
||||
{
|
||||
check_wal_consistency_checking_deferred = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
|
||||
pfree(rawstring);
|
||||
list_free(elemlist);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11931,7 +11977,20 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
|
||||
static void
|
||||
assign_wal_consistency_checking(const char *newval, void *extra)
|
||||
{
|
||||
wal_consistency_checking = (bool *) extra;
|
||||
/*
|
||||
* If some checks were deferred, it's possible that the checks will fail
|
||||
* later during InitializeWalConsistencyChecking(). But in that case, the
|
||||
* postmaster will exit anyway, so it's safe to proceed with the
|
||||
* assignment.
|
||||
*
|
||||
* Any built-in resource managers specified are assigned immediately,
|
||||
* which affects WAL created before shared_preload_libraries are
|
||||
* processed. Any custom resource managers specified won't be assigned
|
||||
* until after shared_preload_libraries are processed, but that's OK
|
||||
* because WAL for a custom resource manager can't be written before the
|
||||
* module is loaded anyway.
|
||||
*/
|
||||
wal_consistency_checking = extra;
|
||||
}
|
||||
|
||||
static bool
|
||||
|
Reference in New Issue
Block a user