diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index eda2bbf34c5..618f8d055e9 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -891,6 +891,15 @@ $$ LANGUAGE plpythonu;
can be modified.
+
+ Note that calling plpy.execute will cause the entire
+ result set to be read into memory. Only use that function when you are sure
+ that the result set will be relatively small. If you don't want to risk
+ excessive memory usage when fetching large results,
+ use plpy.cursor rather
+ than plpy.execute.
+
+
For example:
@@ -958,6 +967,78 @@ $$ LANGUAGE plpythonu;
+
+ Accessing Data with Cursors
+
+
+ The plpy.cursor function accepts the same arguments
+ as plpy.execute (except for limit)
+ and returns a cursor object, which allows you to process large result sets
+ in smaller chunks. As with plpy.execute, either a query
+ string or a plan object along with a list of arguments can be used. The
+ cursor object provides a fetch method that accepts an
+ integer parameter and returns a result object. Each time you
+ call fetch, the returned object will contain the next
+ batch of rows, never larger than the parameter value. Once all rows are
+ exhausted, fetch starts returning an empty result
+ object. Cursor objects also provide an
+ iterator
+ interface, yielding one row at a time until all rows are exhausted.
+ Data fetched that way is not returned as result objects, but rather as
+ dictionaries, each dictionary corresponding to a single result row.
+
+
+
+ Cursors are automatically disposed of. But if you want to explicitly
+ release all resources held by a cursor, use the close
+ method. Once closed, a cursor cannot be fetched from anymore.
+
+
+
+
+ Do not confuse objects created by plpy.cursor with
+ DB-API cursors as defined by
+ the Python Database
+ API specification. They don't have anything in common except for
+ the name.
+
+
+
+
+ An example of two ways of processing data from a large table is:
+
+CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$
+odd = 0
+for row in plpy.cursor("select num from largetable"):
+ if row['num'] % 2:
+ odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$
+odd = 0
+cursor = plpy.cursor("select num from largetable")
+while True:
+ rows = cursor.fetch(batch_size)
+ if not rows:
+ break
+ for row in rows:
+ if row['num'] % 2:
+ odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$
+odd = 0
+plan = plpy.prepare("select num from largetable where num % $1 <> 0", ["integer"])
+rows = list(plpy.cursor(plan, [2]))
+
+return len(rows)
+$$ LANGUAGE plpythonu;
+
+
+
+
Trapping Errors
diff --git a/src/pl/plpython/expected/plpython_spi.out b/src/pl/plpython/expected/plpython_spi.out
index 7f4ae5ca997..3b4d7a30105 100644
--- a/src/pl/plpython/expected/plpython_spi.out
+++ b/src/pl/plpython/expected/plpython_spi.out
@@ -133,3 +133,154 @@ CONTEXT: PL/Python function "result_nrows_test"
2
(1 row)
+-- cursor objects
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+ if row['lname'] == 'doe':
+ does += 1
+return does
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+ # use next() or __next__(), the method name changed in
+ # http://www.python.org/dev/peps/pep-3114/
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+ item = res.next()
+except AttributeError:
+ item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ res.fetch(1)
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+ "select fname, lname from users where fname like $1 || '%' order by fname",
+ ["text"])
+for row in plpy.cursor(plan, ["w"]):
+ yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+ yield row['fname']
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+ ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+SELECT simple_cursor_test();
+ simple_cursor_test
+--------------------
+ 3
+(1 row)
+
+SELECT double_cursor_close();
+ double_cursor_close
+---------------------
+
+(1 row)
+
+SELECT cursor_fetch();
+ cursor_fetch
+--------------
+
+(1 row)
+
+SELECT cursor_mix_next_and_fetch();
+ cursor_mix_next_and_fetch
+---------------------------
+
+(1 row)
+
+SELECT fetch_after_close();
+ fetch_after_close
+-------------------
+
+(1 row)
+
+SELECT next_after_close();
+ next_after_close
+------------------
+
+(1 row)
+
+SELECT cursor_fetch_next_empty();
+ cursor_fetch_next_empty
+-------------------------
+
+(1 row)
+
+SELECT cursor_plan();
+ cursor_plan
+-------------
+ willem
+ jane
+ john
+(3 rows)
+
+SELECT cursor_plan_wrong_args();
+ERROR: TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_plan_wrong_args", line 4, in
+ c = plpy.cursor(plan, ["a", "b"])
+PL/Python function "cursor_plan_wrong_args"
diff --git a/src/pl/plpython/expected/plpython_subtransaction.out b/src/pl/plpython/expected/plpython_subtransaction.out
index 515b0bb7344..c2c22f0ae41 100644
--- a/src/pl/plpython/expected/plpython_subtransaction.out
+++ b/src/pl/plpython/expected/plpython_subtransaction.out
@@ -409,3 +409,69 @@ SELECT * FROM subtransaction_tbl;
(1 row)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+SELECT cursor_in_subxact();
+ cursor_in_subxact
+-------------------
+ 16
+(1 row)
+
+SELECT cursor_aborted_subxact();
+ERROR: ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_aborted_subxact", line 8, in
+ fetched = cur.fetch(10)
+PL/Python function "cursor_aborted_subxact"
+SELECT cursor_plan_aborted_subxact();
+ERROR: ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_plan_aborted_subxact", line 10, in
+ fetched = cur.fetch(5)
+PL/Python function "cursor_plan_aborted_subxact"
+SELECT cursor_close_aborted_subxact();
+ERROR: ValueError: closing a cursor in an aborted subtransaction
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "cursor_close_aborted_subxact", line 7, in
+ cur.close()
+PL/Python function "cursor_close_aborted_subxact"
diff --git a/src/pl/plpython/expected/plpython_subtransaction_0.out b/src/pl/plpython/expected/plpython_subtransaction_0.out
index 4017c41edc9..ece0134a946 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_0.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_0.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
(0 rows)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_in_subxact"
+DETAIL: SyntaxError: invalid syntax (line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (line 4)
+SELECT cursor_in_subxact();
+ERROR: function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR: function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR: function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR: function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_subtransaction_5.out b/src/pl/plpython/expected/plpython_subtransaction_5.out
index 9216151b94e..66de2394990 100644
--- a/src/pl/plpython/expected/plpython_subtransaction_5.out
+++ b/src/pl/plpython/expected/plpython_subtransaction_5.out
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
(0 rows)
DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_in_subxact"
+DETAIL: SyntaxError: invalid syntax (, line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (, line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (, line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR: could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL: SyntaxError: invalid syntax (, line 4)
+SELECT cursor_in_subxact();
+ERROR: function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR: function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR: function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR: function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+ ^
+HINT: No function matches the given name and argument types. You might need to add explicit type casts.
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index f2dda66532e..a884fc0e27f 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,9 +43,9 @@ contents.sort()
return ", ".join(contents)
$$ LANGUAGE plpythonu;
select module_contents();
- module_contents
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
+ module_contents
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
(1 row)
CREATE FUNCTION elog_test() RETURNS void
diff --git a/src/pl/plpython/plpython.c b/src/pl/plpython/plpython.c
index afd5dfce83a..29e0ac7c454 100644
--- a/src/pl/plpython/plpython.c
+++ b/src/pl/plpython/plpython.c
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
PyObject_HEAD_INIT(type) size,
#endif
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
/* define our text domain for translations */
#undef TEXTDOMAIN
#define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
bool exited;
} PLySubtransactionObject;
+typedef struct PLyCursorObject
+{
+ PyObject_HEAD
+ char *portalname;
+ PLyTypeInfo result;
+ bool closed;
+} PLyCursorObject;
+
/* A list of all known exceptions, generated from backend/utils/errcodes.txt */
typedef struct ExceptionMap
{
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
"PostgreSQL subtransaction context manager"
};
+static char PLy_cursor_doc[] = {
+ "Wrapper around a PostgreSQL cursor"
+};
+
/*
* the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
+static PyObject *PLy_cursor(PyObject *self, PyObject *unused);
+static PyObject *PLy_cursor_query(const char *query);
+static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
+static void PLy_cursor_dealloc(PyObject *arg);
+static PyObject *PLy_cursor_iternext(PyObject *self);
+static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
+static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
+
static PyMethodDef PLy_plan_methods[] = {
{"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
PLy_subtransaction_methods, /* tp_tpmethods */
};
+static PyMethodDef PLy_cursor_methods[] = {
+ {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+ {"close", PLy_cursor_close, METH_NOARGS, NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "PLyCursor", /* tp_name */
+ sizeof(PLyCursorObject), /* tp_size */
+ 0, /* tp_itemsize */
+
+ /*
+ * methods
+ */
+ PLy_cursor_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
+ PLy_cursor_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ PyObject_SelfIter, /* tp_iter */
+ PLy_cursor_iternext, /* tp_iternext */
+ PLy_cursor_methods, /* tp_tpmethods */
+};
+
static PyMethodDef PLy_methods[] = {
/*
* logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
*/
{"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
+ /*
+ * create a cursor
+ */
+ {"cursor", PLy_cursor, METH_VARARGS, NULL},
+
{NULL, NULL, 0, NULL}
};
@@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
return (PyObject *) result;
}
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+ char *query;
+ PyObject *plan;
+ PyObject *planargs = NULL;
+
+ if (PyArg_ParseTuple(args, "s", &query))
+ return PLy_cursor_query(query);
+
+ PyErr_Clear();
+
+ if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+ return PLy_cursor_plan(plan, planargs);
+
+ PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+ return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(const char *query)
+{
+ PLyCursorObject *cursor;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPIPlanPtr plan;
+ Portal portal;
+
+ pg_verifymbstr(query, strlen(query), false);
+
+ plan = SPI_prepare(query, 0, NULL);
+ if (plan == NULL)
+ elog(ERROR, "SPI_prepare failed: %s",
+ SPI_result_code_string(SPI_result));
+
+ portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+ PLy_curr_procedure->fn_readonly);
+ SPI_freeplan(plan);
+
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ volatile int nargs;
+ int i;
+ PLyPlanObject *plan;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+
+ if (args)
+ {
+ if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+ {
+ PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+ return NULL;
+ }
+ nargs = PySequence_Length(args);
+ }
+ else
+ nargs = 0;
+
+ plan = (PLyPlanObject *) ob;
+
+ if (nargs != plan->nargs)
+ {
+ char *sv;
+ PyObject *so = PyObject_Str(args);
+
+ if (!so)
+ PLy_elog(ERROR, "could not execute plan");
+ sv = PyString_AsString(so);
+ PLy_exception_set_plural(PyExc_TypeError,
+ "Expected sequence of %d argument, got %d: %s",
+ "Expected sequence of %d arguments, got %d: %s",
+ plan->nargs,
+ plan->nargs, nargs, sv);
+ Py_DECREF(so);
+
+ return NULL;
+ }
+
+ if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+ return NULL;
+ cursor->portalname = NULL;
+ cursor->closed = false;
+ PLy_typeinfo_init(&cursor->result);
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ Portal portal;
+ char *volatile nulls;
+ volatile int j;
+
+ if (nargs > 0)
+ nulls = palloc(nargs * sizeof(char));
+ else
+ nulls = NULL;
+
+ for (j = 0; j < nargs; j++)
+ {
+ PyObject *elem;
+
+ elem = PySequence_GetItem(args, j);
+ if (elem != Py_None)
+ {
+ PG_TRY();
+ {
+ plan->values[j] =
+ plan->args[j].out.d.func(&(plan->args[j].out.d),
+ -1,
+ elem);
+ }
+ PG_CATCH();
+ {
+ Py_DECREF(elem);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ Py_DECREF(elem);
+ nulls[j] = ' ';
+ }
+ else
+ {
+ Py_DECREF(elem);
+ plan->values[j] =
+ InputFunctionCall(&(plan->args[j].out.d.typfunc),
+ NULL,
+ plan->args[j].out.d.typioparam,
+ -1);
+ nulls[j] = 'n';
+ }
+ }
+
+ portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+ PLy_curr_procedure->fn_readonly);
+ if (portal == NULL)
+ elog(ERROR, "SPI_cursor_open() failed:%s",
+ SPI_result_code_string(SPI_result));
+
+ cursor->portalname = PLy_strdup(portal->name);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ int k;
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* cleanup plan->values array */
+ for (k = 0; k < nargs; k++)
+ {
+ if (!plan->args[k].out.d.typbyval &&
+ (plan->values[k] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[k]));
+ plan->values[k] = PointerGetDatum(NULL);
+ }
+ }
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ Py_DECREF(cursor);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ for (i = 0; i < nargs; i++)
+ {
+ if (!plan->args[i].out.d.typbyval &&
+ (plan->values[i] != PointerGetDatum(NULL)))
+ {
+ pfree(DatumGetPointer(plan->values[i]));
+ plan->values[i] = PointerGetDatum(NULL);
+ }
+ }
+
+ Assert(cursor->portalname != NULL);
+ return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+ PLyCursorObject *cursor;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) arg;
+
+ if (!cursor->closed)
+ {
+ portal = GetPortalByName(cursor->portalname);
+
+ if (PortalIsValid(portal))
+ SPI_cursor_close(portal);
+ }
+
+ PLy_free(cursor->portalname);
+ cursor->portalname = NULL;
+
+ PLy_typeinfo_dealloc(&cursor->result);
+ arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+ PLyCursorObject *cursor;
+ PyObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, 1);
+ if (SPI_processed == 0)
+ {
+ PyErr_SetNone(PyExc_StopIteration);
+ ret = NULL;
+ }
+ else
+ {
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+ SPI_tuptable->tupdesc);
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+ PLyCursorObject *cursor;
+ int count;
+ PLyResultObject *ret;
+ volatile MemoryContext oldcontext;
+ volatile ResourceOwner oldowner;
+ Portal portal;
+
+ if (!PyArg_ParseTuple(args, "i", &count))
+ return NULL;
+
+ cursor = (PLyCursorObject *) self;
+
+ if (cursor->closed)
+ {
+ PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
+ return NULL;
+ }
+
+ portal = GetPortalByName(cursor->portalname);
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "iterating a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ ret = (PLyResultObject *) PLy_result_new();
+ if (ret == NULL)
+ return NULL;
+
+ oldcontext = CurrentMemoryContext;
+ oldowner = CurrentResourceOwner;
+
+ BeginInternalSubTransaction(NULL);
+ MemoryContextSwitchTo(oldcontext);
+
+ PG_TRY();
+ {
+ SPI_cursor_fetch(portal, true, count);
+
+ if (cursor->result.is_rowtype != 1)
+ PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+ Py_DECREF(ret->status);
+ ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+ Py_DECREF(ret->nrows);
+ ret->nrows = PyInt_FromLong(SPI_processed);
+
+ if (SPI_processed != 0)
+ {
+ int i;
+
+ Py_DECREF(ret->rows);
+ ret->rows = PyList_New(SPI_processed);
+
+ for (i = 0; i < SPI_processed; i++)
+ {
+ PyObject *row = PLyDict_FromTuple(&cursor->result,
+ SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc);
+ PyList_SetItem(ret->rows, i, row);
+ }
+ }
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /* Commit the inner transaction, return to outer xact context */
+ ReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ /*
+ * AtEOSubXact_SPI() should not have popped any SPI context, but just
+ * in case it did, make sure we remain connected.
+ */
+ SPI_restore_connection();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+ PLyExceptionEntry *entry;
+ PyObject *exc;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Abort the inner transaction */
+ RollbackAndReleaseCurrentSubTransaction();
+ MemoryContextSwitchTo(oldcontext);
+ CurrentResourceOwner = oldowner;
+
+ SPI_freetuptable(SPI_tuptable);
+
+ /*
+ * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+ * have left us in a disconnected state. We need this hack to return
+ * to connected state.
+ */
+ SPI_restore_connection();
+
+ /* Look up the correct exception */
+ entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+ HASH_FIND, NULL);
+ /* We really should find it, but just in case have a fallback */
+ Assert(entry != NULL);
+ exc = entry ? entry->exc : PLy_exc_spi_error;
+ /* Make Python raise the exception */
+ PLy_spi_exception_set(exc, edata);
+ return NULL;
+ }
+ PG_END_TRY();
+
+ return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+ PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+ if (!cursor->closed)
+ {
+ Portal portal = GetPortalByName(cursor->portalname);
+
+ if (!PortalIsValid(portal))
+ {
+ PLy_exception_set(PyExc_ValueError,
+ "closing a cursor in an aborted subtransaction");
+ return NULL;
+ }
+
+ SPI_cursor_close(portal);
+ cursor->closed = true;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
/* s = plpy.subtransaction() */
static PyObject *
PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4824,8 @@ PLy_init_plpy(void)
elog(ERROR, "could not initialize PLy_ResultType");
if (PyType_Ready(&PLy_SubtransactionType) < 0)
elog(ERROR, "could not initialize PLy_SubtransactionType");
+ if (PyType_Ready(&PLy_CursorType) < 0)
+ elog(ERROR, "could not initialize PLy_CursorType");
#if PY_MAJOR_VERSION >= 3
PyModule_Create(&PLy_module);
diff --git a/src/pl/plpython/sql/plpython_spi.sql b/src/pl/plpython/sql/plpython_spi.sql
index 7f8f6a33d26..874b31e6df6 100644
--- a/src/pl/plpython/sql/plpython_spi.sql
+++ b/src/pl/plpython/sql/plpython_spi.sql
@@ -105,3 +105,119 @@ else:
$$ LANGUAGE plpythonu;
SELECT result_nrows_test();
+
+
+-- cursor objects
+
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+ if row['lname'] == 'doe':
+ does += 1
+return does
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+ # use next() or __next__(), the method name changed in
+ # http://www.python.org/dev/peps/pep-3114/
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+ item = res.next()
+except AttributeError:
+ item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ res.fetch(1)
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except ValueError:
+ pass
+else:
+ assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+ try:
+ res.next()
+ except AttributeError:
+ res.__next__()
+except StopIteration:
+ pass
+else:
+ assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+ "select fname, lname from users where fname like $1 || '%' order by fname",
+ ["text"])
+for row in plpy.cursor(plan, ["w"]):
+ yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+ yield row['fname']
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+ ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+
+SELECT simple_cursor_test();
+SELECT double_cursor_close();
+SELECT cursor_fetch();
+SELECT cursor_mix_next_and_fetch();
+SELECT fetch_after_close();
+SELECT next_after_close();
+SELECT cursor_fetch_next_empty();
+SELECT cursor_plan();
+SELECT cursor_plan_wrong_args();
diff --git a/src/pl/plpython/sql/plpython_subtransaction.sql b/src/pl/plpython/sql/plpython_subtransaction.sql
index a19cad5104e..9ad6377c7cd 100644
--- a/src/pl/plpython/sql/plpython_subtransaction.sql
+++ b/src/pl/plpython/sql/plpython_subtransaction.sql
@@ -242,3 +242,55 @@ SELECT pk_violation_inside_subtransaction();
SELECT * FROM subtransaction_tbl;
DROP TABLE subtransaction_tbl;
+
+-- cursor/subtransactions interactions
+
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+ cur.fetch(10);
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(10)
+ return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+ with plpy.subtransaction():
+ plpy.execute('create temporary table tmp(i) '
+ 'as select generate_series(1, 10)')
+ plan = plpy.prepare("select i from tmp")
+ cur = plpy.cursor(plan)
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ fetched = cur.fetch(5)
+ return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+ with plpy.subtransaction():
+ cur = plpy.cursor('select 1')
+ plpy.execute("select no_such_function()")
+except plpy.SPIError:
+ cur.close()
+ return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+
+SELECT cursor_in_subxact();
+SELECT cursor_aborted_subxact();
+SELECT cursor_plan_aborted_subxact();
+SELECT cursor_close_aborted_subxact();