""" Access usage logs for organizations or repositories. """ from datetime import datetime, timedelta from flask import abort, request import features from app import app, avatar, export_action_logs_queue, usermanager from auth import scopes from auth.auth_context import get_authenticated_context, get_authenticated_user from auth.permissions import AdministerOrganizationPermission from data.logs_model import logs_model from data.logs_model.shared import InvalidLogsDateRangeError from data.registry_model import registry_model from endpoints.api import ( ApiResource, InvalidRequest, RepositoryParamResource, allow_if_any_superuser, allow_if_global_readonly_superuser, allow_if_superuser, allow_if_superuser_with_full_access, format_date, log_action, nickname, page_support, parse_args, path_param, query_param, related_user_resource, require_repo_admin, require_scope, require_user_admin, resource, show_if, validate_json_request, ) from endpoints.exception import NotFound, Unauthorized LOGS_PER_PAGE = 20 SERVICE_LEVEL_LOG_KINDS = set( [ "service_key_create", "service_key_approve", "service_key_delete", "service_key_modify", "service_key_extend", "service_key_rotate", ] ) def _parse_datetime(dt_string): if not dt_string: return None try: return datetime.strptime(dt_string + " UTC", "%m/%d/%Y %Z") except ValueError: return None def _validate_logs_arguments(start_time, end_time): start_time = _parse_datetime(start_time) or (datetime.today() - timedelta(days=1)) end_time = _parse_datetime(end_time) or datetime.today() end_time = end_time + timedelta(days=1) return start_time, end_time def _get_logs( start_time, end_time, performer_name=None, repository_name=None, namespace_name=None, page_token=None, filter_kinds=None, ): (start_time, end_time) = _validate_logs_arguments(start_time, end_time) if end_time < start_time: abort(400) log_entry_page = logs_model.lookup_logs( start_time, end_time, performer_name, repository_name, namespace_name, filter_kinds, page_token, app.config["ACTION_LOG_MAX_PAGE"], ) include_namespace = namespace_name is None and repository_name is None return ( { "start_time": format_date(start_time), "end_time": format_date(end_time), "logs": [log.to_dict(avatar, include_namespace) for log in log_entry_page.logs], }, log_entry_page.next_page_token, ) def _get_aggregate_logs( start_time, end_time, performer_name=None, repository=None, namespace=None, filter_kinds=None ): (start_time, end_time) = _validate_logs_arguments(start_time, end_time) if end_time < start_time: abort(400) try: aggregated_logs = logs_model.get_aggregated_log_counts( start_time, end_time, performer_name=performer_name, repository_name=repository, namespace_name=namespace, filter_kinds=filter_kinds, ) except InvalidLogsDateRangeError: abort(400, "Invalid date range for logs") return {"aggregated": [log.to_dict() for log in aggregated_logs]} @resource("/v1/repository//logs") @path_param("repository", "The full path of the repository. e.g. namespace/name") class RepositoryLogs(RepositoryParamResource): """ Resource for fetching logs for the specific repository. """ @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True) @nickname("listRepoLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @page_support() def get(self, namespace, repository, page_token, parsed_args): """ List the logs for the specified repository. """ if registry_model.lookup_repository(namespace, repository) is None: raise NotFound() start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] return _get_logs( start_time, end_time, repository_name=repository, page_token=page_token, namespace_name=namespace, ) @resource("/v1/user/logs") class UserLogs(ApiResource): """ Resource for fetching logs for the current user. """ @require_user_admin() @nickname("listUserLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("performer", "Username for which to filter logs.", type=str) @page_support() def get(self, parsed_args, page_token): """ List the logs for the current user. """ performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] user = get_authenticated_user() return _get_logs( start_time, end_time, performer_name=performer_name, namespace_name=user.username, page_token=page_token, filter_kinds=SERVICE_LEVEL_LOG_KINDS, ) @resource("/v1/organization//logs") @path_param("orgname", "The name of the organization") @related_user_resource(UserLogs) class OrgLogs(ApiResource): """ Resource for fetching logs for the entire organization. """ @nickname("listOrgLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("performer", "Username for which to filter logs.", type=str) @page_support() @require_scope(scopes.ORG_ADMIN) def get(self, orgname, page_token, parsed_args): """ List the logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) if ( permission.can() or allow_if_global_readonly_superuser() or allow_if_superuser_with_full_access() ): performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] return _get_logs( start_time, end_time, namespace_name=orgname, performer_name=performer_name, page_token=page_token, ) raise Unauthorized() @resource("/v1/repository//aggregatelogs") @show_if(features.AGGREGATED_LOG_COUNT_RETRIEVAL) @path_param("repository", "The full path of the repository. e.g. namespace/name") class RepositoryAggregateLogs(RepositoryParamResource): """ Resource for fetching aggregated logs for the specific repository. """ @require_repo_admin(allow_for_global_readonly_superuser=True, allow_for_superuser=True) @nickname("getAggregateRepoLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) def get(self, namespace, repository, parsed_args): """ Returns the aggregated logs for the specified repository. """ if registry_model.lookup_repository(namespace, repository) is None: raise NotFound() start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] return _get_aggregate_logs(start_time, end_time, repository=repository, namespace=namespace) @resource("/v1/user/aggregatelogs") @show_if(features.AGGREGATED_LOG_COUNT_RETRIEVAL) class UserAggregateLogs(ApiResource): """ Resource for fetching aggregated logs for the current user. """ @require_user_admin() @nickname("getAggregateUserLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("performer", "Username for which to filter logs.", type=str) def get(self, parsed_args): """ Returns the aggregated logs for the current user. """ performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] user = get_authenticated_user() return _get_aggregate_logs( start_time, end_time, performer_name=performer_name, namespace=user.username, filter_kinds=SERVICE_LEVEL_LOG_KINDS, ) @resource("/v1/organization//aggregatelogs") @show_if(features.AGGREGATED_LOG_COUNT_RETRIEVAL) @path_param("orgname", "The name of the organization") @related_user_resource(UserLogs) class OrgAggregateLogs(ApiResource): """ Resource for fetching aggregate logs for the entire organization. """ @nickname("getAggregateOrgLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("performer", "Username for which to filter logs.", type=str) @require_scope(scopes.ORG_ADMIN) def get(self, orgname, parsed_args): """ Gets the aggregated logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) if ( permission.can() or allow_if_global_readonly_superuser() or allow_if_superuser_with_full_access() ): performer_name = parsed_args["performer"] start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] return _get_aggregate_logs( start_time, end_time, namespace=orgname, performer_name=performer_name ) raise Unauthorized() EXPORT_LOGS_SCHEMA = { "type": "object", "description": "Configuration for an export logs operation", "properties": { "callback_url": { "type": "string", "description": "The callback URL to invoke with a link to the exported logs", }, "callback_email": { "type": "string", "description": "The e-mail address at which to e-mail a link to the exported logs", }, }, } def _queue_logs_export(start_time, end_time, options, namespace_name, repository_name=None): callback_url = options.get("callback_url") if callback_url: if not callback_url.startswith("https://") and not callback_url.startswith("http://"): raise InvalidRequest("Invalid callback URL") callback_email = options.get("callback_email") if callback_email: if callback_email.find("@") < 0: raise InvalidRequest("Invalid callback e-mail") (start_time, end_time) = _validate_logs_arguments(start_time, end_time) if end_time < start_time: raise InvalidLogsDateRangeError("Invalid time span selected") export_id = logs_model.queue_logs_export( start_time, end_time, export_action_logs_queue, namespace_name, repository_name, callback_url, callback_email, ) if export_id is None: raise InvalidRequest("Invalid export request") return export_id def _log_export_success(user_or_org_name, export_id, request, repository=None): metadata = { "date/time": datetime.utcnow(), "export_id": export_id, "message": "queued for export", "url": request.get_json().get("callback_url") or None, "email": request.get_json().get("callback_email") or None, } if repository: metadata["repo"] = repository log_action( "export_logs_success", user_or_org_name, metadata, ) def _log_export_failure(user_or_org_name, request, ex, repository=None): metadata = { "date/time": datetime.utcnow(), "error": ex, "url": request.get_json().get("callback_url") or None, "email": request.get_json().get("callback_email") or None, } if repository: metadata["repo"] = repository log_action( "export_logs_failure", user_or_org_name, metadata, ) @resource("/v1/repository//exportlogs") @show_if(features.LOG_EXPORT) @path_param("repository", "The full path of the repository. e.g. namespace/name") class ExportRepositoryLogs(RepositoryParamResource): """ Resource for exporting the logs for the specific repository. """ schemas = {"ExportLogs": EXPORT_LOGS_SCHEMA} @require_repo_admin(allow_for_superuser=True) @nickname("exportRepoLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @validate_json_request("ExportLogs") def post(self, namespace, repository, parsed_args): """ Queues an export of the logs for the specified repository. """ if registry_model.lookup_repository(namespace, repository) is None: _log_export_failure(namespace, request, "non-existent repository", repository) raise NotFound() start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] try: export_id = _queue_logs_export( start_time, end_time, request.get_json(), namespace, repository_name=repository ) except (InvalidRequest, InvalidLogsDateRangeError) as ex: _log_export_failure(namespace, request, ex, repository) abort(400, ex) _log_export_success(namespace, export_id, request, repository) return { "export_id": export_id, } @resource("/v1/user/exportlogs") @show_if(features.LOG_EXPORT) class ExportUserLogs(ApiResource): """ Resource for exporting the logs for the current user repository. """ schemas = {"ExportLogs": EXPORT_LOGS_SCHEMA} @require_user_admin() @nickname("exportUserLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @validate_json_request("ExportLogs") def post(self, parsed_args): """ Returns the aggregated logs for the current user. """ user = get_authenticated_user() start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] try: export_id = _queue_logs_export(start_time, end_time, request.get_json(), user.username) except (InvalidRequest, InvalidLogsDateRangeError) as ex: _log_export_failure(user.username, request, ex, None) abort(400, ex) _log_export_success(user.username, export_id, request, None) return { "export_id": export_id, } @resource("/v1/organization//exportlogs") @show_if(features.LOG_EXPORT) @path_param("orgname", "The name of the organization") @related_user_resource(ExportUserLogs) class ExportOrgLogs(ApiResource): """ Resource for exporting the logs for an entire organization. """ schemas = {"ExportLogs": EXPORT_LOGS_SCHEMA} @nickname("exportOrgLogs") @parse_args() @query_param("starttime", 'Earliest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @query_param("endtime", 'Latest time for logs. Format: "%m/%d/%Y" in UTC.', type=str) @require_scope(scopes.ORG_ADMIN) @validate_json_request("ExportLogs") def post(self, orgname, parsed_args): """ Exports the logs for the specified organization. """ permission = AdministerOrganizationPermission(orgname) if ( permission.can() or allow_if_global_readonly_superuser() or allow_if_superuser_with_full_access() ): start_time = parsed_args["starttime"] end_time = parsed_args["endtime"] try: export_id = _queue_logs_export(start_time, end_time, request.get_json(), orgname) except (InvalidRequest, InvalidLogsDateRangeError) as ex: _log_export_failure(orgname, request, ex, None) abort(400, ex) _log_export_success(orgname, export_id, request, None) return { "export_id": export_id, } _log_export_failure(orgname, request, "unauthorized", None) raise Unauthorized()