mirror of
https://github.com/postgres/postgres.git
synced 2025-04-24 10:47:04 +03:00
Prepare to support non-tables in publications
This by itself doesn't change any functionality but prepares the way for having relations other than base tables in publications. Make arrangements for COPY handling the initial table sync. For non-tables we have to use COPY (SELECT ...) instead of directly copying from the table, but then we have to take care to omit generated columns from the column list. Also, remove a hardcoded reference to relkind = 'r' and rely on the publisher to send only what it can actually publish, which will be correct even in future cross-version scenarios. Reviewed-by: Amit Langote <amitlangote09@gmail.com> Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com
This commit is contained in:
parent
1d253bae57
commit
c314c147c0
@ -639,8 +639,8 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
WalRcvExecResult *res;
|
WalRcvExecResult *res;
|
||||||
StringInfoData cmd;
|
StringInfoData cmd;
|
||||||
TupleTableSlot *slot;
|
TupleTableSlot *slot;
|
||||||
Oid tableRow[2] = {OIDOID, CHAROID};
|
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
|
||||||
Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
|
Oid attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
|
||||||
bool isnull;
|
bool isnull;
|
||||||
int natt;
|
int natt;
|
||||||
|
|
||||||
@ -649,16 +649,15 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
|
|
||||||
/* First fetch Oid and replica identity. */
|
/* First fetch Oid and replica identity. */
|
||||||
initStringInfo(&cmd);
|
initStringInfo(&cmd);
|
||||||
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
|
appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
|
||||||
" FROM pg_catalog.pg_class c"
|
" FROM pg_catalog.pg_class c"
|
||||||
" INNER JOIN pg_catalog.pg_namespace n"
|
" INNER JOIN pg_catalog.pg_namespace n"
|
||||||
" ON (c.relnamespace = n.oid)"
|
" ON (c.relnamespace = n.oid)"
|
||||||
" WHERE n.nspname = %s"
|
" WHERE n.nspname = %s"
|
||||||
" AND c.relname = %s"
|
" AND c.relname = %s",
|
||||||
" AND c.relkind = 'r'",
|
|
||||||
quote_literal_cstr(nspname),
|
quote_literal_cstr(nspname),
|
||||||
quote_literal_cstr(relname));
|
quote_literal_cstr(relname));
|
||||||
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
|
res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
|
||||||
|
|
||||||
if (res->status != WALRCV_OK_TUPLES)
|
if (res->status != WALRCV_OK_TUPLES)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
@ -675,6 +674,8 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
Assert(!isnull);
|
Assert(!isnull);
|
||||||
lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
|
lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
|
||||||
Assert(!isnull);
|
Assert(!isnull);
|
||||||
|
lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
|
||||||
|
Assert(!isnull);
|
||||||
|
|
||||||
ExecDropSingleTupleTableSlot(slot);
|
ExecDropSingleTupleTableSlot(slot);
|
||||||
walrcv_clear_result(res);
|
walrcv_clear_result(res);
|
||||||
@ -696,7 +697,7 @@ fetch_remote_table_info(char *nspname, char *relname,
|
|||||||
lrel->remoteid,
|
lrel->remoteid,
|
||||||
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
|
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
|
||||||
lrel->remoteid);
|
lrel->remoteid);
|
||||||
res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
|
res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
|
||||||
|
|
||||||
if (res->status != WALRCV_OK_TUPLES)
|
if (res->status != WALRCV_OK_TUPLES)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
@ -765,8 +766,25 @@ copy_table(Relation rel)
|
|||||||
|
|
||||||
/* Start copy on the publisher. */
|
/* Start copy on the publisher. */
|
||||||
initStringInfo(&cmd);
|
initStringInfo(&cmd);
|
||||||
|
if (lrel.relkind == RELKIND_RELATION)
|
||||||
appendStringInfo(&cmd, "COPY %s TO STDOUT",
|
appendStringInfo(&cmd, "COPY %s TO STDOUT",
|
||||||
quote_qualified_identifier(lrel.nspname, lrel.relname));
|
quote_qualified_identifier(lrel.nspname, lrel.relname));
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* For non-tables, we need to do COPY (SELECT ...), but we can't just
|
||||||
|
* do SELECT * because we need to not copy generated columns.
|
||||||
|
*/
|
||||||
|
appendStringInfo(&cmd, "COPY (SELECT ");
|
||||||
|
for (int i = 0; i < lrel.natts; i++)
|
||||||
|
{
|
||||||
|
appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
|
||||||
|
if (i < lrel.natts - 1)
|
||||||
|
appendStringInfoString(&cmd, ", ");
|
||||||
|
}
|
||||||
|
appendStringInfo(&cmd, " FROM %s) TO STDOUT",
|
||||||
|
quote_qualified_identifier(lrel.nspname, lrel.relname));
|
||||||
|
}
|
||||||
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
|
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
|
||||||
pfree(cmd.data);
|
pfree(cmd.data);
|
||||||
if (res->status != WALRCV_OK_COPY_OUT)
|
if (res->status != WALRCV_OK_COPY_OUT)
|
||||||
|
@ -49,6 +49,7 @@ typedef struct LogicalRepRelation
|
|||||||
char **attnames; /* column names */
|
char **attnames; /* column names */
|
||||||
Oid *atttyps; /* column types */
|
Oid *atttyps; /* column types */
|
||||||
char replident; /* replica identity */
|
char replident; /* replica identity */
|
||||||
|
char relkind; /* remote relation kind */
|
||||||
Bitmapset *attkeys; /* Bitmap of key columns */
|
Bitmapset *attkeys; /* Bitmap of key columns */
|
||||||
} LogicalRepRelation;
|
} LogicalRepRelation;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user