1
0
mirror of https://github.com/postgres/postgres.git synced 2025-06-30 21:42:05 +03:00

Allow to lock views.

Now all tables used in view definitions can be recursively locked by a
LOCK command.

Author: Yugo Nagata
Reviewed by Robert Haas, Thomas Munro and me.

Discussion: https://postgr.es/m/20171011183629.eb2817b3.nagata%40sraoss.co.jp
This commit is contained in:
Tatsuo Ishii
2018-03-30 09:18:02 +09:00
parent fb60478011
commit 34c20de4d0
4 changed files with 285 additions and 18 deletions

View File

@ -23,11 +23,15 @@
#include "utils/acl.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "rewrite/rewriteHandler.h"
#include "access/heapam.h"
#include "nodes/nodeFuncs.h"
static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg);
static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait);
/*
* LOCK TABLE
@ -62,8 +66,10 @@ LockTableCommand(LockStmt *lockstmt)
RangeVarCallbackForLockTable,
(void *) &lockstmt->mode);
if (recurse)
LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
if (get_rel_relkind(reloid) == RELKIND_VIEW)
LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait);
else if (recurse)
LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
}
}
@ -86,15 +92,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
return; /* woops, concurrently dropped; no permissions
* check */
/* Currently, we only allow plain tables to be locked */
if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
/* Currently, we only allow plain tables or views to be locked */
if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
errmsg("\"%s\" is not a table or a view",
rv->relname)));
/* Check permissions. */
aclresult = LockTableAclCheck(relid, lockmode);
aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
}
@ -107,7 +115,7 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
* multiply-inheriting children more than once, but that's no problem.
*/
static void
LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
{
List *children;
ListCell *lc;
@ -120,7 +128,7 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
AclResult aclresult;
/* Check permissions before acquiring the lock. */
aclresult = LockTableAclCheck(childreloid, lockmode);
aclresult = LockTableAclCheck(childreloid, lockmode, userid);
if (aclresult != ACLCHECK_OK)
{
char *relname = get_rel_name(childreloid);
@ -157,15 +165,120 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
continue;
}
LockTableRecurse(childreloid, lockmode, nowait);
LockTableRecurse(childreloid, lockmode, nowait, userid);
}
}
/*
* Apply LOCK TABLE recursively over a view
*
* All tables and views appearing in the view definition query are locked
* recursively with the same lock mode.
*/
typedef struct
{
Oid root_reloid;
LOCKMODE lockmode;
bool nowait;
Oid viewowner;
Oid viewoid;
} LockViewRecurse_context;
static bool
LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
{
if (node == NULL)
return false;
if (IsA(node, Query))
{
Query *query = (Query *) node;
ListCell *rtable;
foreach(rtable, query->rtable)
{
RangeTblEntry *rte = lfirst(rtable);
AclResult aclresult;
Oid relid = rte->relid;
char relkind = rte->relkind;
char *relname = get_rel_name(relid);
/* The OLD and NEW placeholder entries in the view's rtable are skipped. */
if (relid == context->viewoid &&
(!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new")))
continue;
/* Currently, we only allow plain tables or views to be locked. */
if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
relkind != RELKIND_VIEW)
continue;
/* Check infinite recursion in the view definition. */
if (relid == context->root_reloid)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("infinite recursion detected in rules for relation \"%s\"",
get_rel_name(context->root_reloid))));
/* Check permissions with the view owner's privilege. */
aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, get_relkind_objtype(relkind), relname);
/* We have enough rights to lock the relation; do so. */
if (!context->nowait)
LockRelationOid(relid, context->lockmode);
else if (!ConditionalLockRelationOid(relid, context->lockmode))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on relation \"%s\"",
relname)));
if (relkind == RELKIND_VIEW)
LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait);
else if (rte->inh)
LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
}
return query_tree_walker(query,
LockViewRecurse_walker,
context,
QTW_IGNORE_JOINALIASES);
}
return expression_tree_walker(node,
LockViewRecurse_walker,
context);
}
static void
LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait)
{
LockViewRecurse_context context;
Relation view;
Query *viewquery;
view = heap_open(reloid, NoLock);
viewquery = get_view_query(view);
heap_close(view, NoLock);
context.root_reloid = root_reloid;
context.lockmode = lockmode;
context.nowait = nowait;
context.viewowner = view->rd_rel->relowner;
context.viewoid = reloid;
LockViewRecurse_walker((Node *) viewquery, &context);
}
/*
* Check whether the current user is permitted to lock this relation.
*/
static AclResult
LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
{
AclResult aclresult;
AclMode aclmask;
@ -178,7 +291,7 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
else
aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask);
aclresult = pg_class_aclcheck(reloid, userid, aclmask);
return aclresult;
}