1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-05 07:21:24 +03:00

Prepared queries for PLPerl, plus fixing a small plperl memory leak. Patch

and docs from Dmitry Karasik, slightly editorialised.
This commit is contained in:
Andrew Dunstan
2006-03-05 16:40:51 +00:00
parent f2f5b05655
commit 5d723d05c0
6 changed files with 724 additions and 20 deletions

View File

@ -33,7 +33,7 @@
* ENHANCEMENTS, OR MODIFICATIONS.
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.103 2006/02/28 23:38:13 neilc Exp $
* $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.104 2006/03/05 16:40:51 adunstan Exp $
*
**********************************************************************/
@ -56,6 +56,7 @@
#include "utils/typcache.h"
#include "miscadmin.h"
#include "mb/pg_wchar.h"
#include "parser/parse_type.h"
/* define this before the perl headers get a chance to mangle DLLIMPORT */
extern DLLIMPORT bool check_function_bodies;
@ -99,6 +100,18 @@ typedef struct plperl_call_data
MemoryContext tmp_cxt;
} plperl_call_data;
/**********************************************************************
* The information we cache about prepared and saved plans
**********************************************************************/
typedef struct plperl_query_desc
{
char qname[sizeof(long) * 2 + 1];
void *plan;
int nargs;
Oid *argtypes;
FmgrInfo *arginfuncs;
Oid *argtypioparams;
} plperl_query_desc;
/**********************************************************************
* Global data
@ -107,6 +120,7 @@ static bool plperl_firstcall = true;
static bool plperl_safe_init_done = false;
static PerlInterpreter *plperl_interp = NULL;
static HV *plperl_proc_hash = NULL;
static HV *plperl_query_hash = NULL;
static bool plperl_use_strict = false;
@ -233,7 +247,8 @@ plperl_init_all(void)
"$PLContainer->permit_only(':default');" \
"$PLContainer->permit(qw[:base_math !:base_io sort time]);" \
"$PLContainer->share(qw[&elog &spi_exec_query &return_next " \
"&spi_query &spi_fetchrow " \
"&spi_query &spi_fetchrow &spi_cursor_close " \
"&spi_prepare &spi_exec_prepared &spi_query_prepared &spi_freeplan " \
"&_plperl_to_pg_array " \
"&DEBUG &LOG &INFO &NOTICE &WARNING &ERROR %_SHARED ]);" \
"sub ::mksafefunc {" \
@ -312,6 +327,7 @@ plperl_init_interp(void)
perl_run(plperl_interp);
plperl_proc_hash = newHV();
plperl_query_hash = newHV();
#ifdef WIN32
@ -1302,7 +1318,7 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
{
bool uptodate;
prodesc = (plperl_proc_desc *) SvIV(*svp);
prodesc = INT2PTR( plperl_proc_desc *, SvUV(*svp));
/************************************************************
* If it's present, must check whether it's still up to date.
@ -1500,7 +1516,7 @@ compile_plperl_function(Oid fn_oid, bool is_trigger)
}
hv_store(plperl_proc_hash, internal_proname, proname_len,
newSViv((IV) prodesc), 0);
newSVuv( PTR2UV( prodesc)), 0);
}
ReleaseSysCache(procTup);
@ -1810,16 +1826,20 @@ plperl_spi_query(char *query)
PG_TRY();
{
void *plan;
Portal portal = NULL;
Portal portal;
/* Create a cursor for the query */
plan = SPI_prepare(query, 0, NULL);
if (plan)
portal = SPI_cursor_open(NULL, plan, NULL, NULL, false);
if (portal)
cursor = newSVpv(portal->name, 0);
else
cursor = newSV(0);
if ( plan == NULL)
elog(ERROR, "SPI_prepare() failed:%s",
SPI_result_code_string(SPI_result));
portal = SPI_cursor_open(NULL, plan, NULL, NULL, false);
SPI_freeplan( plan);
if ( portal == NULL)
elog(ERROR, "SPI_cursor_open() failed:%s",
SPI_result_code_string(SPI_result));
cursor = newSVpv(portal->name, 0);
/* Commit the inner transaction, return to outer xact context */
ReleaseCurrentSubTransaction();
@ -1886,14 +1906,16 @@ plperl_spi_fetchrow(char *cursor)
Portal p = SPI_cursor_find(cursor);
if (!p)
row = newSV(0);
{
row = &PL_sv_undef;
}
else
{
SPI_cursor_fetch(p, true, 1);
if (SPI_processed == 0)
{
SPI_cursor_close(p);
row = newSV(0);
row = &PL_sv_undef;
}
else
{
@ -1945,3 +1967,451 @@ plperl_spi_fetchrow(char *cursor)
return row;
}
void
plperl_spi_cursor_close(char *cursor)
{
Portal p = SPI_cursor_find(cursor);
if (p)
SPI_cursor_close(p);
}
SV *
plperl_spi_prepare(char* query, int argc, SV ** argv)
{
plperl_query_desc *qdesc;
void *plan;
int i;
HeapTuple typeTup;
MemoryContext oldcontext = CurrentMemoryContext;
ResourceOwner oldowner = CurrentResourceOwner;
BeginInternalSubTransaction(NULL);
MemoryContextSwitchTo(oldcontext);
/************************************************************
* Allocate the new querydesc structure
************************************************************/
qdesc = (plperl_query_desc *) malloc(sizeof(plperl_query_desc));
MemSet(qdesc, 0, sizeof(plperl_query_desc));
snprintf(qdesc-> qname, sizeof(qdesc-> qname), "%lx", (long) qdesc);
qdesc-> nargs = argc;
qdesc-> argtypes = (Oid *) malloc(argc * sizeof(Oid));
qdesc-> arginfuncs = (FmgrInfo *) malloc(argc * sizeof(FmgrInfo));
qdesc-> argtypioparams = (Oid *) malloc(argc * sizeof(Oid));
PG_TRY();
{
/************************************************************
* Lookup the argument types by name in the system cache
* and remember the required information for input conversion
************************************************************/
for (i = 0; i < argc; i++)
{
char *argcopy;
List *names = NIL;
ListCell *l;
TypeName *typename;
/************************************************************
* Use SplitIdentifierString() on a copy of the type name,
* turn the resulting pointer list into a TypeName node
* and call typenameType() to get the pg_type tuple.
************************************************************/
argcopy = pstrdup(SvPV(argv[i],PL_na));
SplitIdentifierString(argcopy, '.', &names);
typename = makeNode(TypeName);
foreach(l, names)
typename->names = lappend(typename->names, makeString(lfirst(l)));
typeTup = typenameType(typename);
qdesc->argtypes[i] = HeapTupleGetOid(typeTup);
perm_fmgr_info(((Form_pg_type) GETSTRUCT(typeTup))->typinput,
&(qdesc->arginfuncs[i]));
qdesc->argtypioparams[i] = getTypeIOParam(typeTup);
ReleaseSysCache(typeTup);
list_free(typename->names);
pfree(typename);
list_free(names);
pfree(argcopy);
}
/************************************************************
* Prepare the plan and check for errors
************************************************************/
plan = SPI_prepare(query, argc, qdesc->argtypes);
if (plan == NULL)
elog(ERROR, "SPI_prepare() failed:%s",
SPI_result_code_string(SPI_result));
/************************************************************
* Save the plan into permanent memory (right now it's in the
* SPI procCxt, which will go away at function end).
************************************************************/
qdesc->plan = SPI_saveplan(plan);
if (qdesc->plan == NULL)
elog(ERROR, "SPI_saveplan() failed: %s",
SPI_result_code_string(SPI_result));
/* Release the procCxt copy to avoid within-function memory leak */
SPI_freeplan(plan);
/* Commit the inner transaction, return to outer xact context */
ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* AtEOSubXact_SPI() should not have popped any SPI context,
* but just in case it did, make sure we remain connected.
*/
SPI_restore_connection();
}
PG_CATCH();
{
ErrorData *edata;
free(qdesc-> argtypes);
free(qdesc-> arginfuncs);
free(qdesc-> argtypioparams);
free(qdesc);
/* Save error info */
MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
FlushErrorState();
/* Abort the inner transaction */
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* If AtEOSubXact_SPI() popped any SPI context of the subxact,
* it will have left us in a disconnected state. We need this
* hack to return to connected state.
*/
SPI_restore_connection();
/* Punt the error to Perl */
croak("%s", edata->message);
/* Can't get here, but keep compiler quiet */
return NULL;
}
PG_END_TRY();
/************************************************************
* Insert a hashtable entry for the plan and return
* the key to the caller.
************************************************************/
hv_store( plperl_query_hash, qdesc->qname, strlen(qdesc->qname), newSVuv( PTR2UV( qdesc)), 0);
return newSVpv( qdesc->qname, strlen(qdesc->qname));
}
HV *
plperl_spi_exec_prepared(char* query, HV * attr, int argc, SV ** argv)
{
HV *ret_hv;
SV **sv;
int i, limit, spi_rv;
char * nulls;
Datum *argvalues;
plperl_query_desc *qdesc;
/*
* Execute the query inside a sub-transaction, so we can cope with
* errors sanely
*/
MemoryContext oldcontext = CurrentMemoryContext;
ResourceOwner oldowner = CurrentResourceOwner;
BeginInternalSubTransaction(NULL);
/* Want to run inside function's memory context */
MemoryContextSwitchTo(oldcontext);
PG_TRY();
{
/************************************************************
* Fetch the saved plan descriptor, see if it's o.k.
************************************************************/
sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
if ( sv == NULL)
elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
if ( *sv == NULL || !SvOK( *sv))
elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value corrupted");
qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
if ( qdesc == NULL)
elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");
if ( qdesc-> nargs != argc)
elog(ERROR, "spi_exec_prepared: expected %d argument(s), %d passed",
qdesc-> nargs, argc);
/************************************************************
* Parse eventual attributes
************************************************************/
limit = 0;
if ( attr != NULL)
{
sv = hv_fetch( attr, "limit", 5, 0);
if ( *sv && SvIOK( *sv))
limit = SvIV( *sv);
}
/************************************************************
* Set up arguments
************************************************************/
if ( argc > 0)
{
nulls = (char *)palloc( argc);
argvalues = (Datum *) palloc(argc * sizeof(Datum));
if ( nulls == NULL || argvalues == NULL)
elog(ERROR, "spi_exec_prepared: not enough memory");
}
else
{
nulls = NULL;
argvalues = NULL;
}
for ( i = 0; i < argc; i++)
{
if ( SvTYPE( argv[i]) != SVt_NULL)
{
argvalues[i] =
FunctionCall3( &qdesc->arginfuncs[i],
CStringGetDatum( SvPV( argv[i], PL_na)),
ObjectIdGetDatum( qdesc->argtypioparams[i]),
Int32GetDatum(-1)
);
nulls[i] = ' ';
}
else
{
argvalues[i] = (Datum) 0;
nulls[i] = 'n';
}
}
/************************************************************
* go
************************************************************/
spi_rv = SPI_execute_plan(qdesc-> plan, argvalues, nulls,
current_call_data->prodesc->fn_readonly, limit);
ret_hv = plperl_spi_execute_fetch_result(SPI_tuptable, SPI_processed,
spi_rv);
if ( argc > 0)
{
pfree( argvalues);
pfree( nulls);
}
/* Commit the inner transaction, return to outer xact context */
ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* AtEOSubXact_SPI() should not have popped any SPI context,
* but just in case it did, make sure we remain connected.
*/
SPI_restore_connection();
}
PG_CATCH();
{
ErrorData *edata;
/* Save error info */
MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
FlushErrorState();
/* Abort the inner transaction */
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* If AtEOSubXact_SPI() popped any SPI context of the subxact,
* it will have left us in a disconnected state. We need this
* hack to return to connected state.
*/
SPI_restore_connection();
/* Punt the error to Perl */
croak("%s", edata->message);
/* Can't get here, but keep compiler quiet */
return NULL;
}
PG_END_TRY();
return ret_hv;
}
SV *
plperl_spi_query_prepared(char* query, int argc, SV ** argv)
{
SV **sv;
int i;
char * nulls;
Datum *argvalues;
plperl_query_desc *qdesc;
SV *cursor;
Portal portal = NULL;
/*
* Execute the query inside a sub-transaction, so we can cope with
* errors sanely
*/
MemoryContext oldcontext = CurrentMemoryContext;
ResourceOwner oldowner = CurrentResourceOwner;
BeginInternalSubTransaction(NULL);
/* Want to run inside function's memory context */
MemoryContextSwitchTo(oldcontext);
PG_TRY();
{
/************************************************************
* Fetch the saved plan descriptor, see if it's o.k.
************************************************************/
sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
if ( sv == NULL)
elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
if ( *sv == NULL || !SvOK( *sv))
elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value corrupted");
qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
if ( qdesc == NULL)
elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");
if ( qdesc-> nargs != argc)
elog(ERROR, "spi_query_prepared: expected %d argument(s), %d passed",
qdesc-> nargs, argc);
/************************************************************
* Set up arguments
************************************************************/
if ( argc > 0)
{
nulls = (char *)palloc( argc);
argvalues = (Datum *) palloc(argc * sizeof(Datum));
if ( nulls == NULL || argvalues == NULL)
elog(ERROR, "spi_query_prepared: not enough memory");
}
else
{
nulls = NULL;
argvalues = NULL;
}
for ( i = 0; i < argc; i++)
{
if ( SvTYPE( argv[i]) != SVt_NULL)
{
argvalues[i] =
FunctionCall3( &qdesc->arginfuncs[i],
CStringGetDatum( SvPV( argv[i], PL_na)),
ObjectIdGetDatum( qdesc->argtypioparams[i]),
Int32GetDatum(-1)
);
nulls[i] = ' ';
}
else
{
argvalues[i] = (Datum) 0;
nulls[i] = 'n';
}
}
/************************************************************
* go
************************************************************/
portal = SPI_cursor_open(NULL, qdesc-> plan, argvalues, nulls,
current_call_data->prodesc->fn_readonly);
if ( argc > 0)
{
pfree( argvalues);
pfree( nulls);
}
if ( portal == NULL)
elog(ERROR, "SPI_cursor_open() failed:%s",
SPI_result_code_string(SPI_result));
cursor = newSVpv(portal->name, 0);
/* Commit the inner transaction, return to outer xact context */
ReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* AtEOSubXact_SPI() should not have popped any SPI context,
* but just in case it did, make sure we remain connected.
*/
SPI_restore_connection();
}
PG_CATCH();
{
ErrorData *edata;
/* Save error info */
MemoryContextSwitchTo(oldcontext);
edata = CopyErrorData();
FlushErrorState();
/* Abort the inner transaction */
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(oldcontext);
CurrentResourceOwner = oldowner;
/*
* If AtEOSubXact_SPI() popped any SPI context of the subxact,
* it will have left us in a disconnected state. We need this
* hack to return to connected state.
*/
SPI_restore_connection();
/* Punt the error to Perl */
croak("%s", edata->message);
/* Can't get here, but keep compiler quiet */
return NULL;
}
PG_END_TRY();
return cursor;
}
void
plperl_spi_freeplan(char *query)
{
SV ** sv;
void * plan;
plperl_query_desc *qdesc;
sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
if ( sv == NULL)
elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
if ( *sv == NULL || !SvOK( *sv))
elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value corrupted");
qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
if ( qdesc == NULL)
elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");
/*
* free all memory before SPI_freeplan, so if it dies, nothing will be left over
*/
hv_delete(plperl_query_hash, query, strlen(query), G_DISCARD);
plan = qdesc-> plan;
free(qdesc-> argtypes);
free(qdesc-> arginfuncs);
free(qdesc-> argtypioparams);
free(qdesc);
SPI_freeplan( plan);
}