import json from contextlib import contextmanager from app import app, notification_queue from auth.auth_context import get_authenticated_user, get_validated_oauth_token from data import model DEFAULT_BATCH_SIZE = 1000 def build_repository_event_data(namespace_name, repo_name, extra_data=None, subpage=None): """ Builds the basic repository data for an event, including the repository's name, Docker URL and homepage. If extra_data is specified, it is appended to the dictionary before it is returned. """ repo_string = "%s/%s" % (namespace_name, repo_name) homepage = "%s://%s/repository/%s" % ( app.config["PREFERRED_URL_SCHEME"], app.config["SERVER_HOSTNAME"], repo_string, ) if subpage: if not subpage.startswith("/"): subpage = "/" + subpage homepage = homepage + subpage event_data = { "repository": repo_string, "namespace": namespace_name, "name": repo_name, "docker_url": "%s/%s" % (app.config["SERVER_HOSTNAME"], repo_string), "homepage": homepage, } event_data.update(extra_data or {}) return event_data def build_notification_data(notification, event_data, performer_data=None): if not performer_data: performer_data = {} oauth_token = get_validated_oauth_token() if oauth_token: performer_data["oauth_token_id"] = oauth_token.id performer_data["oauth_token_application_id"] = oauth_token.application.client_id performer_data["oauth_token_application"] = oauth_token.application.name performer_user = get_authenticated_user() if performer_user: performer_data["entity_id"] = performer_user.id performer_data["entity_name"] = performer_user.username return { "notification_uuid": notification.uuid, "event_data": event_data, "performer_data": performer_data, } @contextmanager def notification_batch(batch_size=DEFAULT_BATCH_SIZE): """ Context manager implementation which returns a target callable with the same signature as spawn_notification. When the the context block exits the notifications generated by the callable will be bulk inserted into the queue with the specified batch size. """ with notification_queue.batch_insert(batch_size) as queue_put: def spawn_notification_batch( repo, event_name, extra_data=None, subpage=None, pathargs=None, performer_data=None ): event_data = build_repository_event_data( repo.namespace_name, repo.name, extra_data=extra_data, subpage=subpage ) notification_uuid = extra_data.get("notification_uuid", None) notifications = model.notification.list_repo_notifications( repo.namespace_name, repo.name, event_name=event_name, notification_uuid=notification_uuid, ) path = [repo.namespace_name, repo.name, event_name] + (pathargs or []) for notification in list(notifications): notification_data = build_notification_data( notification, event_data, performer_data ) queue_put(path, json.dumps(notification_data)) yield spawn_notification_batch def spawn_notification( repo, event_name, extra_data=None, subpage=None, pathargs=None, performer_data=None ): with notification_batch(1) as batch_spawn: batch_spawn(repo, event_name, extra_data, subpage, pathargs, performer_data)