1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-28 23:42:10 +03:00

Added async query capability. Original patch by

Kai Londenberg, modified by Joe Conway
This commit is contained in:
Joe Conway
2006-09-02 21:11:15 +00:00
parent 1cc9299a7a
commit 52a3ed9fac
8 changed files with 688 additions and 168 deletions

View File

@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
* $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $
* $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $
* Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
@ -73,6 +73,7 @@ typedef struct remoteConn
/*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn);
@ -690,6 +691,26 @@ dblink_fetch(PG_FUNCTION_ARGS)
PG_FUNCTION_INFO_V1(dblink_record);
Datum
dblink_record(PG_FUNCTION_ARGS)
{
return dblink_record_internal(fcinfo, false, false);
}
PG_FUNCTION_INFO_V1(dblink_send_query);
Datum
dblink_send_query(PG_FUNCTION_ARGS)
{
return dblink_record_internal(fcinfo, true, false);
}
PG_FUNCTION_INFO_V1(dblink_get_result);
Datum
dblink_get_result(PG_FUNCTION_ARGS)
{
return dblink_record_internal(fcinfo, true, true);
}
static Datum
dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS)
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
if (PG_NARGS() == 3)
if (!is_async)
{
/* text,text,bool */
DBLINK_GET_CONN;
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 2)
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
if (PG_NARGS() == 3)
{
/* text,text,bool */
DBLINK_GET_CONN;
sql = GET_STR(PG_GETARG_TEXT_P(1));
fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 2)
{
/* text,text or text,bool */
if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
else
{
DBLINK_GET_CONN;
sql = GET_STR(PG_GETARG_TEXT_P(1));
}
}
else if (PG_NARGS() == 1)
{
/* text */
conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
fail = PG_GETARG_BOOL(1);
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
else if (is_async && do_get)
{
/* get async result */
if (PG_NARGS() == 2)
{
/* text,bool */
DBLINK_GET_CONN;
fail = PG_GETARG_BOOL(2);
}
else if (PG_NARGS() == 1)
{
/* text */
DBLINK_GET_CONN;
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
else
{
/* send async query */
if (PG_NARGS() == 2)
{
DBLINK_GET_CONN;
sql = GET_STR(PG_GETARG_TEXT_P(1));
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
}
else if (PG_NARGS() == 1)
{
/* text */
conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
}
else
/* shouldn't happen */
elog(ERROR, "wrong number of arguments");
if (!conn)
DBLINK_CONN_NOT_AVAIL;
res = PQexec(conn, sql);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
if (!is_async || (is_async && do_get))
{
if (fail)
DBLINK_RES_ERROR("sql error");
/* synchronous query, or async result retrieval */
if (!is_async)
res = PQexec(conn, sql);
else
{
DBLINK_RES_ERROR_AS_NOTICE("sql error");
if (freeconn)
PQfinish(conn);
res = PQgetResult(conn);
/* NULL means we're all done with the async results */
if (!res)
SRF_RETURN_DONE(funcctx);
}
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
{
if (fail)
DBLINK_RES_ERROR("sql error");
else
{
DBLINK_RES_ERROR_AS_NOTICE("sql error");
if (freeconn)
PQfinish(conn);
SRF_RETURN_DONE(funcctx);
}
}
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = PQcmdStatus(res);
funcctx->max_calls = 1;
}
else
funcctx->max_calls = PQntuples(res);
/* got results, keep track of them */
funcctx->user_fctx = res;
/* if needed, close the connection to the database and cleanup */
if (freeconn)
PQfinish(conn);
if (!is_sql_cmd)
{
/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* success */
break;
case TYPEFUNC_RECORD:
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
/* make sure we have a persistent copy of the tupdesc */
tupdesc = CreateTupleDescCopy(tupdesc);
}
/* check result and tuple descriptor have the same number of columns */
if (PQnfields(res) != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
/* fast track when no results */
if (funcctx->max_calls < 1)
{
if (res)
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
}
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;
/* need a tuple descriptor representing one TEXT column */
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
/*
* and save a copy of the command status string to return as our
* result tuple
*/
sql_cmd_status = PQcmdStatus(res);
funcctx->max_calls = 1;
/* store needed metadata for subsequent calls */
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
else
funcctx->max_calls = PQntuples(res);
/* got results, keep track of them */
funcctx->user_fctx = res;
/* if needed, close the connection to the database and cleanup */
if (freeconn)
PQfinish(conn);
if (!is_sql_cmd)
{
/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
case TYPEFUNC_COMPOSITE:
/* success */
break;
case TYPEFUNC_RECORD:
/* failed to determine actual type of RECORD */
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("function returning record called in context "
"that cannot accept type record")));
break;
default:
/* result type isn't composite */
elog(ERROR, "return type must be a row type");
break;
}
/* make sure we have a persistent copy of the tupdesc */
tupdesc = CreateTupleDescCopy(tupdesc);
/* async query send */
MemoryContextSwitchTo(oldcontext);
PG_RETURN_INT32(PQsendQuery(conn, sql));
}
}
/* check result and tuple descriptor have the same number of columns */
if (PQnfields(res) != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
if (is_async && !do_get)
{
/* async query send -- should not happen */
elog(ERROR, "async query send called more than once");
/* fast track when no results */
if (funcctx->max_calls < 1)
{
if (res)
PQclear(res);
SRF_RETURN_DONE(funcctx);
}
/* store needed metadata for subsequent calls */
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
@ -902,6 +982,140 @@ dblink_record(PG_FUNCTION_ARGS)
}
}
/*
* List all open dblink connections by name.
* Returns an array of all connection names.
* Takes no params
*/
PG_FUNCTION_INFO_V1(dblink_get_connections);
Datum
dblink_get_connections(PG_FUNCTION_ARGS)
{
HASH_SEQ_STATUS status;
remoteConnHashEnt *hentry;
ArrayBuildState *astate = NULL;
if (remoteConnHash)
{
hash_seq_init(&status, remoteConnHash);
while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
{
/* stash away current value */
astate = accumArrayResult(astate,
PointerGetDatum(GET_TEXT(hentry->name)),
false, TEXTOID, CurrentMemoryContext);
}
}
if (astate)
PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
CurrentMemoryContext));
else
PG_RETURN_NULL();
}
/*
* Checks if a given remote connection is busy
*
* Returns 1 if the connection is busy, 0 otherwise
* Params:
* text connection_name - name of the connection to check
*
*/
PG_FUNCTION_INFO_V1(dblink_is_busy);
Datum
dblink_is_busy(PG_FUNCTION_ARGS)
{
char *msg;
PGconn *conn = NULL;
char *conname = NULL;
char *connstr = NULL;
remoteConn *rconn = NULL;
bool freeconn = false;
DBLINK_INIT;
DBLINK_GET_CONN;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
PQconsumeInput(conn);
PG_RETURN_INT32(PQisBusy(conn));
}
/*
* Cancels a running request on a connection
*
* Returns text:
* "OK" if the cancel request has been sent correctly,
* an error message otherwise
*
* Params:
* text connection_name - name of the connection to check
*
*/
PG_FUNCTION_INFO_V1(dblink_cancel_query);
Datum
dblink_cancel_query(PG_FUNCTION_ARGS)
{
char *msg;
int res = 0;
PGconn *conn = NULL;
char *conname = NULL;
char *connstr = NULL;
remoteConn *rconn = NULL;
bool freeconn = false;
PGcancel *cancel;
char errbuf[256];
DBLINK_INIT;
DBLINK_GET_CONN;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
cancel = PQgetCancel(conn);
res = PQcancel(cancel, errbuf, 256);
PQfreeCancel(cancel);
if (res == 0)
PG_RETURN_TEXT_P(GET_TEXT("OK"));
else
PG_RETURN_TEXT_P(GET_TEXT(errbuf));
}
/*
* Get error message from a connection
*
* Returns text:
* "OK" if no error, an error message otherwise
*
* Params:
* text connection_name - name of the connection to check
*
*/
PG_FUNCTION_INFO_V1(dblink_error_message);
Datum
dblink_error_message(PG_FUNCTION_ARGS)
{
char *msg;
PGconn *conn = NULL;
char *conname = NULL;
char *connstr = NULL;
remoteConn *rconn = NULL;
bool freeconn = false;
DBLINK_INIT;
DBLINK_GET_CONN;
if (!conn)
DBLINK_CONN_NOT_AVAIL;
msg = PQerrorMessage(conn);
if (!msg)
PG_RETURN_TEXT_P(GET_TEXT("OK"));
else
PG_RETURN_TEXT_P(GET_TEXT(msg));
}
/*
* Execute an SQL non-SELECT command
*/