mirror of
https://github.com/quay/quay.git
synced 2025-09-17 19:02:19 +03:00
* chore: drop deprecated tables and remove unused code * isort imports * migration: check for table existence before drop
81 lines
2.4 KiB
Python
81 lines
2.4 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
|
|
import jwt
|
|
from flask import Blueprint, abort, jsonify, make_response, request
|
|
|
|
import features
|
|
from app import app, secscan_notification_queue
|
|
from data.database import Manifest, ManifestSecurityStatus
|
|
from endpoints.decorators import anon_allowed, route_show_if
|
|
from util.security.jwtutil import TOKEN_REGEX, decode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
secscan = Blueprint("secscan", __name__)
|
|
|
|
JWT_HEADER_NAME = "Authorization"
|
|
|
|
|
|
@secscan.route("/_internal_ping")
|
|
@anon_allowed
|
|
def internal_ping():
|
|
return make_response("true", 200)
|
|
|
|
|
|
@route_show_if(features.SECURITY_SCANNER)
|
|
@route_show_if(features.SECURITY_NOTIFICATIONS)
|
|
@secscan.route("/notification", methods=["POST"])
|
|
@anon_allowed
|
|
def secscan_notification():
|
|
# If Quay is configured with a Clair V4 PSK we assume
|
|
# Clair will also sign JWT's with this PSK. Therefore,
|
|
# attempt jwt verification.
|
|
key = app.config.get("SECURITY_SCANNER_V4_PSK", None)
|
|
if key:
|
|
key = base64.b64decode(key)
|
|
jwt_header = request.headers.get(JWT_HEADER_NAME, "")
|
|
match = TOKEN_REGEX.match(jwt_header)
|
|
if match is None:
|
|
logger.error("Could not find matching bearer token")
|
|
abort(401)
|
|
token = match.group(1)
|
|
try:
|
|
decode(token, key=key, algorithms=["HS256"])
|
|
except jwt.exceptions.InvalidTokenError as e:
|
|
logger.error("Could not verify jwt {}".format(e))
|
|
abort(401)
|
|
logger.debug("Successfully verified jwt")
|
|
|
|
data = request.get_json()
|
|
if data is None:
|
|
logger.error("expected json request")
|
|
abort(400)
|
|
|
|
logger.debug("Got notification from V4 Security Scanner: %s", data)
|
|
if "notification_id" not in data or "callback" not in data:
|
|
abort(400)
|
|
|
|
notification_id = data["notification_id"]
|
|
name = ["with_id", notification_id]
|
|
if not secscan_notification_queue.alive(name):
|
|
secscan_notification_queue.put(
|
|
name,
|
|
json.dumps({"notification_id": notification_id}),
|
|
)
|
|
|
|
return make_response("Okay")
|
|
|
|
|
|
@secscan.route("/_backfill_status")
|
|
@anon_allowed
|
|
def manifest_security_backfill_status():
|
|
manifest_count = Manifest.select().count()
|
|
mss_count = ManifestSecurityStatus.select().count()
|
|
|
|
if manifest_count == 0:
|
|
percent = 1.0
|
|
else:
|
|
percent = mss_count / float(manifest_count)
|
|
return jsonify({"backfill_percent": round(percent * 100, 2)})
|