1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/endpoints/csrf.py
Kenny Lee Sin Cheong 5f63b3a7bb
chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

80 lines
2.4 KiB
Python

import base64
import hmac
import logging
import os
from functools import wraps
from flask import Response, request, session
import features
from app import app
from auth.auth_context import get_sso_token, get_validated_oauth_token
from util.http import abort
logger = logging.getLogger(__name__)
OAUTH_CSRF_TOKEN_NAME = "_oauth_csrf_token"
_QUAY_CSRF_TOKEN_NAME = "_csrf_token"
_QUAY_CSRF_HEADER_NAME = "X-CSRF-Token"
QUAY_CSRF_UPDATED_HEADER_NAME = "X-Next-CSRF-Token"
def generate_csrf_token(session_token_name=_QUAY_CSRF_TOKEN_NAME, force=False):
"""
If not present in the session, generates a new CSRF token with the given name and places it into
the session.
Returns the generated token.
"""
if session_token_name not in session or force:
session[session_token_name] = base64.urlsafe_b64encode(os.urandom(48)).decode("utf-8")
return session[session_token_name]
def verify_csrf(
session_token_name=_QUAY_CSRF_TOKEN_NAME,
request_token_name=_QUAY_CSRF_TOKEN_NAME,
check_header=True,
):
"""
Verifies that the CSRF token with the given name is found in the session and that the matching
token is found in the request args or values.
"""
token = str(session.get(session_token_name, ""))
found_token = str(request.values.get(request_token_name, ""))
if check_header and not found_token:
found_token = str(request.headers.get(_QUAY_CSRF_HEADER_NAME, ""))
if not token or not found_token or not hmac.compare_digest(token, found_token):
msg = "CSRF Failure. Session token (%s) was %s and request token (%s) was %s"
logger.error(msg, session_token_name, token, request_token_name, found_token)
abort(403, message="CSRF token was invalid or missing.")
def csrf_protect(
session_token_name=_QUAY_CSRF_TOKEN_NAME,
request_token_name=_QUAY_CSRF_TOKEN_NAME,
all_methods=False,
check_header=True,
):
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Verify the CSRF token.
if get_validated_oauth_token() is None and get_sso_token() is None:
if all_methods or (request.method != "GET" and request.method != "HEAD"):
verify_csrf(session_token_name, request_token_name, check_header)
# Invoke the handler.
resp = func(*args, **kwargs)
return resp
return wrapper
return inner
app.jinja_env.globals["csrf_token"] = generate_csrf_token