1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/auth/credentials.py
Daniel Messer e8ff33e728 logs: add failure logging for login, push, pull and delete events (PROJQUAY-5411) (#1903)
* add login failure logging

Signed-off-by: dmesser <dmesser@redhat.com>

* move failure logging into credential validation

Signed-off-by: dmesser <dmesser@redhat.com>

* more precise tracking of affected users

Signed-off-by: dmesser <dmesser@redhat.com>

* fix indent

Signed-off-by: dmesser <dmesser@redhat.com>

* differentiate robots with wrong credentials

Signed-off-by: dmesser <dmesser@redhat.com>

* don't audit failures by default

Signed-off-by: dmesser <dmesser@redhat.com>

* discrete failure tracking for logins, push, pulls and deletes

Signed-off-by: dmesser <dmesser@redhat.com>

* refine log metadata

Signed-off-by: dmesser <dmesser@redhat.com>

* login failure log visualization

Signed-off-by: dmesser <dmesser@redhat.com>

* properly use data model

Signed-off-by: dmesser <dmesser@redhat.com>

* fix unit test bug

Signed-off-by: dmesser <dmesser@redhat.com>

* track non-existing repos differently

Signed-off-by: dmesser <dmesser@redhat.com>

* log view visualization of failed pushes and pulls

Signed-off-by: dmesser <dmesser@redhat.com>

* ensure all tests are conducted with failure logging

Signed-off-by: dmesser <dmesser@redhat.com>

* additional unicode protection

Signed-off-by: dmesser <dmesser@redhat.com>

* python black formatting

Signed-off-by: dmesser <dmesser@redhat.com>

* add cypress test data

Signed-off-by: dmesser <dmesser@redhat.com>

* add safety checks for ascii conversion attempts

Signed-off-by: dmesser <dmesser@redhat.com>

* adjusting unit test with correct error message

Signed-off-by: dmesser <dmesser@redhat.com>

* update to alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* add standard oauth token metadata in audit

Signed-off-by: dmesser <dmesser@redhat.com>

* update alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* correct field name

Signed-off-by: dmesser <dmesser@redhat.com>

* formatting

Signed-off-by: dmesser <dmesser@redhat.com>

* bump alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* refactor auth logging imports

Signed-off-by: dmesser <dmesser@redhat.com>

* bump alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* formatting

Signed-off-by: dmesser <dmesser@redhat.com>

* restore module

Signed-off-by: dmesser <dmesser@redhat.com>

* pre-commit fixes

Signed-off-by: dmesser <dmesser@redhat.com>

* adding missing default

Signed-off-by: dmesser <dmesser@redhat.com>

* bump alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* update test data

Signed-off-by: dmesser <dmesser@redhat.com>

* refactoring to save db calls

Signed-off-by: dmesser <dmesser@redhat.com>

* fix unit tests

Signed-off-by: dmesser <dmesser@redhat.com>

* handle unicode conversion errors on email look up

Signed-off-by: dmesser <dmesser@redhat.com>

* bump alembic head

Signed-off-by: dmesser <dmesser@redhat.com>

* proper debug logging and conditional db calls

Signed-off-by: dmesser <dmesser@redhat.com>

* omit wildcard import

Signed-off-by: dmesser <dmesser@redhat.com>

* re-add import

Signed-off-by: dmesser <dmesser@redhat.com>

---------

Signed-off-by: dmesser <dmesser@redhat.com>
2024-01-16 16:46:20 +01:00

243 lines
9.0 KiB
Python

import logging
from enum import Enum
from flask import request
import features
from app import app, authentication
from auth.credential_consts import (
ACCESS_TOKEN_USERNAME,
APP_SPECIFIC_TOKEN_USERNAME,
OAUTH_TOKEN_USERNAME,
)
from auth.log import log_action
from auth.oauth import validate_oauth_token
from auth.validateresult import AuthKind, ValidateResult
from data import model
from data.database import User
from util.names import parse_robot_username
logger = logging.getLogger(__name__)
class CredentialKind(Enum):
user = "user"
robot = "robot"
token = ACCESS_TOKEN_USERNAME
oauth_token = OAUTH_TOKEN_USERNAME
app_specific_token = APP_SPECIFIC_TOKEN_USERNAME
def validate_credentials(auth_username, auth_password_or_token):
"""
Validates a pair of auth username and password/token credentials.
"""
# Check for access tokens.
if auth_username == ACCESS_TOKEN_USERNAME:
logger.debug("Found credentials for access token")
try:
token = model.token.load_token_data(auth_password_or_token)
logger.debug("Successfully validated credentials for access token %s", token.id)
return ValidateResult(AuthKind.credentials, token=token), CredentialKind.token
except model.DataModelException:
logger.warning(
"Failed to validate credentials for access token %s", auth_password_or_token
)
return (
ValidateResult(AuthKind.credentials, error_message="Invalid access token"),
CredentialKind.token,
)
# Check for App Specific tokens.
if features.APP_SPECIFIC_TOKENS and auth_username == APP_SPECIFIC_TOKEN_USERNAME:
logger.debug("Found credentials for app specific auth token")
token = model.appspecifictoken.access_valid_token(auth_password_or_token)
if token is None:
logger.debug(
"Failed to validate credentials for app specific token: %s", auth_password_or_token
)
error_message = "Invalid token"
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
log_action(
"login_failure",
None,
{
"type": "v2auth",
"kind": "app_specific_token",
"useragent": request.user_agent.string,
"message": error_message,
},
)
return (
ValidateResult(AuthKind.credentials, error_message=error_message),
CredentialKind.app_specific_token,
)
if not token.user.enabled:
logger.debug("Tried to use an app specific token for a disabled user: %s", token.uuid)
error_message = "This user has been disabled. Please contact your administrator."
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
log_action(
"login_failure",
token.user.username,
{
"type": "v2auth",
"kind": "app_specific_token",
"app_specific_token_title": token.title,
"username": token.user.username,
"useragent": request.user_agent.string,
"message": error_message,
},
performer=token.user,
)
return (
ValidateResult(
AuthKind.credentials,
error_message=error_message,
),
CredentialKind.app_specific_token,
)
logger.debug("Successfully validated credentials for app specific token %s", token.id)
return (
ValidateResult(AuthKind.credentials, appspecifictoken=token),
CredentialKind.app_specific_token,
)
# Check for OAuth tokens.
if auth_username == OAUTH_TOKEN_USERNAME:
return validate_oauth_token(auth_password_or_token), CredentialKind.oauth_token
# Check for robots and users.
is_robot = parse_robot_username(auth_username)
if is_robot:
logger.debug("Found credentials header for robot %s", auth_username)
try:
robot = model.user.verify_robot(auth_username, auth_password_or_token)
logger.debug("Successfully validated credentials for robot %s", auth_username)
return ValidateResult(AuthKind.credentials, robot=robot), CredentialKind.robot
except model.DeactivatedRobotOwnerException as dre:
robot_owner, robot_name = parse_robot_username(auth_username)
logger.debug(
"Tried to use the robot %s for a disabled user: %s", robot_name, robot_owner
)
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
try:
performer = model.user.lookup_robot(auth_username)
except User.DoesNotExist:
performer = None
log_action(
"login_failure",
robot_owner,
{
"type": "v2auth",
"kind": "robot",
"robot": auth_username,
"username": robot_owner,
"useragent": request.user_agent.string,
"message": str(dre),
},
performer=performer,
)
return (
ValidateResult(AuthKind.credentials, error_message=str(dre)),
CredentialKind.robot,
)
except model.InvalidRobotCredentialException as ire:
logger.debug("Failed to validate credentials for robot %s: %s", auth_username, ire)
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
robot_owner, _ = parse_robot_username(auth_username)
try:
performer = model.user.lookup_robot(auth_username)
except User.DoesNotExist:
performer = None
log_action(
"login_failure",
robot_owner,
{
"type": "v2auth",
"kind": "robot",
"robot": auth_username,
"username": robot_owner,
"useragent": request.user_agent.string,
"message": str(ire),
},
performer=performer,
)
return (
ValidateResult(AuthKind.credentials, error_message=str(ire)),
CredentialKind.robot,
)
except model.InvalidRobotException as ire:
if isinstance(ire, model.InvalidRobotException):
logger.debug("Failed to validate credentials for robot %s: %s", auth_username, ire)
elif isinstance(ire, model.InvalidRobotOwnerException):
logger.debug(
"Tried to use the robot %s for a non-existing user: %s", auth_username, ire
)
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
robot_owner, _ = parse_robot_username(auth_username)
# need to get the owner here in case it wasn't found due to a non-existing user
owner = model.user.get_nonrobot_user(robot_owner)
log_action(
"login_failure",
owner.username if owner else None,
{
"type": "v2auth",
"kind": "robot",
"robot": auth_username,
"username": robot_owner,
"useragent": request.user_agent.string,
"message": str(ire),
},
)
return (
ValidateResult(AuthKind.credentials, error_message=str(ire)),
CredentialKind.robot,
)
# Otherwise, treat as a standard user.
(authenticated, err) = authentication.verify_and_link_user(
auth_username, auth_password_or_token, basic_auth=True
)
if authenticated:
logger.debug("Successfully validated credentials for user %s", authenticated.username)
return ValidateResult(AuthKind.credentials, user=authenticated), CredentialKind.user
else:
logger.warning("Failed to validate credentials for user %s: %s", auth_username, err)
if app.config.get("ACTION_LOG_AUDIT_LOGIN_FAILURES"):
log_action(
"login_failure",
None,
{
"type": "v2auth",
"kind": "user",
"username": auth_username,
"useragent": request.user_agent.string,
"message": err,
},
)
return ValidateResult(AuthKind.credentials, error_message=err), CredentialKind.user