1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-14 18:42:34 +03:00

PL/Python explicit subtransactions

Adds a context manager, obtainable by plpy.subtransaction(), to run a
group of statements in a subtransaction.

Jan Urbański, reviewed by Steve Singer, additional scribbling by me
This commit is contained in:
Peter Eisentraut
2011-02-27 17:09:56 +02:00
parent 438cdf6e48
commit 22690719ea
8 changed files with 1404 additions and 8 deletions

View File

@ -233,6 +233,13 @@ typedef struct PLyProcedureEntry
PLyProcedure *proc;
} PLyProcedureEntry;
/* explicit subtransaction data */
typedef struct PLySubtransactionData
{
MemoryContext oldcontext;
ResourceOwner oldowner;
} PLySubtransactionData;
/* Python objects */
typedef struct PLyPlanObject
@ -254,6 +261,13 @@ typedef struct PLyResultObject
PyObject *status; /* query status, SPI_OK_*, or SPI_ERR_* */
} PLyResultObject;
typedef struct PLySubtransactionObject
{
PyObject_HEAD
bool started;
bool exited;
} PLySubtransactionObject;
/* function declarations */
@ -382,6 +396,9 @@ static HeapTuple PLyGenericObject_ToTuple(PLyTypeInfo *, TupleDesc, PyObject *);
*/
static PLyProcedure *PLy_curr_procedure = NULL;
/* list of explicit subtransaction data */
static List *explicit_subtransactions = NIL;
static PyObject *PLy_interp_globals = NULL;
static PyObject *PLy_interp_safe_globals = NULL;
static HTAB *PLy_procedure_cache = NULL;
@ -401,6 +418,10 @@ static char PLy_result_doc[] = {
"Results of a PostgreSQL query"
};
static char PLy_subtransaction_doc[] = {
"PostgreSQL subtransaction context manager"
};
/*
* the function definitions
@ -1226,20 +1247,71 @@ PLy_function_handler(FunctionCallInfo fcinfo, PLyProcedure *proc)
return rv;
}
/*
* Abort lingering subtransactions that have been explicitly started
* by plpy.subtransaction().start() and not properly closed.
*/
static void
PLy_abort_open_subtransactions(int save_subxact_level)
{
Assert(save_subxact_level >= 0);
while (list_length(explicit_subtransactions) > save_subxact_level)
{
PLySubtransactionData *subtransactiondata;
Assert(explicit_subtransactions != NIL);
ereport(WARNING,
(errmsg("forcibly aborting a subtransaction that has not been exited")));
RollbackAndReleaseCurrentSubTransaction();
SPI_restore_connection();
subtransactiondata = (PLySubtransactionData *) linitial(explicit_subtransactions);
explicit_subtransactions = list_delete_first(explicit_subtransactions);
MemoryContextSwitchTo(subtransactiondata->oldcontext);
CurrentResourceOwner = subtransactiondata->oldowner;
PLy_free(subtransactiondata);
}
}
static PyObject *
PLy_procedure_call(PLyProcedure *proc, char *kargs, PyObject *vargs)
{
PyObject *rv;
int volatile save_subxact_level = list_length(explicit_subtransactions);
PyDict_SetItemString(proc->globals, kargs, vargs);
PG_TRY();
{
#if PY_VERSION_HEX >= 0x03020000
rv = PyEval_EvalCode(proc->code,
proc->globals, proc->globals);
rv = PyEval_EvalCode(proc->code,
proc->globals, proc->globals);
#else
rv = PyEval_EvalCode((PyCodeObject *) proc->code,
proc->globals, proc->globals);
rv = PyEval_EvalCode((PyCodeObject *) proc->code,
proc->globals, proc->globals);
#endif
/*
* Since plpy will only let you close subtransactions that
* you started, you cannot *unnest* subtransactions, only
* *nest* them without closing.
*/
Assert(list_length(explicit_subtransactions) >= save_subxact_level);
}
PG_CATCH();
{
PLy_abort_open_subtransactions(save_subxact_level);
PG_RE_THROW();
}
PG_END_TRY();
PLy_abort_open_subtransactions(save_subxact_level);
/* If the Python code returned an error, propagate it */
if (rv == NULL)
PLy_elog(ERROR, NULL);
@ -2762,6 +2834,12 @@ static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
static PyObject *PLy_subtransaction(PyObject *, PyObject *);
static PyObject *PLy_subtransaction_new(void);
static void PLy_subtransaction_dealloc(PyObject *);
static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
static PyMethodDef PLy_plan_methods[] = {
{"status", PLy_plan_status, METH_VARARGS, NULL},
@ -2854,6 +2932,50 @@ static PyTypeObject PLy_ResultType = {
PLy_result_methods, /* tp_tpmethods */
};
static PyMethodDef PLy_subtransaction_methods[] = {
{"__enter__", PLy_subtransaction_enter, METH_VARARGS, NULL},
{"__exit__", PLy_subtransaction_exit, METH_VARARGS, NULL},
/* user-friendly names for Python <2.6 */
{"enter", PLy_subtransaction_enter, METH_VARARGS, NULL},
{"exit", PLy_subtransaction_exit, METH_VARARGS, NULL},
{NULL, NULL, 0, NULL}
};
static PyTypeObject PLy_SubtransactionType = {
PyVarObject_HEAD_INIT(NULL, 0)
"PLySubtransaction", /* tp_name */
sizeof(PLySubtransactionObject), /* tp_size */
0, /* tp_itemsize */
/*
* methods
*/
PLy_subtransaction_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
PLy_subtransaction_doc, /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PLy_subtransaction_methods, /* tp_tpmethods */
};
static PyMethodDef PLy_methods[] = {
/*
* logging methods
@ -2883,6 +3005,11 @@ static PyMethodDef PLy_methods[] = {
{"quote_nullable", PLy_quote_nullable, METH_VARARGS, NULL},
{"quote_ident", PLy_quote_ident, METH_VARARGS, NULL},
/*
* create the subtransaction context manager
*/
{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
{NULL, NULL, 0, NULL}
};
@ -3553,6 +3680,150 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
return (PyObject *) result;
}
/* s = plpy.subtransaction() */
static PyObject *
PLy_subtransaction(PyObject *self, PyObject *unused)
{
return PLy_subtransaction_new();
}
/* Allocate and initialize a PLySubtransactionObject */
static PyObject *
PLy_subtransaction_new(void)
{
PLySubtransactionObject *ob;
ob = PyObject_New(PLySubtransactionObject, &PLy_SubtransactionType);
if (ob == NULL)
return NULL;
ob->started = false;
ob->exited = false;
return (PyObject *) ob;
}
/* Python requires a dealloc function to be defined */
static void
PLy_subtransaction_dealloc(PyObject *subxact)
{
}
/*
* subxact.__enter__() or subxact.enter()
*
* Start an explicit subtransaction. SPI calls within an explicit
* subtransaction will not start another one, so you can atomically
* execute many SPI calls and still get a controllable exception if
* one of them fails.
*/
static PyObject *
PLy_subtransaction_enter(PyObject *self, PyObject *unused)
{
PLySubtransactionData *subxactdata;
MemoryContext oldcontext;
PLySubtransactionObject *subxact = (PLySubtransactionObject *) self;
if (subxact->started)
{
PLy_exception_set(PyExc_ValueError, "this subtransaction has already been entered");
return NULL;
}
if (subxact->exited)
{
PLy_exception_set(PyExc_ValueError, "this subtransaction has already been exited");
return NULL;
}
subxact->started = true;
oldcontext = CurrentMemoryContext;
subxactdata = PLy_malloc(sizeof(*subxactdata));
subxactdata->oldcontext = oldcontext;
subxactdata->oldowner = CurrentResourceOwner;
BeginInternalSubTransaction(NULL);
/* Do not want to leave the previous memory context */
MemoryContextSwitchTo(oldcontext);
explicit_subtransactions = lcons(subxactdata, explicit_subtransactions);
Py_INCREF(self);
return self;
}
/*
* subxact.__exit__(exc_type, exc, tb) or subxact.exit(exc_type, exc, tb)
*
* Exit an explicit subtransaction. exc_type is an exception type, exc
* is the exception object, tb is the traceback. If exc_type is None,
* commit the subtransactiony, if not abort it.
*
* The method signature is chosen to allow subtransaction objects to
* be used as context managers as described in
* <http://www.python.org/dev/peps/pep-0343/>.
*/
static PyObject *
PLy_subtransaction_exit(PyObject *self, PyObject *args)
{
PyObject *type;
PyObject *value;
PyObject *traceback;
PLySubtransactionData *subxactdata;
PLySubtransactionObject *subxact = (PLySubtransactionObject *) self;
if (!PyArg_ParseTuple(args, "OOO", &type, &value, &traceback))
return NULL;
if (!subxact->started)
{
PLy_exception_set(PyExc_ValueError, "this subtransaction has not been entered");
return NULL;
}
if (subxact->exited)
{
PLy_exception_set(PyExc_ValueError, "this subtransaction has already been exited");
return NULL;
}
if (explicit_subtransactions == NIL)
{
PLy_exception_set(PyExc_ValueError, "there is no subtransaction to exit from");
return NULL;
}
subxact->exited = true;
if (type != Py_None)
{
/* Abort the inner transaction */
RollbackAndReleaseCurrentSubTransaction();
}
else
{
ReleaseCurrentSubTransaction();
}
subxactdata = (PLySubtransactionData *) linitial(explicit_subtransactions);
explicit_subtransactions = list_delete_first(explicit_subtransactions);
MemoryContextSwitchTo(subxactdata->oldcontext);
CurrentResourceOwner = subxactdata->oldowner;
PLy_free(subxactdata);
/*
* AtEOSubXact_SPI() should not have popped any SPI context, but
* just in case it did, make sure we remain connected.
*/
SPI_restore_connection();
Py_INCREF(Py_None);
return Py_None;
}
/*
* language handler and interpreter initialization
@ -3653,6 +3924,8 @@ _PG_init(void)
PLy_trigger_cache = hash_create("PL/Python triggers", 32, &hash_ctl,
HASH_ELEM | HASH_FUNCTION);
explicit_subtransactions = NIL;
inited = true;
}
@ -3688,6 +3961,8 @@ PLy_init_plpy(void)
elog(ERROR, "could not initialize PLy_PlanType");
if (PyType_Ready(&PLy_ResultType) < 0)
elog(ERROR, "could not initialize PLy_ResultType");
if (PyType_Ready(&PLy_SubtransactionType) < 0)
elog(ERROR, "could not initialize PLy_SubtransactionType");
#if PY_MAJOR_VERSION >= 3
plpy = PyModule_Create(&PLy_module);