1
0
mirror of https://github.com/postgres/postgres.git synced 2025-07-03 20:02:46 +03:00

JSON_TABLE

This feature allows jsonb data to be treated as a table and thus used in
a FROM clause like other tabular data. Data can be selected from the
jsonb using jsonpath expressions, and hoisted out of nested structures
in the jsonb to form multiple rows, more or less like an outer join.

Nikita Glukhov

Reviewers have included (in no particular order) Andres Freund, Alexander
Korotkov, Pavel Stehule, Andrew Alsup, Erik Rijkers, Zhihong Yu (whose
name I previously misspelled), Himanshu Upadhyaya, Daniel Gustafsson,
Justin Pryzby.

Discussion: https://postgr.es/m/7e2cb85d-24cf-4abb-30a5-1a33715959bd@postgrespro.ru
This commit is contained in:
Andrew Dunstan
2022-04-04 15:36:03 -04:00
parent c42a6fc41d
commit 4e34747c88
31 changed files with 2605 additions and 34 deletions

View File

@ -61,9 +61,11 @@
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "executor/execExpr.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "regex/regex.h"
#include "utils/builtins.h"
#include "utils/date.h"
@ -74,6 +76,8 @@
#include "utils/guc.h"
#include "utils/json.h"
#include "utils/jsonpath.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/timestamp.h"
#include "utils/varlena.h"
@ -155,6 +159,57 @@ typedef struct JsonValueListIterator
ListCell *next;
} JsonValueListIterator;
/* Structures for JSON_TABLE execution */
typedef struct JsonTableScanState JsonTableScanState;
typedef struct JsonTableJoinState JsonTableJoinState;
struct JsonTableScanState
{
JsonTableScanState *parent;
JsonTableJoinState *nested;
MemoryContext mcxt;
JsonPath *path;
List *args;
JsonValueList found;
JsonValueListIterator iter;
Datum current;
int ordinal;
bool currentIsNull;
bool errorOnError;
bool advanceNested;
bool reset;
};
struct JsonTableJoinState
{
union
{
struct
{
JsonTableJoinState *left;
JsonTableJoinState *right;
bool advanceRight;
} join;
JsonTableScanState scan;
} u;
bool is_join;
};
/* random number to identify JsonTableContext */
#define JSON_TABLE_CONTEXT_MAGIC 418352867
typedef struct JsonTableContext
{
int magic;
struct
{
ExprState *expr;
JsonTableScanState *scan;
} *colexprs;
JsonTableScanState root;
bool empty;
} JsonTableContext;
/* strict/lax flags is decomposed into four [un]wrap/error flags */
#define jspStrictAbsenseOfErrors(cxt) (!(cxt)->laxMode)
#define jspAutoUnwrap(cxt) ((cxt)->laxMode)
@ -245,6 +300,7 @@ static JsonPathExecResult getArrayIndex(JsonPathExecContext *cxt,
JsonPathItem *jsp, JsonbValue *jb, int32 *index);
static JsonBaseObjectInfo setBaseObject(JsonPathExecContext *cxt,
JsonbValue *jbv, int32 id);
static void JsonValueListClear(JsonValueList *jvl);
static void JsonValueListAppend(JsonValueList *jvl, JsonbValue *jbv);
static int JsonValueListLength(const JsonValueList *jvl);
static bool JsonValueListIsEmpty(JsonValueList *jvl);
@ -262,6 +318,12 @@ static JsonbValue *wrapItemsInArray(const JsonValueList *items);
static int compareDatetime(Datum val1, Oid typid1, Datum val2, Oid typid2,
bool useTz, bool *have_error);
static JsonTableJoinState *JsonTableInitPlanState(JsonTableContext *cxt,
Node *plan, JsonTableScanState *parent);
static bool JsonTableNextRow(JsonTableScanState *scan);
/****************** User interface to JsonPath executor ********************/
/*
@ -2458,6 +2520,13 @@ setBaseObject(JsonPathExecContext *cxt, JsonbValue *jbv, int32 id)
return baseObject;
}
static void
JsonValueListClear(JsonValueList *jvl)
{
jvl->singleton = NULL;
jvl->list = NULL;
}
static void
JsonValueListAppend(JsonValueList *jvl, JsonbValue *jbv)
{
@ -3067,3 +3136,370 @@ JsonItemFromDatum(Datum val, Oid typid, int32 typmod, JsonbValue *res)
"casted to supported jsonpath types.")));
}
}
/************************ JSON_TABLE functions ***************************/
/*
* Returns private data from executor state. Ensure validity by check with
* MAGIC number.
*/
static inline JsonTableContext *
GetJsonTableContext(TableFuncScanState *state, const char *fname)
{
JsonTableContext *result;
if (!IsA(state, TableFuncScanState))
elog(ERROR, "%s called with invalid TableFuncScanState", fname);
result = (JsonTableContext *) state->opaque;
if (result->magic != JSON_TABLE_CONTEXT_MAGIC)
elog(ERROR, "%s called with invalid TableFuncScanState", fname);
return result;
}
/* Recursively initialize JSON_TABLE scan state */
static void
JsonTableInitScanState(JsonTableContext *cxt, JsonTableScanState *scan,
JsonTableParent *node, JsonTableScanState *parent,
List *args, MemoryContext mcxt)
{
int i;
scan->parent = parent;
scan->errorOnError = node->errorOnError;
scan->path = DatumGetJsonPathP(node->path->constvalue);
scan->args = args;
scan->mcxt = AllocSetContextCreate(mcxt, "JsonTableContext",
ALLOCSET_DEFAULT_SIZES);
scan->nested = node->child ?
JsonTableInitPlanState(cxt, node->child, scan) : NULL;
scan->current = PointerGetDatum(NULL);
scan->currentIsNull = true;
for (i = node->colMin; i <= node->colMax; i++)
cxt->colexprs[i].scan = scan;
}
/* Recursively initialize JSON_TABLE scan state */
static JsonTableJoinState *
JsonTableInitPlanState(JsonTableContext *cxt, Node *plan,
JsonTableScanState *parent)
{
JsonTableJoinState *state = palloc0(sizeof(*state));
if (IsA(plan, JsonTableSibling))
{
JsonTableSibling *join = castNode(JsonTableSibling, plan);
state->is_join = true;
state->u.join.left = JsonTableInitPlanState(cxt, join->larg, parent);
state->u.join.right = JsonTableInitPlanState(cxt, join->rarg, parent);
}
else
{
JsonTableParent *node = castNode(JsonTableParent, plan);
state->is_join = false;
JsonTableInitScanState(cxt, &state->u.scan, node, parent,
parent->args, parent->mcxt);
}
return state;
}
/*
* JsonTableInitOpaque
* Fill in TableFuncScanState->opaque for JsonTable processor
*/
static void
JsonTableInitOpaque(TableFuncScanState *state, int natts)
{
JsonTableContext *cxt;
PlanState *ps = &state->ss.ps;
TableFuncScan *tfs = castNode(TableFuncScan, ps->plan);
TableFunc *tf = tfs->tablefunc;
JsonExpr *ci = castNode(JsonExpr, tf->docexpr);
JsonTableParent *root = castNode(JsonTableParent, tf->plan);
List *args = NIL;
ListCell *lc;
int i;
cxt = palloc0(sizeof(JsonTableContext));
cxt->magic = JSON_TABLE_CONTEXT_MAGIC;
if (ci->passing_values)
{
ListCell *exprlc;
ListCell *namelc;
forboth(exprlc, ci->passing_values,
namelc, ci->passing_names)
{
Expr *expr = (Expr *) lfirst(exprlc);
String *name = lfirst_node(String, namelc);
JsonPathVariableEvalContext *var = palloc(sizeof(*var));
var->name = pstrdup(name->sval);
var->typid = exprType((Node *) expr);
var->typmod = exprTypmod((Node *) expr);
var->estate = ExecInitExpr(expr, ps);
var->econtext = ps->ps_ExprContext;
var->mcxt = CurrentMemoryContext;
var->evaluated = false;
var->value = (Datum) 0;
var->isnull = true;
args = lappend(args, var);
}
}
cxt->colexprs = palloc(sizeof(*cxt->colexprs) *
list_length(tf->colvalexprs));
JsonTableInitScanState(cxt, &cxt->root, root, NULL, args,
CurrentMemoryContext);
i = 0;
foreach(lc, tf->colvalexprs)
{
Expr *expr = lfirst(lc);
cxt->colexprs[i].expr =
ExecInitExprWithCaseValue(expr, ps,
&cxt->colexprs[i].scan->current,
&cxt->colexprs[i].scan->currentIsNull);
i++;
}
state->opaque = cxt;
}
/* Reset scan iterator to the beginning of the item list */
static void
JsonTableRescan(JsonTableScanState *scan)
{
JsonValueListInitIterator(&scan->found, &scan->iter);
scan->current = PointerGetDatum(NULL);
scan->currentIsNull = true;
scan->advanceNested = false;
scan->ordinal = 0;
}
/* Reset context item of a scan, execute JSON path and reset a scan */
static void
JsonTableResetContextItem(JsonTableScanState *scan, Datum item)
{
MemoryContext oldcxt;
JsonPathExecResult res;
Jsonb *js = (Jsonb *) DatumGetJsonbP(item);
JsonValueListClear(&scan->found);
MemoryContextResetOnly(scan->mcxt);
oldcxt = MemoryContextSwitchTo(scan->mcxt);
res = executeJsonPath(scan->path, scan->args, EvalJsonPathVar, js,
scan->errorOnError, &scan->found, false /* FIXME */);
MemoryContextSwitchTo(oldcxt);
if (jperIsError(res))
{
Assert(!scan->errorOnError);
JsonValueListClear(&scan->found); /* EMPTY ON ERROR case */
}
JsonTableRescan(scan);
}
/*
* JsonTableSetDocument
* Install the input document
*/
static void
JsonTableSetDocument(TableFuncScanState *state, Datum value)
{
JsonTableContext *cxt = GetJsonTableContext(state, "JsonTableSetDocument");
JsonTableResetContextItem(&cxt->root, value);
}
/*
* Fetch next row from a union joined scan.
*
* Returns false at the end of a scan, true otherwise.
*/
static bool
JsonTableNextJoinRow(JsonTableJoinState *state)
{
if (!state->is_join)
return JsonTableNextRow(&state->u.scan);
if (!state->u.join.advanceRight)
{
/* fetch next outer row */
if (JsonTableNextJoinRow(state->u.join.left))
return true;
state->u.join.advanceRight = true; /* next inner row */
}
/* fetch next inner row */
return JsonTableNextJoinRow(state->u.join.right);
}
/* Recursively set 'reset' flag of scan and its child nodes */
static void
JsonTableJoinReset(JsonTableJoinState *state)
{
if (state->is_join)
{
JsonTableJoinReset(state->u.join.left);
JsonTableJoinReset(state->u.join.right);
state->u.join.advanceRight = false;
}
else
{
state->u.scan.reset = true;
state->u.scan.advanceNested = false;
if (state->u.scan.nested)
JsonTableJoinReset(state->u.scan.nested);
}
}
/*
* Fetch next row from a simple scan with outer joined nested subscans.
*
* Returns false at the end of a scan, true otherwise.
*/
static bool
JsonTableNextRow(JsonTableScanState *scan)
{
JsonbValue *jbv;
MemoryContext oldcxt;
/* reset context item if requested */
if (scan->reset)
{
Assert(!scan->parent->currentIsNull);
JsonTableResetContextItem(scan, scan->parent->current);
scan->reset = false;
}
if (scan->advanceNested)
{
/* fetch next nested row */
if (JsonTableNextJoinRow(scan->nested))
return true;
scan->advanceNested = false;
}
/* fetch next row */
jbv = JsonValueListNext(&scan->found, &scan->iter);
if (!jbv)
{
scan->current = PointerGetDatum(NULL);
scan->currentIsNull = true;
return false; /* end of scan */
}
/* set current row item */
oldcxt = MemoryContextSwitchTo(scan->mcxt);
scan->current = JsonbPGetDatum(JsonbValueToJsonb(jbv));
scan->currentIsNull = false;
MemoryContextSwitchTo(oldcxt);
scan->ordinal++;
if (scan->nested)
{
JsonTableJoinReset(scan->nested);
scan->advanceNested = JsonTableNextJoinRow(scan->nested);
}
return true;
}
/*
* JsonTableFetchRow
* Prepare the next "current" tuple for upcoming GetValue calls.
* Returns FALSE if the row-filter expression returned no more rows.
*/
static bool
JsonTableFetchRow(TableFuncScanState *state)
{
JsonTableContext *cxt = GetJsonTableContext(state, "JsonTableFetchRow");
if (cxt->empty)
return false;
return JsonTableNextRow(&cxt->root);
}
/*
* JsonTableGetValue
* Return the value for column number 'colnum' for the current row.
*
* This leaks memory, so be sure to reset often the context in which it's
* called.
*/
static Datum
JsonTableGetValue(TableFuncScanState *state, int colnum,
Oid typid, int32 typmod, bool *isnull)
{
JsonTableContext *cxt = GetJsonTableContext(state, "JsonTableGetValue");
ExprContext *econtext = state->ss.ps.ps_ExprContext;
ExprState *estate = cxt->colexprs[colnum].expr;
JsonTableScanState *scan = cxt->colexprs[colnum].scan;
Datum result;
if (scan->currentIsNull) /* NULL from outer/union join */
{
result = (Datum) 0;
*isnull = true;
}
else if (estate) /* regular column */
{
result = ExecEvalExpr(estate, econtext, isnull);
}
else
{
result = Int32GetDatum(scan->ordinal); /* ordinality column */
*isnull = false;
}
return result;
}
/*
* JsonTableDestroyOpaque
*/
static void
JsonTableDestroyOpaque(TableFuncScanState *state)
{
JsonTableContext *cxt = GetJsonTableContext(state, "JsonTableDestroyOpaque");
/* not valid anymore */
cxt->magic = 0;
state->opaque = NULL;
}
const TableFuncRoutine JsonbTableRoutine =
{
JsonTableInitOpaque,
JsonTableSetDocument,
NULL,
NULL,
NULL,
JsonTableFetchRow,
JsonTableGetValue,
JsonTableDestroyOpaque
};