1
0
mirror of https://github.com/quay/quay.git synced 2025-11-17 23:02:34 +03:00
Files
quay/endpoints/api/prototype.py
Kurtis Mullins 38be6d05d0 Python 3 (#153)
* Convert all Python2 to Python3 syntax.

* Removes oauth2lib dependency

* Replace mockredis with fakeredis

* byte/str conversions

* Removes nonexisting __nonzero__ in Python3

* Python3 Dockerfile and related

* [PROJQUAY-98] Replace resumablehashlib with rehash

* PROJQUAY-123 - replace gpgme with python3-gpg

* [PROJQUAY-135] Fix unhashable class error

* Update external dependencies for Python 3

- Move github.com/app-registry/appr to github.com/quay/appr
- github.com/coderanger/supervisor-stdout
- github.com/DevTable/container-cloud-config
- Update to latest mockldap with changes applied from coreos/mockldap
- Update dependencies in requirements.txt and requirements-dev.txt

* Default FLOAT_REPR function to str in json encoder and removes keyword assignment

True, False, and str were not keywords in Python2...

* [PROJQUAY-165] Replace package `bencode` with `bencode.py`

- Bencode is not compatible with Python 3.x and is no longer
  maintained. Bencode.py appears to be a drop-in replacement/fork
  that is compatible with Python 3.

* Make sure monkey.patch is called before anything else (

* Removes anunidecode dependency and replaces it with text_unidecode

* Base64 encode/decode pickle dumps/loads when storing value in DB

Base64 encodes/decodes the serialized values when storing them in the
DB. Also make sure to return a Python3 string instead of a Bytes when
coercing for db, otherwise, Postgres' TEXT field will convert it into
a hex representation when storing the value.

* Implement __hash__ on Digest class

In Python 3, if a class defines __eq__() but not __hash__(), its
instances will not be usable as items in hashable collections (e.g sets).

* Remove basestring check

* Fix expected message in credentials tests

* Fix usage of Cryptography.Fernet for Python3 (#219)

- Specifically, this addresses the issue where Byte<->String
  conversions weren't being applied correctly.

* Fix utils

- tar+stream layer format utils
- filelike util

* Fix storage tests

* Fix endpoint tests

* Fix workers tests

* Fix docker's empty layer bytes

* Fix registry tests

* Appr

* Enable CI for Python 3.6

* Skip buildman tests

Skip buildman tests while it's being rewritten to allow ci to pass.

* Install swig for CI

* Update expected exception type in redis validation test

* Fix gpg signing calls

Fix gpg calls for updated gpg wrapper, and add signing tests.

* Convert / to // for Python3 integer division

* WIP: Update buildman to use asyncio instead of trollius.

This dependency is considered deprecated/abandoned and was only
used as an implementation/backport of asyncio on Python 2.x
This is a work in progress, and is included in the PR just to get the
rest of the tests passing. The builder is actually being rewritten.

* Target Python 3.8

* Removes unused files

- Removes unused files that were added accidentally while rebasing
- Small fixes/cleanup
- TODO tasks comments

* Add TODO to verify rehash backward compat with resumablehashlib

* Revert "[PROJQUAY-135] Fix unhashable class error" and implements __hash__ instead.

This reverts commit 735e38e3c1d072bf50ea864bc7e119a55d3a8976.
Instead, defines __hash__ for encryped fields class, using the parent
field's implementation.

* Remove some unused files ad imports

Co-authored-by: Kenny Lee Sin Cheong <kenny.lee@redhat.com>
Co-authored-by: Tom McKay <thomasmckay@redhat.com>
2020-06-05 16:50:13 -04:00

294 lines
10 KiB
Python

"""
Manage default permissions added to repositories.
"""
from flask import request
from endpoints.api import (
resource,
nickname,
ApiResource,
validate_json_request,
request_error,
log_action,
path_param,
require_scope,
)
from endpoints.exception import Unauthorized, NotFound
from auth.permissions import AdministerOrganizationPermission
from auth.auth_context import get_authenticated_user
from auth import scopes
from data import model
from app import avatar
def prototype_view(proto, org_members):
def prototype_user_view(user):
return {
"name": user.username,
"is_robot": user.robot,
"kind": "user",
"is_org_member": user.robot or user.username in org_members,
"avatar": avatar.get_data_for_user(user),
}
if proto.delegate_user:
delegate_view = prototype_user_view(proto.delegate_user)
else:
delegate_view = {
"name": proto.delegate_team.name,
"kind": "team",
"avatar": avatar.get_data_for_team(proto.delegate_team),
}
return {
"activating_user": (
prototype_user_view(proto.activating_user) if proto.activating_user else None
),
"delegate": delegate_view,
"role": proto.role.name,
"id": proto.uuid,
}
def log_prototype_action(action_kind, orgname, prototype, **kwargs):
username = get_authenticated_user().username
log_params = {
"prototypeid": prototype.uuid,
"username": username,
"activating_username": (
prototype.activating_user.username if prototype.activating_user else None
),
"role": prototype.role.name,
}
for key, value in list(kwargs.items()):
log_params[key] = value
if prototype.delegate_user:
log_params["delegate_user"] = prototype.delegate_user.username
elif prototype.delegate_team:
log_params["delegate_team"] = prototype.delegate_team.name
log_action(action_kind, orgname, log_params)
@resource("/v1/organization/<orgname>/prototypes")
@path_param("orgname", "The name of the organization")
class PermissionPrototypeList(ApiResource):
"""
Resource for listing and creating permission prototypes.
"""
schemas = {
"NewPrototype": {
"type": "object",
"description": "Description of a new prototype",
"required": ["role", "delegate",],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the delegate",
"enum": ["read", "write", "admin",],
},
"activating_user": {
"type": "object",
"description": "Repository creating user to whom the rule should apply",
"required": ["name",],
"properties": {
"name": {
"type": "string",
"description": "The username for the activating_user",
},
},
},
"delegate": {
"type": "object",
"description": "Information about the user or team to which the rule grants access",
"required": ["name", "kind",],
"properties": {
"name": {
"type": "string",
"description": "The name for the delegate team or user",
},
"kind": {
"type": "string",
"description": "Whether the delegate is a user or a team",
"enum": ["user", "team",],
},
},
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("getOrganizationPrototypePermissions")
def get(self, orgname):
"""
List the existing prototypes for this organization.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
permissions = model.permission.get_prototype_permissions(org)
users_filter = {p.activating_user for p in permissions} | {
p.delegate_user for p in permissions
}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return {"prototypes": [prototype_view(p, org_members) for p in permissions]}
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("createOrganizationPrototypePermission")
@validate_json_request("NewPrototype")
def post(self, orgname):
"""
Create a new permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
details = request.get_json()
activating_username = None
if (
"activating_user" in details
and details["activating_user"]
and "name" in details["activating_user"]
):
activating_username = details["activating_user"]["name"]
delegate = details["delegate"] if "delegate" in details else {}
delegate_kind = delegate.get("kind", None)
delegate_name = delegate.get("name", None)
delegate_username = delegate_name if delegate_kind == "user" else None
delegate_teamname = delegate_name if delegate_kind == "team" else None
activating_user = (
model.user.get_user(activating_username) if activating_username else None
)
delegate_user = model.user.get_user(delegate_username) if delegate_username else None
delegate_team = (
model.team.get_organization_team(orgname, delegate_teamname)
if delegate_teamname
else None
)
if activating_username and not activating_user:
raise request_error(message="Unknown activating user")
if not delegate_user and not delegate_team:
raise request_error(message="Missing delegate user or team")
role_name = details["role"]
prototype = model.permission.add_prototype_permission(
org, role_name, activating_user, delegate_user, delegate_team
)
log_prototype_action("create_prototype_permission", orgname, prototype)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()
@resource("/v1/organization/<orgname>/prototypes/<prototypeid>")
@path_param("orgname", "The name of the organization")
@path_param("prototypeid", "The ID of the prototype")
class PermissionPrototype(ApiResource):
"""
Resource for managingin individual permission prototypes.
"""
schemas = {
"PrototypeUpdate": {
"type": "object",
"description": "Description of a the new prototype role",
"required": ["role",],
"properties": {
"role": {
"type": "string",
"description": "Role that should be applied to the permission",
"enum": ["read", "write", "admin",],
},
},
},
}
@require_scope(scopes.ORG_ADMIN)
@nickname("deleteOrganizationPrototypePermission")
def delete(self, orgname, prototypeid):
"""
Delete an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
prototype = model.permission.delete_prototype_permission(org, prototypeid)
if not prototype:
raise NotFound()
log_prototype_action("delete_prototype_permission", orgname, prototype)
return "", 204
raise Unauthorized()
@require_scope(scopes.ORG_ADMIN)
@nickname("updateOrganizationPrototypePermission")
@validate_json_request("PrototypeUpdate")
def put(self, orgname, prototypeid):
"""
Update the role of an existing permission prototype.
"""
permission = AdministerOrganizationPermission(orgname)
if permission.can():
try:
org = model.organization.get_organization(orgname)
except model.InvalidOrganizationException:
raise NotFound()
existing = model.permission.get_prototype_permission(org, prototypeid)
if not existing:
raise NotFound()
details = request.get_json()
role_name = details["role"]
prototype = model.permission.update_prototype_permission(org, prototypeid, role_name)
if not prototype:
raise NotFound()
log_prototype_action(
"modify_prototype_permission", orgname, prototype, original_role=existing.role.name
)
users_filter = {prototype.activating_user, prototype.delegate_user}
org_members = model.organization.get_organization_member_set(
org, users_filter=users_filter
)
return prototype_view(prototype, org_members)
raise Unauthorized()