1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/buildman/build_token.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

125 lines
3.6 KiB
Python

import logging
import jsonschema
import jwt
from app import instance_keys
from util.security import jwtutil
from util.security.registry_jwt import (
ALGORITHM,
JWT_CLOCK_SKEW_SECONDS,
InvalidBearerTokenException,
generate_bearer_token,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Value passed by build_token() when generating a token and required as the
# "sub" claim by verify_build_token().
ANONYMOUS_SUB = "(anonymous)"

# Token type identifiers. They are not referenced by the functions in this
# module; presumably callers pass them as the `token_type` argument.
BUILD_JOB_REGISTRATION_TYPE = "build_job_registration"
BUILD_JOB_TOKEN_TYPE = "build_job_token"

# Per-field definitions for the "context" object carried inside a build token.
_BUILD_TOKEN_CONTEXT_PROPERTIES = {
    "token_type": {
        "type": "string",
        "description": "The build token type",
    },
    "build_id": {
        "type": "string",
        "description": "The build id",
    },
    "job_id": {
        "type": "string",
        "description": "The job id",
    },
    "expiration": {
        "type": "number",
        "description": "The number of seconds until the job expires",
    },
}

# JSON schema that verify_build_token() validates a token's context against.
# Every field is mandatory.
BUILD_TOKEN_CONTEXT_SCHEMA = {
    "type": "object",
    "description": "Build context",
    "required": ["token_type", "build_id", "job_id", "expiration"],
    "properties": _BUILD_TOKEN_CONTEXT_PROPERTIES,
}
class InvalidBuildTokenException(Exception):
    """Raised by verify_build_token() when a build token fails verification."""
def build_token(aud, token_type, build_id, job_id, expiration, instance_keys):
    """
    Return an encoded JWT for the given build, signed by the local instance's private key.

    The token's context carries `token_type`, `build_id`, `job_id` and
    `expiration`. The subject is always ANONYMOUS_SUB, and `expiration` is
    also forwarded to generate_bearer_token() together with `aud` and
    `instance_keys`.
    """
    context = dict(
        token_type=token_type,
        build_id=build_id,
        job_id=job_id,
        expiration=expiration,
    )
    return generate_bearer_token(aud, ANONYMOUS_SUB, context, {}, expiration, instance_keys)
def verify_build_token(token, aud, token_type, instance_keys):
    """
    Verify the JWT build token and return its decoded payload.

    Args:
        token: The encoded JWT to verify.
        aud: The audience the token must carry.
        token_type: The token type the token's context must declare.
        instance_keys: Object providing `get_service_key_public_key(kid)` and
            `service_name` (used as the expected issuer).

    Returns:
        The decoded JWT payload.

    Raises:
        InvalidBuildTokenException: If the token cannot be decoded, has no
            `kid` header, references an unknown service key, has a missing or
            unexpected `sub`, has a missing or malformed context, or declares
            a token type other than `token_type`.
    """
    # The header is read without verification only to learn which key to use.
    try:
        headers = jwt.get_unverified_header(token)
    except jwtutil.InvalidTokenError as ite:
        logger.error("Invalid token reason: %s", ite)
        raise InvalidBuildTokenException(ite) from ite

    # NOTE(review): the two error logs below write the raw encoded token to the
    # log; consider whether that is acceptable for this deployment.
    kid = headers.get("kid")
    if kid is None:
        logger.error("Missing kid header on encoded JWT: %s", token)
        raise InvalidBuildTokenException("Missing kid header")

    public_key = instance_keys.get_service_key_public_key(kid)
    if public_key is None:
        logger.error("Could not find requested service key %s with encoded JWT: %s", kid, token)
        raise InvalidBuildTokenException("Unknown service key")

    try:
        payload = jwtutil.decode(
            token,
            public_key,
            verify=True,
            algorithms=[ALGORITHM],
            audience=aud,
            issuer=instance_keys.service_name,
            leeway=JWT_CLOCK_SKEW_SECONDS,
        )
    except jwtutil.InvalidTokenError as ite:
        logger.error("Invalid token reason: %s", ite)
        raise InvalidBuildTokenException(ite) from ite

    if "sub" not in payload:
        raise InvalidBuildTokenException("Missing sub field in JWT")

    if payload["sub"] != ANONYMOUS_SUB:
        raise InvalidBuildTokenException("Wrong sub field in JWT")

    # Use .get() so a context that is not a mapping, or that lacks one of the
    # required keys, is reported as an invalid token instead of escaping as a
    # TypeError/KeyError.
    context = payload.get("context")
    if not isinstance(context, dict) or not all(
        context.get(field) for field in BUILD_TOKEN_CONTEXT_SCHEMA["required"]
    ):
        raise InvalidBuildTokenException("Missing context field in JWT")

    try:
        jsonschema.validate(context, BUILD_TOKEN_CONTEXT_SCHEMA)
    except jsonschema.ValidationError as err:
        raise InvalidBuildTokenException(
            "Unable to validate build token context schema: malformed context"
        ) from err

    if context["token_type"] != token_type:
        raise InvalidBuildTokenException(
            "Build token type in JWT does not match expected type: %s" % token_type
        )

    return payload