1
0
mirror of https://github.com/quay/quay.git synced 2025-09-17 19:02:19 +03:00
Files
quay/endpoints/secscan.py
Kenny Lee Sin Cheong 5f63b3a7bb chore: drop deprecated tables and remove unused code (PROJQUAY-522) (#2089)
* chore: drop deprecated tables and remove unused code

* isort imports

* migration: check for table existence before drop
2023-08-25 12:17:24 -04:00

81 lines
2.4 KiB
Python

import base64
import json
import logging
import jwt
from flask import Blueprint, abort, jsonify, make_response, request
import features
from app import app, secscan_notification_queue
from data.database import Manifest, ManifestSecurityStatus
from endpoints.decorators import anon_allowed, route_show_if
from util.security.jwtutil import TOKEN_REGEX, decode
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint on which every route in this module is registered.
secscan = Blueprint("secscan", __name__)
# Name of the HTTP request header that is searched for the bearer JWT
# when verifying notifications from the security scanner.
JWT_HEADER_NAME = "Authorization"
@secscan.route("/_internal_ping")
@anon_allowed
def internal_ping():
    """
    Answer an internal ping.

    Always replies with the literal body ``true`` and HTTP status 200.
    """
    ping_response = make_response("true", 200)
    return ping_response
# NOTE: `secscan.route` must be the outermost decorator. Flask registers the
# function object that `route` receives, so any wrapper applied *above* it is
# never attached to the URL rule and would silently not run.
@secscan.route("/notification", methods=["POST"])
@route_show_if(features.SECURITY_SCANNER)
@route_show_if(features.SECURITY_NOTIFICATIONS)
@anon_allowed
def secscan_notification():
    """
    Receive a notification callback from the V4 security scanner.

    When ``SECURITY_SCANNER_V4_PSK`` is configured, the request must carry a
    bearer JWT in the ``Authorization`` header that verifies against the
    base64-decoded PSK using HS256; otherwise the request is rejected.

    The JSON body must be an object containing both ``notification_id`` and
    ``callback``. If no queue item for that notification is currently alive,
    a new item carrying the notification id is enqueued.

    Returns:
        A response with the body ``Okay``.

    Aborts:
        401 if a PSK is configured and the bearer token is missing or invalid.
        400 if the body is not a JSON object or lacks a required key.
    """
    # If Quay is configured with a Clair V4 PSK we assume
    # Clair will also sign JWTs with this PSK. Therefore,
    # attempt jwt verification.
    key = app.config.get("SECURITY_SCANNER_V4_PSK", None)
    if key:
        key = base64.b64decode(key)
        jwt_header = request.headers.get(JWT_HEADER_NAME, "")
        match = TOKEN_REGEX.match(jwt_header)
        if match is None:
            logger.error("Could not find matching bearer token")
            abort(401)

        token = match.group(1)
        try:
            decode(token, key=key, algorithms=["HS256"])
        except jwt.exceptions.InvalidTokenError as e:
            logger.error("Could not verify jwt {}".format(e))
            abort(401)

        logger.debug("Successfully verified jwt")

    data = request.get_json()
    if data is None:
        logger.error("expected json request")
        abort(400)

    # The body is attacker-controllable: valid JSON may be a list, string or
    # number. Those would either crash the key checks below or pass them by
    # accident (substring / element membership) and then fail on indexing,
    # producing a 500 instead of a 400.
    if not isinstance(data, dict):
        logger.error("expected json object in request body")
        abort(400)

    logger.debug("Got notification from V4 Security Scanner: %s", data)
    if "notification_id" not in data or "callback" not in data:
        abort(400)

    notification_id = data["notification_id"]
    name = ["with_id", notification_id]
    # Skip enqueueing when the queue reports an item with this name as alive,
    # so repeated callbacks for the same notification are not queued twice.
    if not secscan_notification_queue.alive(name):
        secscan_notification_queue.put(
            name,
            json.dumps({"notification_id": notification_id}),
        )

    return make_response("Okay")
@secscan.route("/_backfill_status")
@anon_allowed
def manifest_security_backfill_status():
    """
    Report the ratio of ManifestSecurityStatus rows to Manifest rows.

    The ratio is expressed as a percentage rounded to two decimal places and
    returned as JSON under the ``backfill_percent`` key. When there are no
    Manifest rows at all, the ratio is treated as 1.0 (i.e. 100 percent).
    """
    total_manifests = Manifest.select().count()
    status_rows = ManifestSecurityStatus.select().count()

    # Guard the division: with no manifests the ratio is defined as complete.
    ratio = status_rows / float(total_manifests) if total_manifests != 0 else 1.0

    payload = {"backfill_percent": round(ratio * 100, 2)}
    return jsonify(payload)