mirror of
https://github.com/postgres/postgres.git
synced 2025-07-27 12:41:57 +03:00
Run pgindent on the changes of the previous patch.
This step can be checked mechanically. Author: Tom Lane <tgl@sss.pgh.pa.us> Reviewed-by: Matheus Alcantara <matheusssilv97@gmail.com> Discussion: https://postgr.es/m/2976982.1748049023@sss.pgh.pa.us
This commit is contained in:
@ -870,115 +870,115 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
|
||||
/* prepTuplestoreResult must have been called previously */
|
||||
Assert(rsinfo->returnMode == SFRM_Materialize);
|
||||
|
||||
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||
{
|
||||
is_sql_cmd = true;
|
||||
|
||||
/*
|
||||
* need a tuple descriptor representing one TEXT column to return
|
||||
* the command status string as our result tuple
|
||||
*/
|
||||
tupdesc = CreateTemplateTupleDesc(1);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
|
||||
TEXTOID, -1, 0);
|
||||
ntuples = 1;
|
||||
nfields = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
|
||||
|
||||
is_sql_cmd = false;
|
||||
|
||||
/* get a tuple descriptor for our result type */
|
||||
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
|
||||
{
|
||||
case TYPEFUNC_COMPOSITE:
|
||||
/* success */
|
||||
break;
|
||||
case TYPEFUNC_RECORD:
|
||||
/* failed to determine actual type of RECORD */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("function returning record called in context "
|
||||
"that cannot accept type record")));
|
||||
break;
|
||||
default:
|
||||
/* result type isn't composite */
|
||||
elog(ERROR, "return type must be a row type");
|
||||
break;
|
||||
}
|
||||
|
||||
/* make sure we have a persistent copy of the tupdesc */
|
||||
tupdesc = CreateTupleDescCopy(tupdesc);
|
||||
ntuples = PQntuples(res);
|
||||
nfields = PQnfields(res);
|
||||
}
|
||||
if (PQresultStatus(res) == PGRES_COMMAND_OK)
|
||||
{
|
||||
is_sql_cmd = true;
|
||||
|
||||
/*
|
||||
* check result and tuple descriptor have the same number of columns
|
||||
* need a tuple descriptor representing one TEXT column to return the
|
||||
* command status string as our result tuple
|
||||
*/
|
||||
if (nfields != tupdesc->natts)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATATYPE_MISMATCH),
|
||||
errmsg("remote query result rowtype does not match "
|
||||
"the specified FROM clause rowtype")));
|
||||
tupdesc = CreateTemplateTupleDesc(1);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
|
||||
TEXTOID, -1, 0);
|
||||
ntuples = 1;
|
||||
nfields = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
|
||||
|
||||
if (ntuples > 0)
|
||||
is_sql_cmd = false;
|
||||
|
||||
/* get a tuple descriptor for our result type */
|
||||
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
|
||||
{
|
||||
AttInMetadata *attinmeta;
|
||||
int nestlevel = -1;
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext oldcontext;
|
||||
int row;
|
||||
char **values;
|
||||
|
||||
attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
||||
|
||||
/* Set GUCs to ensure we read GUC-sensitive data types correctly */
|
||||
if (!is_sql_cmd)
|
||||
nestlevel = applyRemoteGucs(conn);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
|
||||
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||
rsinfo->setResult = tupstore;
|
||||
rsinfo->setDesc = tupdesc;
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
values = palloc_array(char *, nfields);
|
||||
|
||||
/* put all tuples into the tuplestore */
|
||||
for (row = 0; row < ntuples; row++)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
|
||||
if (!is_sql_cmd)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < nfields; i++)
|
||||
{
|
||||
if (PQgetisnull(res, row, i))
|
||||
values[i] = NULL;
|
||||
else
|
||||
values[i] = PQgetvalue(res, row, i);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
values[0] = PQcmdStatus(res);
|
||||
}
|
||||
|
||||
/* build the tuple and put it into the tuplestore. */
|
||||
tuple = BuildTupleFromCStrings(attinmeta, values);
|
||||
tuplestore_puttuple(tupstore, tuple);
|
||||
}
|
||||
|
||||
/* clean up GUC settings, if we changed any */
|
||||
restoreLocalGucs(nestlevel);
|
||||
case TYPEFUNC_COMPOSITE:
|
||||
/* success */
|
||||
break;
|
||||
case TYPEFUNC_RECORD:
|
||||
/* failed to determine actual type of RECORD */
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("function returning record called in context "
|
||||
"that cannot accept type record")));
|
||||
break;
|
||||
default:
|
||||
/* result type isn't composite */
|
||||
elog(ERROR, "return type must be a row type");
|
||||
break;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
/* make sure we have a persistent copy of the tupdesc */
|
||||
tupdesc = CreateTupleDescCopy(tupdesc);
|
||||
ntuples = PQntuples(res);
|
||||
nfields = PQnfields(res);
|
||||
}
|
||||
|
||||
/*
|
||||
* check result and tuple descriptor have the same number of columns
|
||||
*/
|
||||
if (nfields != tupdesc->natts)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DATATYPE_MISMATCH),
|
||||
errmsg("remote query result rowtype does not match "
|
||||
"the specified FROM clause rowtype")));
|
||||
|
||||
if (ntuples > 0)
|
||||
{
|
||||
AttInMetadata *attinmeta;
|
||||
int nestlevel = -1;
|
||||
Tuplestorestate *tupstore;
|
||||
MemoryContext oldcontext;
|
||||
int row;
|
||||
char **values;
|
||||
|
||||
attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
||||
|
||||
/* Set GUCs to ensure we read GUC-sensitive data types correctly */
|
||||
if (!is_sql_cmd)
|
||||
nestlevel = applyRemoteGucs(conn);
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
|
||||
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
||||
rsinfo->setResult = tupstore;
|
||||
rsinfo->setDesc = tupdesc;
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
values = palloc_array(char *, nfields);
|
||||
|
||||
/* put all tuples into the tuplestore */
|
||||
for (row = 0; row < ntuples; row++)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
|
||||
if (!is_sql_cmd)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < nfields; i++)
|
||||
{
|
||||
if (PQgetisnull(res, row, i))
|
||||
values[i] = NULL;
|
||||
else
|
||||
values[i] = PQgetvalue(res, row, i);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
values[0] = PQcmdStatus(res);
|
||||
}
|
||||
|
||||
/* build the tuple and put it into the tuplestore. */
|
||||
tuple = BuildTupleFromCStrings(attinmeta, values);
|
||||
tuplestore_puttuple(tupstore, tuple);
|
||||
}
|
||||
|
||||
/* clean up GUC settings, if we changed any */
|
||||
restoreLocalGucs(nestlevel);
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -982,40 +982,40 @@ void
|
||||
pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
|
||||
const char *sql)
|
||||
{
|
||||
char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||
char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
||||
char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
||||
char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
|
||||
char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
|
||||
int sqlstate;
|
||||
char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||
char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
|
||||
char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
|
||||
char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
|
||||
char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
|
||||
int sqlstate;
|
||||
|
||||
if (diag_sqlstate)
|
||||
sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
|
||||
diag_sqlstate[1],
|
||||
diag_sqlstate[2],
|
||||
diag_sqlstate[3],
|
||||
diag_sqlstate[4]);
|
||||
else
|
||||
sqlstate = ERRCODE_CONNECTION_FAILURE;
|
||||
if (diag_sqlstate)
|
||||
sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
|
||||
diag_sqlstate[1],
|
||||
diag_sqlstate[2],
|
||||
diag_sqlstate[3],
|
||||
diag_sqlstate[4]);
|
||||
else
|
||||
sqlstate = ERRCODE_CONNECTION_FAILURE;
|
||||
|
||||
/*
|
||||
* If we don't get a message from the PGresult, try the PGconn. This
|
||||
* is needed because for connection-level failures, PQgetResult may
|
||||
* just return NULL, not a PGresult at all.
|
||||
*/
|
||||
if (message_primary == NULL)
|
||||
message_primary = pchomp(PQerrorMessage(conn));
|
||||
/*
|
||||
* If we don't get a message from the PGresult, try the PGconn. This is
|
||||
* needed because for connection-level failures, PQgetResult may just
|
||||
* return NULL, not a PGresult at all.
|
||||
*/
|
||||
if (message_primary == NULL)
|
||||
message_primary = pchomp(PQerrorMessage(conn));
|
||||
|
||||
ereport(elevel,
|
||||
(errcode(sqlstate),
|
||||
(message_primary != NULL && message_primary[0] != '\0') ?
|
||||
errmsg_internal("%s", message_primary) :
|
||||
errmsg("could not obtain message string for remote error"),
|
||||
message_detail ? errdetail_internal("%s", message_detail) : 0,
|
||||
message_hint ? errhint("%s", message_hint) : 0,
|
||||
message_context ? errcontext("%s", message_context) : 0,
|
||||
sql ? errcontext("remote SQL command: %s", sql) : 0));
|
||||
PQclear(res);
|
||||
ereport(elevel,
|
||||
(errcode(sqlstate),
|
||||
(message_primary != NULL && message_primary[0] != '\0') ?
|
||||
errmsg_internal("%s", message_primary) :
|
||||
errmsg("could not obtain message string for remote error"),
|
||||
message_detail ? errdetail_internal("%s", message_detail) : 0,
|
||||
message_hint ? errhint("%s", message_hint) : 0,
|
||||
message_context ? errcontext("%s", message_context) : 0,
|
||||
sql ? errcontext("remote SQL command: %s", sql) : 0));
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1617,83 +1617,83 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
|
||||
|
||||
*result = NULL;
|
||||
*timed_out = false;
|
||||
for (;;)
|
||||
for (;;)
|
||||
{
|
||||
PGresult *res;
|
||||
|
||||
while (PQisBusy(conn))
|
||||
{
|
||||
PGresult *res;
|
||||
int wc;
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
long cur_timeout;
|
||||
|
||||
while (PQisBusy(conn))
|
||||
/* If timeout has expired, give up. */
|
||||
if (now >= endtime)
|
||||
{
|
||||
int wc;
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
long cur_timeout;
|
||||
|
||||
/* If timeout has expired, give up. */
|
||||
if (now >= endtime)
|
||||
{
|
||||
*timed_out = true;
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* If we need to re-issue the cancel request, do that. */
|
||||
if (now >= retrycanceltime)
|
||||
{
|
||||
/* We ignore failure to issue the repeated request. */
|
||||
(void) libpqsrv_cancel(conn, endtime);
|
||||
|
||||
/* Recompute "now" in case that took measurable time. */
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
/* Adjust re-cancel timeout in increasing steps. */
|
||||
retrycanceltime = TimestampTzPlusMilliseconds(now,
|
||||
canceldelta);
|
||||
canceldelta += canceldelta;
|
||||
}
|
||||
|
||||
/* If timeout has expired, give up, else get sleep time. */
|
||||
cur_timeout = TimestampDifferenceMilliseconds(now,
|
||||
Min(endtime,
|
||||
retrycanceltime));
|
||||
if (cur_timeout <= 0)
|
||||
{
|
||||
*timed_out = true;
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* first time, allocate or get the custom wait event */
|
||||
if (pgfdw_we_cleanup_result == 0)
|
||||
pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
cur_timeout, pgfdw_we_cleanup_result);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
/* connection trouble */
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
*timed_out = true;
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
if (res == NULL)
|
||||
break; /* query is complete */
|
||||
/* If we need to re-issue the cancel request, do that. */
|
||||
if (now >= retrycanceltime)
|
||||
{
|
||||
/* We ignore failure to issue the repeated request. */
|
||||
(void) libpqsrv_cancel(conn, endtime);
|
||||
|
||||
PQclear(last_res);
|
||||
last_res = res;
|
||||
/* Recompute "now" in case that took measurable time. */
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
/* Adjust re-cancel timeout in increasing steps. */
|
||||
retrycanceltime = TimestampTzPlusMilliseconds(now,
|
||||
canceldelta);
|
||||
canceldelta += canceldelta;
|
||||
}
|
||||
|
||||
/* If timeout has expired, give up, else get sleep time. */
|
||||
cur_timeout = TimestampDifferenceMilliseconds(now,
|
||||
Min(endtime,
|
||||
retrycanceltime));
|
||||
if (cur_timeout <= 0)
|
||||
{
|
||||
*timed_out = true;
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* first time, allocate or get the custom wait event */
|
||||
if (pgfdw_we_cleanup_result == 0)
|
||||
pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
cur_timeout, pgfdw_we_cleanup_result);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(conn))
|
||||
{
|
||||
/* connection trouble */
|
||||
failed = true;
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res = PQgetResult(conn);
|
||||
if (res == NULL)
|
||||
break; /* query is complete */
|
||||
|
||||
PQclear(last_res);
|
||||
last_res = res;
|
||||
}
|
||||
exit:
|
||||
if (failed)
|
||||
PQclear(last_res);
|
||||
|
@ -3605,31 +3605,31 @@ get_remote_estimate(const char *sql, PGconn *conn,
|
||||
Cost *startup_cost, Cost *total_cost)
|
||||
{
|
||||
PGresult *res;
|
||||
char *line;
|
||||
char *p;
|
||||
int n;
|
||||
char *line;
|
||||
char *p;
|
||||
int n;
|
||||
|
||||
/*
|
||||
* Execute EXPLAIN remotely.
|
||||
*/
|
||||
res = pgfdw_exec_query(conn, sql, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql);
|
||||
/*
|
||||
* Execute EXPLAIN remotely.
|
||||
*/
|
||||
res = pgfdw_exec_query(conn, sql, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql);
|
||||
|
||||
/*
|
||||
* Extract cost numbers for topmost plan node. Note we search for a
|
||||
* left paren from the end of the line to avoid being confused by
|
||||
* other uses of parentheses.
|
||||
*/
|
||||
line = PQgetvalue(res, 0, 0);
|
||||
p = strrchr(line, '(');
|
||||
if (p == NULL)
|
||||
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
||||
n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
|
||||
startup_cost, total_cost, rows, width);
|
||||
if (n != 4)
|
||||
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
||||
PQclear(res);
|
||||
/*
|
||||
* Extract cost numbers for topmost plan node. Note we search for a left
|
||||
* paren from the end of the line to avoid being confused by other uses of
|
||||
* parentheses.
|
||||
*/
|
||||
line = PQgetvalue(res, 0, 0);
|
||||
p = strrchr(line, '(');
|
||||
if (p == NULL)
|
||||
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
||||
n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
|
||||
startup_cost, total_cost, rows, width);
|
||||
if (n != 4)
|
||||
elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -3812,63 +3812,63 @@ fetch_more_data(ForeignScanState *node)
|
||||
MemoryContextReset(fsstate->batch_cxt);
|
||||
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
|
||||
|
||||
if (fsstate->async_capable)
|
||||
{
|
||||
Assert(fsstate->conn_state->pendingAreq);
|
||||
if (fsstate->async_capable)
|
||||
{
|
||||
Assert(fsstate->conn_state->pendingAreq);
|
||||
|
||||
/*
|
||||
* The query was already sent by an earlier call to
|
||||
* fetch_more_data_begin. So now we just fetch the result.
|
||||
*/
|
||||
res = pgfdw_get_result(conn);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||
/*
|
||||
* The query was already sent by an earlier call to
|
||||
* fetch_more_data_begin. So now we just fetch the result.
|
||||
*/
|
||||
res = pgfdw_get_result(conn);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||
|
||||
/* Reset per-connection state */
|
||||
fsstate->conn_state->pendingAreq = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
char sql[64];
|
||||
/* Reset per-connection state */
|
||||
fsstate->conn_state->pendingAreq = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
char sql[64];
|
||||
|
||||
/* This is a regular synchronous fetch. */
|
||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||
fsstate->fetch_size, fsstate->cursor_number);
|
||||
/* This is a regular synchronous fetch. */
|
||||
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
|
||||
fsstate->fetch_size, fsstate->cursor_number);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||
}
|
||||
res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, fsstate->query);
|
||||
}
|
||||
|
||||
/* Convert the data into HeapTuples */
|
||||
numrows = PQntuples(res);
|
||||
fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
|
||||
fsstate->num_tuples = numrows;
|
||||
fsstate->next_tuple = 0;
|
||||
/* Convert the data into HeapTuples */
|
||||
numrows = PQntuples(res);
|
||||
fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
|
||||
fsstate->num_tuples = numrows;
|
||||
fsstate->next_tuple = 0;
|
||||
|
||||
for (i = 0; i < numrows; i++)
|
||||
{
|
||||
Assert(IsA(node->ss.ps.plan, ForeignScan));
|
||||
for (i = 0; i < numrows; i++)
|
||||
{
|
||||
Assert(IsA(node->ss.ps.plan, ForeignScan));
|
||||
|
||||
fsstate->tuples[i] =
|
||||
make_tuple_from_result_row(res, i,
|
||||
fsstate->rel,
|
||||
fsstate->attinmeta,
|
||||
fsstate->retrieved_attrs,
|
||||
node,
|
||||
fsstate->temp_cxt);
|
||||
}
|
||||
fsstate->tuples[i] =
|
||||
make_tuple_from_result_row(res, i,
|
||||
fsstate->rel,
|
||||
fsstate->attinmeta,
|
||||
fsstate->retrieved_attrs,
|
||||
node,
|
||||
fsstate->temp_cxt);
|
||||
}
|
||||
|
||||
/* Update fetch_ct_2 */
|
||||
if (fsstate->fetch_ct_2 < 2)
|
||||
fsstate->fetch_ct_2++;
|
||||
/* Update fetch_ct_2 */
|
||||
if (fsstate->fetch_ct_2 < 2)
|
||||
fsstate->fetch_ct_2++;
|
||||
|
||||
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
||||
fsstate->eof_reached = (numrows < fsstate->fetch_size);
|
||||
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
||||
fsstate->eof_reached = (numrows < fsstate->fetch_size);
|
||||
|
||||
PQclear(res);
|
||||
PQclear(res);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
@ -4322,20 +4322,20 @@ static void
|
||||
store_returning_result(PgFdwModifyState *fmstate,
|
||||
TupleTableSlot *slot, PGresult *res)
|
||||
{
|
||||
HeapTuple newtup;
|
||||
HeapTuple newtup;
|
||||
|
||||
newtup = make_tuple_from_result_row(res, 0,
|
||||
fmstate->rel,
|
||||
fmstate->attinmeta,
|
||||
fmstate->retrieved_attrs,
|
||||
NULL,
|
||||
fmstate->temp_cxt);
|
||||
newtup = make_tuple_from_result_row(res, 0,
|
||||
fmstate->rel,
|
||||
fmstate->attinmeta,
|
||||
fmstate->retrieved_attrs,
|
||||
NULL,
|
||||
fmstate->temp_cxt);
|
||||
|
||||
/*
|
||||
* The returning slot will not necessarily be suitable to store
|
||||
* heaptuples directly, so allow for conversion.
|
||||
*/
|
||||
ExecForceStoreHeapTuple(newtup, slot, true);
|
||||
/*
|
||||
* The returning slot will not necessarily be suitable to store heaptuples
|
||||
* directly, so allow for conversion.
|
||||
*/
|
||||
ExecForceStoreHeapTuple(newtup, slot, true);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -4921,14 +4921,14 @@ postgresAnalyzeForeignTable(Relation relation,
|
||||
initStringInfo(&sql);
|
||||
deparseAnalyzeSizeSql(&sql, relation);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
|
||||
if (PQntuples(res) != 1 || PQnfields(res) != 1)
|
||||
elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
|
||||
*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
|
||||
PQclear(res);
|
||||
if (PQntuples(res) != 1 || PQnfields(res) != 1)
|
||||
elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query");
|
||||
*totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
|
||||
PQclear(res);
|
||||
|
||||
ReleaseConnection(conn);
|
||||
|
||||
@ -4970,15 +4970,15 @@ postgresGetAnalyzeInfoForForeignTable(Relation relation, bool *can_tablesample)
|
||||
initStringInfo(&sql);
|
||||
deparseAnalyzeInfoSql(&sql, relation);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
|
||||
if (PQntuples(res) != 1 || PQnfields(res) != 2)
|
||||
elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
|
||||
reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
|
||||
relkind = *(PQgetvalue(res, 0, 1));
|
||||
PQclear(res);
|
||||
if (PQntuples(res) != 1 || PQnfields(res) != 2)
|
||||
elog(ERROR, "unexpected result from deparseAnalyzeInfoSql query");
|
||||
reltuples = strtod(PQgetvalue(res, 0, 0), NULL);
|
||||
relkind = *(PQgetvalue(res, 0, 1));
|
||||
PQclear(res);
|
||||
|
||||
ReleaseConnection(conn);
|
||||
|
||||
@ -5200,76 +5200,76 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
|
||||
|
||||
deparseAnalyzeSql(&sql, relation, method, sample_frac, &astate.retrieved_attrs);
|
||||
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
PQclear(res);
|
||||
res = pgfdw_exec_query(conn, sql.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_COMMAND_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
PQclear(res);
|
||||
|
||||
/*
|
||||
* Determine the fetch size. The default is arbitrary, but shouldn't be
|
||||
* enormous.
|
||||
*/
|
||||
fetch_size = 100;
|
||||
foreach(lc, server->options)
|
||||
{
|
||||
DefElem *def = (DefElem *) lfirst(lc);
|
||||
|
||||
if (strcmp(def->defname, "fetch_size") == 0)
|
||||
{
|
||||
(void) parse_int(defGetString(def), &fetch_size, 0, NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
foreach(lc, table->options)
|
||||
{
|
||||
DefElem *def = (DefElem *) lfirst(lc);
|
||||
|
||||
if (strcmp(def->defname, "fetch_size") == 0)
|
||||
{
|
||||
(void) parse_int(defGetString(def), &fetch_size, 0, NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Construct command to fetch rows from remote. */
|
||||
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
|
||||
fetch_size, cursor_number);
|
||||
|
||||
/* Retrieve and process rows a batch at a time. */
|
||||
for (;;)
|
||||
{
|
||||
int numrows;
|
||||
int i;
|
||||
|
||||
/* Allow users to cancel long query */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* Determine the fetch size. The default is arbitrary, but shouldn't
|
||||
* be enormous.
|
||||
* XXX possible future improvement: if rowstoskip is large, we could
|
||||
* issue a MOVE rather than physically fetching the rows, then just
|
||||
* adjust rowstoskip and samplerows appropriately.
|
||||
*/
|
||||
fetch_size = 100;
|
||||
foreach(lc, server->options)
|
||||
{
|
||||
DefElem *def = (DefElem *) lfirst(lc);
|
||||
|
||||
if (strcmp(def->defname, "fetch_size") == 0)
|
||||
{
|
||||
(void) parse_int(defGetString(def), &fetch_size, 0, NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
foreach(lc, table->options)
|
||||
{
|
||||
DefElem *def = (DefElem *) lfirst(lc);
|
||||
/* Fetch some rows */
|
||||
res = pgfdw_exec_query(conn, fetch_sql, NULL);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
|
||||
if (strcmp(def->defname, "fetch_size") == 0)
|
||||
{
|
||||
(void) parse_int(defGetString(def), &fetch_size, 0, NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* Process whatever we got. */
|
||||
numrows = PQntuples(res);
|
||||
for (i = 0; i < numrows; i++)
|
||||
analyze_row_processor(res, i, &astate);
|
||||
|
||||
/* Construct command to fetch rows from remote. */
|
||||
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
|
||||
fetch_size, cursor_number);
|
||||
PQclear(res);
|
||||
|
||||
/* Retrieve and process rows a batch at a time. */
|
||||
for (;;)
|
||||
{
|
||||
int numrows;
|
||||
int i;
|
||||
/* Must be EOF if we didn't get all the rows requested. */
|
||||
if (numrows < fetch_size)
|
||||
break;
|
||||
}
|
||||
|
||||
/* Allow users to cancel long query */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/*
|
||||
* XXX possible future improvement: if rowstoskip is large, we
|
||||
* could issue a MOVE rather than physically fetching the rows,
|
||||
* then just adjust rowstoskip and samplerows appropriately.
|
||||
*/
|
||||
|
||||
/* Fetch some rows */
|
||||
res = pgfdw_exec_query(conn, fetch_sql, NULL);
|
||||
/* On error, report the original query, not the FETCH. */
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, sql.data);
|
||||
|
||||
/* Process whatever we got. */
|
||||
numrows = PQntuples(res);
|
||||
for (i = 0; i < numrows; i++)
|
||||
analyze_row_processor(res, i, &astate);
|
||||
|
||||
PQclear(res);
|
||||
|
||||
/* Must be EOF if we didn't get all the rows requested. */
|
||||
if (numrows < fetch_size)
|
||||
break;
|
||||
}
|
||||
|
||||
/* Close the cursor, just to be tidy. */
|
||||
close_cursor(conn, cursor_number, NULL);
|
||||
/* Close the cursor, just to be tidy. */
|
||||
close_cursor(conn, cursor_number, NULL);
|
||||
|
||||
ReleaseConnection(conn);
|
||||
|
||||
@ -5420,234 +5420,231 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
|
||||
/* Create workspace for strings */
|
||||
initStringInfo(&buf);
|
||||
|
||||
/* Check that the schema really exists */
|
||||
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
/* Check that the schema really exists */
|
||||
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||
|
||||
if (PQntuples(res) != 1)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
|
||||
errmsg("schema \"%s\" is not present on foreign server \"%s\"",
|
||||
stmt->remote_schema, server->servername)));
|
||||
if (PQntuples(res) != 1)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
|
||||
errmsg("schema \"%s\" is not present on foreign server \"%s\"",
|
||||
stmt->remote_schema, server->servername)));
|
||||
|
||||
PQclear(res);
|
||||
resetStringInfo(&buf);
|
||||
PQclear(res);
|
||||
resetStringInfo(&buf);
|
||||
|
||||
/*
|
||||
* Fetch all table data from this schema, possibly restricted by
|
||||
* EXCEPT or LIMIT TO. (We don't actually need to pay any attention
|
||||
* to EXCEPT/LIMIT TO here, because the core code will filter the
|
||||
* statements we return according to those lists anyway. But it
|
||||
* should save a few cycles to not process excluded tables in the
|
||||
* first place.)
|
||||
*
|
||||
* Import table data for partitions only when they are explicitly
|
||||
* specified in LIMIT TO clause. Otherwise ignore them and only
|
||||
* include the definitions of the root partitioned tables to allow
|
||||
* access to the complete remote data set locally in the schema
|
||||
* imported.
|
||||
*
|
||||
* Note: because we run the connection with search_path restricted to
|
||||
* pg_catalog, the format_type() and pg_get_expr() outputs will always
|
||||
* include a schema name for types/functions in other schemas, which
|
||||
* is what we want.
|
||||
*/
|
||||
/*
|
||||
* Fetch all table data from this schema, possibly restricted by EXCEPT or
|
||||
* LIMIT TO. (We don't actually need to pay any attention to EXCEPT/LIMIT
|
||||
* TO here, because the core code will filter the statements we return
|
||||
* according to those lists anyway. But it should save a few cycles to
|
||||
* not process excluded tables in the first place.)
|
||||
*
|
||||
* Import table data for partitions only when they are explicitly
|
||||
* specified in LIMIT TO clause. Otherwise ignore them and only include
|
||||
* the definitions of the root partitioned tables to allow access to the
|
||||
* complete remote data set locally in the schema imported.
|
||||
*
|
||||
* Note: because we run the connection with search_path restricted to
|
||||
* pg_catalog, the format_type() and pg_get_expr() outputs will always
|
||||
* include a schema name for types/functions in other schemas, which is
|
||||
* what we want.
|
||||
*/
|
||||
appendStringInfoString(&buf,
|
||||
"SELECT relname, "
|
||||
" attname, "
|
||||
" format_type(atttypid, atttypmod), "
|
||||
" attnotnull, "
|
||||
" pg_get_expr(adbin, adrelid), ");
|
||||
|
||||
/* Generated columns are supported since Postgres 12 */
|
||||
if (PQserverVersion(conn) >= 120000)
|
||||
appendStringInfoString(&buf,
|
||||
"SELECT relname, "
|
||||
" attname, "
|
||||
" format_type(atttypid, atttypmod), "
|
||||
" attnotnull, "
|
||||
" pg_get_expr(adbin, adrelid), ");
|
||||
|
||||
/* Generated columns are supported since Postgres 12 */
|
||||
if (PQserverVersion(conn) >= 120000)
|
||||
appendStringInfoString(&buf,
|
||||
" attgenerated, ");
|
||||
else
|
||||
appendStringInfoString(&buf,
|
||||
" NULL, ");
|
||||
|
||||
if (import_collate)
|
||||
appendStringInfoString(&buf,
|
||||
" collname, "
|
||||
" collnsp.nspname ");
|
||||
else
|
||||
appendStringInfoString(&buf,
|
||||
" NULL, NULL ");
|
||||
|
||||
" attgenerated, ");
|
||||
else
|
||||
appendStringInfoString(&buf,
|
||||
"FROM pg_class c "
|
||||
" JOIN pg_namespace n ON "
|
||||
" relnamespace = n.oid "
|
||||
" LEFT JOIN pg_attribute a ON "
|
||||
" attrelid = c.oid AND attnum > 0 "
|
||||
" AND NOT attisdropped "
|
||||
" LEFT JOIN pg_attrdef ad ON "
|
||||
" adrelid = c.oid AND adnum = attnum ");
|
||||
|
||||
if (import_collate)
|
||||
appendStringInfoString(&buf,
|
||||
" LEFT JOIN pg_collation coll ON "
|
||||
" coll.oid = attcollation "
|
||||
" LEFT JOIN pg_namespace collnsp ON "
|
||||
" collnsp.oid = collnamespace ");
|
||||
" NULL, ");
|
||||
|
||||
if (import_collate)
|
||||
appendStringInfoString(&buf,
|
||||
"WHERE c.relkind IN ("
|
||||
CppAsString2(RELKIND_RELATION) ","
|
||||
CppAsString2(RELKIND_VIEW) ","
|
||||
CppAsString2(RELKIND_FOREIGN_TABLE) ","
|
||||
CppAsString2(RELKIND_MATVIEW) ","
|
||||
CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
|
||||
" AND n.nspname = ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
" collname, "
|
||||
" collnsp.nspname ");
|
||||
else
|
||||
appendStringInfoString(&buf,
|
||||
" NULL, NULL ");
|
||||
|
||||
/* Partitions are supported since Postgres 10 */
|
||||
if (PQserverVersion(conn) >= 100000 &&
|
||||
stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
|
||||
appendStringInfoString(&buf, " AND NOT c.relispartition ");
|
||||
appendStringInfoString(&buf,
|
||||
"FROM pg_class c "
|
||||
" JOIN pg_namespace n ON "
|
||||
" relnamespace = n.oid "
|
||||
" LEFT JOIN pg_attribute a ON "
|
||||
" attrelid = c.oid AND attnum > 0 "
|
||||
" AND NOT attisdropped "
|
||||
" LEFT JOIN pg_attrdef ad ON "
|
||||
" adrelid = c.oid AND adnum = attnum ");
|
||||
|
||||
/* Apply restrictions for LIMIT TO and EXCEPT */
|
||||
if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
|
||||
stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
|
||||
if (import_collate)
|
||||
appendStringInfoString(&buf,
|
||||
" LEFT JOIN pg_collation coll ON "
|
||||
" coll.oid = attcollation "
|
||||
" LEFT JOIN pg_namespace collnsp ON "
|
||||
" collnsp.oid = collnamespace ");
|
||||
|
||||
appendStringInfoString(&buf,
|
||||
"WHERE c.relkind IN ("
|
||||
CppAsString2(RELKIND_RELATION) ","
|
||||
CppAsString2(RELKIND_VIEW) ","
|
||||
CppAsString2(RELKIND_FOREIGN_TABLE) ","
|
||||
CppAsString2(RELKIND_MATVIEW) ","
|
||||
CppAsString2(RELKIND_PARTITIONED_TABLE) ") "
|
||||
" AND n.nspname = ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
|
||||
/* Partitions are supported since Postgres 10 */
|
||||
if (PQserverVersion(conn) >= 100000 &&
|
||||
stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO)
|
||||
appendStringInfoString(&buf, " AND NOT c.relispartition ");
|
||||
|
||||
/* Apply restrictions for LIMIT TO and EXCEPT */
|
||||
if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
|
||||
stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
|
||||
{
|
||||
bool first_item = true;
|
||||
|
||||
appendStringInfoString(&buf, " AND c.relname ");
|
||||
if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
|
||||
appendStringInfoString(&buf, "NOT ");
|
||||
appendStringInfoString(&buf, "IN (");
|
||||
|
||||
/* Append list of table names within IN clause */
|
||||
foreach(lc, stmt->table_list)
|
||||
{
|
||||
bool first_item = true;
|
||||
RangeVar *rv = (RangeVar *) lfirst(lc);
|
||||
|
||||
appendStringInfoString(&buf, " AND c.relname ");
|
||||
if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
|
||||
appendStringInfoString(&buf, "NOT ");
|
||||
appendStringInfoString(&buf, "IN (");
|
||||
|
||||
/* Append list of table names within IN clause */
|
||||
foreach(lc, stmt->table_list)
|
||||
{
|
||||
RangeVar *rv = (RangeVar *) lfirst(lc);
|
||||
|
||||
if (first_item)
|
||||
first_item = false;
|
||||
else
|
||||
appendStringInfoString(&buf, ", ");
|
||||
deparseStringLiteral(&buf, rv->relname);
|
||||
}
|
||||
appendStringInfoChar(&buf, ')');
|
||||
if (first_item)
|
||||
first_item = false;
|
||||
else
|
||||
appendStringInfoString(&buf, ", ");
|
||||
deparseStringLiteral(&buf, rv->relname);
|
||||
}
|
||||
appendStringInfoChar(&buf, ')');
|
||||
}
|
||||
|
||||
/* Append ORDER BY at the end of query to ensure output ordering */
|
||||
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
|
||||
/* Append ORDER BY at the end of query to ensure output ordering */
|
||||
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
|
||||
|
||||
/* Fetch the data */
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||
/* Fetch the data */
|
||||
res = pgfdw_exec_query(conn, buf.data, NULL);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK)
|
||||
pgfdw_report_error(ERROR, res, conn, buf.data);
|
||||
|
||||
/* Process results */
|
||||
numrows = PQntuples(res);
|
||||
/* note: incrementation of i happens in inner loop's while() test */
|
||||
for (i = 0; i < numrows;)
|
||||
/* Process results */
|
||||
numrows = PQntuples(res);
|
||||
/* note: incrementation of i happens in inner loop's while() test */
|
||||
for (i = 0; i < numrows;)
|
||||
{
|
||||
char *tablename = PQgetvalue(res, i, 0);
|
||||
bool first_item = true;
|
||||
|
||||
resetStringInfo(&buf);
|
||||
appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
|
||||
quote_identifier(tablename));
|
||||
|
||||
/* Scan all rows for this table */
|
||||
do
|
||||
{
|
||||
char *tablename = PQgetvalue(res, i, 0);
|
||||
bool first_item = true;
|
||||
char *attname;
|
||||
char *typename;
|
||||
char *attnotnull;
|
||||
char *attgenerated;
|
||||
char *attdefault;
|
||||
char *collname;
|
||||
char *collnamespace;
|
||||
|
||||
resetStringInfo(&buf);
|
||||
appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
|
||||
quote_identifier(tablename));
|
||||
/* If table has no columns, we'll see nulls here */
|
||||
if (PQgetisnull(res, i, 1))
|
||||
continue;
|
||||
|
||||
/* Scan all rows for this table */
|
||||
do
|
||||
{
|
||||
char *attname;
|
||||
char *typename;
|
||||
char *attnotnull;
|
||||
char *attgenerated;
|
||||
char *attdefault;
|
||||
char *collname;
|
||||
char *collnamespace;
|
||||
attname = PQgetvalue(res, i, 1);
|
||||
typename = PQgetvalue(res, i, 2);
|
||||
attnotnull = PQgetvalue(res, i, 3);
|
||||
attdefault = PQgetisnull(res, i, 4) ? NULL :
|
||||
PQgetvalue(res, i, 4);
|
||||
attgenerated = PQgetisnull(res, i, 5) ? NULL :
|
||||
PQgetvalue(res, i, 5);
|
||||
collname = PQgetisnull(res, i, 6) ? NULL :
|
||||
PQgetvalue(res, i, 6);
|
||||
collnamespace = PQgetisnull(res, i, 7) ? NULL :
|
||||
PQgetvalue(res, i, 7);
|
||||
|
||||
/* If table has no columns, we'll see nulls here */
|
||||
if (PQgetisnull(res, i, 1))
|
||||
continue;
|
||||
if (first_item)
|
||||
first_item = false;
|
||||
else
|
||||
appendStringInfoString(&buf, ",\n");
|
||||
|
||||
attname = PQgetvalue(res, i, 1);
|
||||
typename = PQgetvalue(res, i, 2);
|
||||
attnotnull = PQgetvalue(res, i, 3);
|
||||
attdefault = PQgetisnull(res, i, 4) ? NULL :
|
||||
PQgetvalue(res, i, 4);
|
||||
attgenerated = PQgetisnull(res, i, 5) ? NULL :
|
||||
PQgetvalue(res, i, 5);
|
||||
collname = PQgetisnull(res, i, 6) ? NULL :
|
||||
PQgetvalue(res, i, 6);
|
||||
collnamespace = PQgetisnull(res, i, 7) ? NULL :
|
||||
PQgetvalue(res, i, 7);
|
||||
|
||||
if (first_item)
|
||||
first_item = false;
|
||||
else
|
||||
appendStringInfoString(&buf, ",\n");
|
||||
|
||||
/* Print column name and type */
|
||||
appendStringInfo(&buf, " %s %s",
|
||||
quote_identifier(attname),
|
||||
typename);
|
||||
|
||||
/*
|
||||
* Add column_name option so that renaming the foreign table's
|
||||
* column doesn't break the association to the underlying
|
||||
* column.
|
||||
*/
|
||||
appendStringInfoString(&buf, " OPTIONS (column_name ");
|
||||
deparseStringLiteral(&buf, attname);
|
||||
appendStringInfoChar(&buf, ')');
|
||||
|
||||
/* Add COLLATE if needed */
|
||||
if (import_collate && collname != NULL && collnamespace != NULL)
|
||||
appendStringInfo(&buf, " COLLATE %s.%s",
|
||||
quote_identifier(collnamespace),
|
||||
quote_identifier(collname));
|
||||
|
||||
/* Add DEFAULT if needed */
|
||||
if (import_default && attdefault != NULL &&
|
||||
(!attgenerated || !attgenerated[0]))
|
||||
appendStringInfo(&buf, " DEFAULT %s", attdefault);
|
||||
|
||||
/* Add GENERATED if needed */
|
||||
if (import_generated && attgenerated != NULL &&
|
||||
attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
|
||||
{
|
||||
Assert(attdefault != NULL);
|
||||
appendStringInfo(&buf,
|
||||
" GENERATED ALWAYS AS (%s) STORED",
|
||||
attdefault);
|
||||
}
|
||||
|
||||
/* Add NOT NULL if needed */
|
||||
if (import_not_null && attnotnull[0] == 't')
|
||||
appendStringInfoString(&buf, " NOT NULL");
|
||||
}
|
||||
while (++i < numrows &&
|
||||
strcmp(PQgetvalue(res, i, 0), tablename) == 0);
|
||||
/* Print column name and type */
|
||||
appendStringInfo(&buf, " %s %s",
|
||||
quote_identifier(attname),
|
||||
typename);
|
||||
|
||||
/*
|
||||
* Add server name and table-level options. We specify remote
|
||||
* schema and table name as options (the latter to ensure that
|
||||
* renaming the foreign table doesn't break the association).
|
||||
* Add column_name option so that renaming the foreign table's
|
||||
* column doesn't break the association to the underlying column.
|
||||
*/
|
||||
appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
|
||||
quote_identifier(server->servername));
|
||||
appendStringInfoString(&buf, " OPTIONS (column_name ");
|
||||
deparseStringLiteral(&buf, attname);
|
||||
appendStringInfoChar(&buf, ')');
|
||||
|
||||
appendStringInfoString(&buf, "schema_name ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
appendStringInfoString(&buf, ", table_name ");
|
||||
deparseStringLiteral(&buf, tablename);
|
||||
/* Add COLLATE if needed */
|
||||
if (import_collate && collname != NULL && collnamespace != NULL)
|
||||
appendStringInfo(&buf, " COLLATE %s.%s",
|
||||
quote_identifier(collnamespace),
|
||||
quote_identifier(collname));
|
||||
|
||||
appendStringInfoString(&buf, ");");
|
||||
/* Add DEFAULT if needed */
|
||||
if (import_default && attdefault != NULL &&
|
||||
(!attgenerated || !attgenerated[0]))
|
||||
appendStringInfo(&buf, " DEFAULT %s", attdefault);
|
||||
|
||||
commands = lappend(commands, pstrdup(buf.data));
|
||||
/* Add GENERATED if needed */
|
||||
if (import_generated && attgenerated != NULL &&
|
||||
attgenerated[0] == ATTRIBUTE_GENERATED_STORED)
|
||||
{
|
||||
Assert(attdefault != NULL);
|
||||
appendStringInfo(&buf,
|
||||
" GENERATED ALWAYS AS (%s) STORED",
|
||||
attdefault);
|
||||
}
|
||||
|
||||
/* Add NOT NULL if needed */
|
||||
if (import_not_null && attnotnull[0] == 't')
|
||||
appendStringInfoString(&buf, " NOT NULL");
|
||||
}
|
||||
PQclear(res);
|
||||
while (++i < numrows &&
|
||||
strcmp(PQgetvalue(res, i, 0), tablename) == 0);
|
||||
|
||||
/*
|
||||
* Add server name and table-level options. We specify remote schema
|
||||
* and table name as options (the latter to ensure that renaming the
|
||||
* foreign table doesn't break the association).
|
||||
*/
|
||||
appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (",
|
||||
quote_identifier(server->servername));
|
||||
|
||||
appendStringInfoString(&buf, "schema_name ");
|
||||
deparseStringLiteral(&buf, stmt->remote_schema);
|
||||
appendStringInfoString(&buf, ", table_name ");
|
||||
deparseStringLiteral(&buf, tablename);
|
||||
|
||||
appendStringInfoString(&buf, ");");
|
||||
|
||||
commands = lappend(commands, pstrdup(buf.data));
|
||||
}
|
||||
PQclear(res);
|
||||
|
||||
ReleaseConnection(conn);
|
||||
|
||||
|
@ -281,28 +281,28 @@ libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
|
||||
{
|
||||
PGresult *lastResult = NULL;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
/* Wait for, and collect, the next PGresult. */
|
||||
PGresult *result;
|
||||
for (;;)
|
||||
{
|
||||
/* Wait for, and collect, the next PGresult. */
|
||||
PGresult *result;
|
||||
|
||||
result = libpqsrv_get_result(conn, wait_event_info);
|
||||
if (result == NULL)
|
||||
break; /* query is complete, or failure */
|
||||
result = libpqsrv_get_result(conn, wait_event_info);
|
||||
if (result == NULL)
|
||||
break; /* query is complete, or failure */
|
||||
|
||||
/*
|
||||
* Emulate PQexec()'s behavior of returning the last result when
|
||||
* there are many.
|
||||
*/
|
||||
PQclear(lastResult);
|
||||
lastResult = result;
|
||||
/*
|
||||
* Emulate PQexec()'s behavior of returning the last result when there
|
||||
* are many.
|
||||
*/
|
||||
PQclear(lastResult);
|
||||
lastResult = result;
|
||||
|
||||
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
|
||||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
|
||||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
|
||||
PQstatus(conn) == CONNECTION_BAD)
|
||||
break;
|
||||
}
|
||||
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
|
||||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
|
||||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
|
||||
PQstatus(conn) == CONNECTION_BAD)
|
||||
break;
|
||||
}
|
||||
return lastResult;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user