# coding=utf-8 import base64 import json as py_json import time import unittest import zlib from datetime import datetime, timedelta from io import BytesIO from urllib.parse import parse_qs, urlencode, urlparse, urlunparse import jwt from authlib.jose import JsonWebKey from cryptography.hazmat.primitives import serialization from flask import url_for from mock import patch from parameterized import parameterized, parameterized_class from app import app from data import model from data.database import NotificationKind, ServiceKeyApprovalType from data.model.notification import list_notifications from data.model.oauth import create_user_access_token from data.model.organization import create_organization, get_organization from data.model.user import get_user from endpoints import keyserver from endpoints.api import api, api_bp from endpoints.api.user import Signin from endpoints.csrf import OAUTH_CSRF_TOKEN_NAME from endpoints.keyserver import jwk_with_kid from endpoints.test.shared import gen_basic_auth, toggle_feature from endpoints.web import web as web_bp from endpoints.webhooks import webhooks as webhooks_bp from initdb import finished_database_for_testing, setup_database_for_testing from test.helpers import assert_action_logged from util.registry.gzipinputstream import WINDOW_BUFFER_SIZE from util.security.token import encode_public_private_token try: app.register_blueprint(web_bp, url_prefix="") except ValueError: # This blueprint was already registered pass try: app.register_blueprint(webhooks_bp, url_prefix="/webhooks") except ValueError: # This blueprint was already registered pass try: app.register_blueprint(keyserver.key_server, url_prefix="") except ValueError: # This blueprint was already registered pass try: app.register_blueprint(api_bp, url_prefix="/api") except ValueError: # This blueprint was already registered pass CSRF_TOKEN_KEY = "_csrf_token" CSRF_TOKEN = "123csrfforme" class EndpointTestCase(unittest.TestCase): maxDiff = None def _add_csrf(self, without_csrf): parts = urlparse(without_csrf) query = parse_qs(parts[4]) for k, v in query.items(): query[k] = v[0] self._set_csrf() query[CSRF_TOKEN_KEY] = CSRF_TOKEN url = urlunparse(list(parts[0:4]) + [urlencode(query)] + list(parts[5:])) return url def _set_csrf(self): with self.app.session_transaction() as sess: sess[CSRF_TOKEN_KEY] = CSRF_TOKEN sess[OAUTH_CSRF_TOKEN_NAME] = "someoauthtoken" def setUp(self): setup_database_for_testing(self) self.app = app.test_client() self.ctx = app.test_request_context() self.ctx.__enter__() def tearDown(self): finished_database_for_testing(self) self.ctx.__exit__(True, None, None) def getResponse(self, resource_name, expected_code=200, **kwargs): rv = self.app.get(url_for(resource_name, **kwargs)) self.assertEqual(rv.status_code, expected_code) return rv.data def deleteResponse(self, resource_name, headers=None, expected_code=200, **kwargs): headers = headers or {} rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers) self.assertEqual(rv.status_code, expected_code) return rv.data def deleteEmptyResponse(self, resource_name, headers=None, expected_code=204, **kwargs): headers = headers or {} rv = self.app.delete(url_for(resource_name, **kwargs), headers=headers) self.assertEqual(rv.status_code, expected_code) self.assertEqual(rv.data, b"") # ensure response body empty return def putResponse(self, resource_name, headers=None, data=None, expected_code=200, **kwargs): headers = headers or {} data = data or {} rv = self.app.put( url_for(resource_name, **kwargs), headers=headers, data=py_json.dumps(data) ) self.assertEqual(rv.status_code, expected_code) return rv.data def postResponse( self, resource_name, headers=None, data=None, form=None, with_csrf=True, expected_code=200, **kwargs, ): headers = headers or {} form = form or {} url = url_for(resource_name, **kwargs) if with_csrf: url = self._add_csrf(url) post_data = None if form: post_data = form elif data: post_data = py_json.dumps(data) rv = self.app.post(url, headers=headers, data=post_data) if expected_code is not None: self.assertEqual(rv.status_code, expected_code) return rv def login(self, username, password): rv = self.app.post( self._add_csrf(api.url_for(Signin)), data=py_json.dumps(dict(username=username, password=password)), headers={"Content-Type": "application/json"}, ) self.assertEqual(rv.status_code, 200) class BuildLogsTestCase(EndpointTestCase): build_uuid = "deadpork-dead-pork-dead-porkdeadpork" def test_buildlogs_invalid_build_uuid(self): self.login("public", "password") self.getResponse("web.buildlogs", build_uuid="bad_build_uuid", expected_code=400) def test_buildlogs_not_logged_in(self): self.getResponse("web.buildlogs", build_uuid=self.build_uuid, expected_code=403) def test_buildlogs_unauthorized(self): self.login("reader", "password") self.getResponse("web.buildlogs", build_uuid=self.build_uuid, expected_code=403) def test_buildlogs_logsarchived(self): self.login("public", "password") with patch("data.model.build.RepositoryBuild", logs_archived=True): self.getResponse("web.buildlogs", build_uuid=self.build_uuid, expected_code=403) def test_buildlogs_successful(self): self.login("public", "password") logs = ["log1", "log2"] with patch("endpoints.web.build_logs.get_log_entries", return_value=(None, logs)): resp = self.getResponse("web.buildlogs", build_uuid=self.build_uuid, expected_code=200) self.assertEqual({"logs": logs}, py_json.loads(resp)) class ArchivedLogsTestCase(EndpointTestCase): build_uuid = "deadpork-dead-pork-dead-porkdeadpork" def test_logarchive_invalid_build_uuid(self): self.login("public", "password") self.getResponse("web.logarchive", file_id="bad_build_uuid", expected_code=403) def test_logarchive_not_logged_in(self): self.getResponse("web.logarchive", file_id=self.build_uuid, expected_code=403) def test_logarchive_unauthorized(self): self.login("reader", "password") self.getResponse("web.logarchive", file_id=self.build_uuid, expected_code=403) def test_logarchive_file_not_found(self): self.login("public", "password") self.getResponse("web.logarchive", file_id=self.build_uuid, expected_code=403) def test_logarchive_successful(self): self.login("public", "password") data = b"my_file_stream" mock_file = BytesIO(zlib.compressobj(-1, zlib.DEFLATED, WINDOW_BUFFER_SIZE).compress(data)) with patch("endpoints.web.log_archive._storage.stream_read_file", return_value=mock_file): self.getResponse("web.logarchive", file_id=self.build_uuid, expected_code=200) class WebhookEndpointTestCase(EndpointTestCase): def test_invalid_build_trigger_webhook(self): self.postResponse( "webhooks.build_trigger_webhook", trigger_uuid="invalidtrigger", expected_code=404 ) def test_valid_build_trigger_webhook_invalid_auth(self): trigger = list(model.build.list_build_triggers("devtable", "building"))[0] self.postResponse( "webhooks.build_trigger_webhook", trigger_uuid=trigger.uuid, expected_code=403 ) def test_valid_build_trigger_webhook_cookie_auth(self): self.login("devtable", "password") # Cookie auth is not supported, so this should 403 trigger = list(model.build.list_build_triggers("devtable", "building"))[0] self.postResponse( "webhooks.build_trigger_webhook", trigger_uuid=trigger.uuid, expected_code=403 ) def test_valid_build_trigger_webhook_missing_payload(self): auth_header = gen_basic_auth("devtable", "password") trigger = list(model.build.list_build_triggers("devtable", "building"))[0] self.postResponse( "webhooks.build_trigger_webhook", trigger_uuid=trigger.uuid, expected_code=400, headers={"Authorization": auth_header, "Content-Type": "application/json"}, data={}, ) def test_valid_build_trigger_webhook_invalid_payload(self): auth_header = gen_basic_auth("devtable", "password") trigger = list(model.build.list_build_triggers("devtable", "building"))[0] self.postResponse( "webhooks.build_trigger_webhook", trigger_uuid=trigger.uuid, expected_code=400, headers={"Authorization": auth_header, "Content-Type": "application/json"}, data={"invalid": "payload"}, ) class WebEndpointTestCase(EndpointTestCase): def test_index(self): self.getResponse("web.index") def test_robots(self): self.getResponse("web.robots") def test_repo_view(self): self.getResponse("web.repository", path="devtable/simple") def test_unicode_repo_view(self): self.getResponse("web.repository", path="%E2%80%8Bcoreos/hyperkube%E2%80%8B") def test_org_view(self): self.getResponse("web.org_view", path="buynlarge") def test_user_view(self): self.getResponse("web.user_view", path="devtable") def test_confirm_repo_email(self): code = model.repository.create_email_authorization_for_repo( "devtable", "simple", "foo@bar.com" ) self.getResponse("web.confirm_repo_email", code=code.code) found = model.repository.get_email_authorized_for_repo("devtable", "simple", "foo@bar.com") self.assertTrue(found.confirmed) def test_confirm_email(self): user = model.user.get_user("devtable") self.assertNotEqual(user.email, "foo@bar.com") confirmation_code = model.user.create_confirm_email_code(user, "foo@bar.com") self.getResponse("web.confirm_email", code=confirmation_code, expected_code=302) user = model.user.get_user("devtable") self.assertEqual(user.email, "foo@bar.com") def test_confirm_recovery(self): # Try for an invalid code. self.getResponse("web.confirm_recovery", code="someinvalidcode", expected_code=200) # Create a valid code and try. user = model.user.get_user("devtable") confirmation_code = model.user.create_reset_password_email_code(user.email) self.getResponse("web.confirm_recovery", code=confirmation_code, expected_code=302) def test_confirm_recovery_verified(self): # Create a valid code and try. user = model.user.get_user("devtable") user.verified = False user.save() confirmation_code = model.user.create_reset_password_email_code(user.email) self.getResponse("web.confirm_recovery", code=confirmation_code, expected_code=302) # Ensure the current user is the expected user and that they are verified. user = model.user.get_user("devtable") self.assertTrue(user.verified) self.getResponse("web.receipt", expected_code=404) # Will 401 if no user. def test_request_authorization_code(self): self.login("devtable", "password") # Try for an invalid client. self.getResponse( "web.request_authorization_code", client_id="foo", redirect_uri="bar", scope="baz", expected_code=404, ) # Try for a valid client. org = model.organization.get_organization("buynlarge") assert org app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", expected_code=200, ) def test_request_authorization_code_assignment_id_with_code(self): self.login("devtable", "password") org = model.organization.get_organization("buynlarge") app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", assignment_uuid="assignmentid", response_type="code", expected_code=400, ) def test_request_authorization_code_not_org_admin(self): self.login("devtable", "password") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", expected_code=403, ) def test_request_authorization_code_assigned_authorization(self): self.login("devtable", "password") devtable = get_user("devtable") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assignment = model.oauth.assign_token_to_user( app, devtable, app.redirect_uri, "repo:read", "token" ) response = self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", assignment_uuid=assignment.uuid, response_type="token", expected_code=200, ) assert "Are you sure you want to authorize this application?" in str(response) def test_request_authorization_code_assigned_authorization_disabled(self): self.login("devtable", "password") devtable = get_user("devtable") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assignment = model.oauth.assign_token_to_user( app, devtable, app.redirect_uri, "repo:read", "token" ) with toggle_feature("ASSIGN_OAUTH_TOKEN", False): self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", assignment_uuid=assignment.uuid, response_type="token", expected_code=400, ) def test_request_authorization_code_assigned_authorization_with_existing_scopes(self): self.login("devtable", "password") devtable = get_user("devtable") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assignment = model.oauth.assign_token_to_user( app, devtable, app.redirect_uri, "repo:read", "token" ) create_user_access_token(devtable, app.client_id, "repo:read") oauth_tokens = list(model.oauth.list_access_tokens_for_user(devtable)) filtered_tokens = [token for token in oauth_tokens if token.application == app] assert len(filtered_tokens) == 1 self.getResponse( "web.request_authorization_code", client_id=app.client_id, redirect_uri=app.redirect_uri, scope="repo:read", assignment_uuid=assignment.uuid, response_type="token", expected_code=302, ) assert model.oauth.get_assigned_authorization_for_user(devtable, assignment.uuid) is None oauth_tokens = list(model.oauth.list_access_tokens_for_user(devtable)) filtered_tokens = [token for token in oauth_tokens if token.application == app] assert len(filtered_tokens) == 2 def test_build_status_badge(self): # Try for an invalid repository. self.getResponse("web.build_status_badge", repository="foo/bar", expected_code=404) # Try for a public repository. self.getResponse("web.build_status_badge", repository="public/publicrepo") # Try for an private repository. self.getResponse("web.build_status_badge", repository="devtable/simple", expected_code=404) # Try for an private repository with an invalid token. self.getResponse( "web.build_status_badge", repository="devtable/simple", token="sometoken", expected_code=404, ) # Try for an private repository with a valid token. repository = model.repository.get_repository("devtable", "simple") self.getResponse( "web.build_status_badge", repository="devtable/simple", token=repository.badge_token ) def test_attach_custom_build_trigger(self): self.getResponse("web.attach_custom_build_trigger", repository="foo/bar", expected_code=401) self.getResponse( "web.attach_custom_build_trigger", repository="devtable/simple", expected_code=401 ) self.login("freshuser", "password") self.getResponse( "web.attach_custom_build_trigger", repository="devtable/simple", expected_code=403 ) self.login("devtable", "password") self.getResponse( "web.attach_custom_build_trigger", repository="devtable/simple", expected_code=302 ) def test_redirect_to_repository(self): self.getResponse("web.redirect_to_repository", repository="foo/bar", expected_code=404) self.getResponse( "web.redirect_to_repository", repository="public/publicrepo", expected_code=302 ) self.getResponse( "web.redirect_to_repository", repository="devtable/simple", expected_code=403 ) self.login("devtable", "password") self.getResponse( "web.redirect_to_repository", repository="devtable/simple", expected_code=302 ) def test_redirect_to_namespace(self): self.getResponse("web.redirect_to_namespace", namespace="unknown", expected_code=404) self.getResponse("web.redirect_to_namespace", namespace="devtable", expected_code=302) self.getResponse("web.redirect_to_namespace", namespace="buynlarge", expected_code=302) class OAuthTestCase(EndpointTestCase): @parameterized.expand(["token", "code"]) def test_authorize_nologin(self, response_type): form = { "client_id": "someclient", "redirect_uri": "http://localhost:5000/foobar", "scope": "user:admin", "response_type": response_type, } self.postResponse("web.authorize_application", form=form, with_csrf=True, expected_code=401) @parameterized.expand(["token", "code"]) def test_authorize_invalidclient(self, response_type): self.login("devtable", "password") form = { "client_id": "someclient", "redirect_uri": "http://localhost:5000/foobar", "scope": "user:admin", "response_type": response_type, } resp = self.postResponse( "web.authorize_application", form=form, with_csrf=True, expected_code=302 ) self.assertEqual( "http://localhost:5000/foobar?error=unauthorized_client", resp.headers["Location"] ) @parameterized.expand(["token", "code"]) def test_authorize_invalidscope(self, response_type): self.login("devtable", "password") form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "invalid:scope", "response_type": response_type, } resp = self.postResponse( "web.authorize_application", form=form, with_csrf=True, expected_code=302 ) self.assertEqual( "http://localhost:8000/o2c.html?error=invalid_scope", resp.headers["Location"] ) @parameterized.expand(["token", "code"]) def test_authorize_invalidredirecturi(self, response_type): self.login("devtable", "password") # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://some/invalid/uri", "scope": "user:admin", "response_type": response_type, } self.postResponse("web.authorize_application", form=form, with_csrf=True, expected_code=400) @parameterized.expand(["token", "code"]) def test_authorize_success(self, response_type): self.login("devtable", "password") # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } resp = self.postResponse( "web.authorize_application", form=form, with_csrf=True, expected_code=302 ) expected_value = "access_token=" if response_type == "token" else "code=" self.assertTrue(expected_value in resp.headers["Location"]) @parameterized.expand(["token", "code"]) def test_authorize_nocsrf(self, response_type): self.login("devtable", "password") # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } self.postResponse( "web.authorize_application", form=form, with_csrf=False, expected_code=403 ) @parameterized.expand(["token", "code"]) def test_authorize_nocsrf_withinvalidheader(self, response_type): # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } headers = dict(authorization="Some random header") self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=401, ) @parameterized.expand(["token", "code"]) def test_authorize_nocsrf_withbadheader(self, response_type): # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } headers = dict(authorization=gen_basic_auth("devtable", "invalidpassword")) self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=401, ) def test_authorize_application_assignment_id_with_code(self): # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "assignment_uuid": "assignmentid", "response_type": "code", } headers = dict(authorization=gen_basic_auth("devtable", "password")) self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=True, expected_code=400, ) def test_authorize_application_not_org_admin(self): user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") form = { "client_id": app.client_id, "redirect_uri": app.redirect_uri, "scope": "user:admin", "assignment_uuid": "assignmentid", "response_type": "token", } headers = dict(authorization=gen_basic_auth("devtable", "password")) response = self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=True, expected_code=302, ) assert "unauthorized_client" in response.headers["Location"] def test_authorize_application_assigned_authorization(self): devtable = get_user("devtable") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assignment = model.oauth.assign_token_to_user( app, devtable, app.redirect_uri, "repo:read", "token" ) tokens = list(model.oauth.list_access_tokens_for_user(devtable)) assert len(tokens) == 1 form = { "client_id": app.client_id, "redirect_uri": app.redirect_uri, "scope": "user:admin", "assignment_uuid": assignment.uuid, "response_type": "token", } headers = dict(authorization=gen_basic_auth("devtable", "password")) response = self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=True, expected_code=302, ) assert len(list(model.oauth.list_access_tokens_for_user(devtable))) == 2 assert model.oauth.get_assigned_authorization_for_user(devtable, assignment.uuid) is None def test_authorize_application_assigned_authorization_disabled(self): devtable = get_user("devtable") user = get_user("randomuser") org = model.organization.create_organization("testorg", "testorg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assignment = model.oauth.assign_token_to_user( app, devtable, app.redirect_uri, "repo:read", "token" ) form = { "client_id": app.client_id, "redirect_uri": app.redirect_uri, "scope": "user:admin", "assignment_uuid": assignment.uuid, "response_type": "token", } headers = dict(authorization=gen_basic_auth("devtable", "password")) with toggle_feature("ASSIGN_OAUTH_TOKEN", False): self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=True, expected_code=400, ) @parameterized.expand(["token", "code"]) def test_authorize_nocsrf_correctheader(self, response_type): # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } # Try without the client id being in the whitelist. headers = dict(authorization=gen_basic_auth("devtable", "password")) self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=403, ) # Add the client ID to the whitelist and try again. app.config["DIRECT_OAUTH_CLIENTID_WHITELIST"] = ["deadbeef"] headers = dict(authorization=gen_basic_auth("devtable", "password")) resp = self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=302, ) # Reset app config app.config["DIRECT_OAUTH_CLIENTID_WHITELIST"] = [] expected_value = "access_token=" if response_type == "token" else "code=" self.assertTrue(expected_value in resp.headers["Location"]) @parameterized.expand(["token", "code"]) def test_authorize_nocsrf_ratelimiting(self, response_type): # Note: Defined in initdb.py form = { "client_id": "deadbeef", "redirect_uri": "http://localhost:8000/o2c.html", "scope": "user:admin", "response_type": response_type, } # Try without the client id being in the whitelist a few times, making sure we eventually get rate limited. headers = dict(authorization=gen_basic_auth("devtable", "invalidpassword")) self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=401, ) counter = 0 while True: r = self.postResponse( "web.authorize_application", headers=headers, form=form, with_csrf=False, expected_code=None, ) self.assertNotEqual(200, r.status_code) counter = counter + 1 if counter > 5: self.fail("Exponential backoff did not fire") if r.status_code == 429: break class KeyServerTestCase(EndpointTestCase): def _get_test_jwt_payload(self): return { "iss": "sample_service", "aud": keyserver.JWT_AUDIENCE, "exp": int(time.time()) + 60, "iat": int(time.time()), "nbf": int(time.time()), } def test_list_service_keys(self): # Retrieve all the keys. all_keys = model.service_keys.list_all_keys() visible_jwks = [ jwk_with_kid(key) for key in model.service_keys.list_service_keys("sample_service") ] invisible_jwks = [] for key in all_keys: is_expired = key.expiration_date and key.expiration_date <= datetime.utcnow() if key.service != "sample_service" or key.approval is None or is_expired: invisible_jwks.append(key.jwk) rv = self.getResponse("key_server.list_service_keys", service="sample_service") jwkset = py_json.loads(rv) # Make sure the hidden keys are not returned and the visible ones are returned. self.assertTrue(len(visible_jwks) > 0) self.assertTrue(len(invisible_jwks) > 0) self.assertEqual(len(visible_jwks), len(jwkset["keys"])) for jwk in jwkset["keys"]: self.assertIn(jwk, visible_jwks) self.assertNotIn(jwk, invisible_jwks) def test_get_service_key(self): # 200 for an approved key self.getResponse("key_server.get_service_key", service="sample_service", kid="kid1") # 409 for an unapproved key self.getResponse( "key_server.get_service_key", service="sample_service", kid="kid3", expected_code=409 ) # 404 for a non-existant key self.getResponse( "key_server.get_service_key", service="sample_service", kid="kid9999", expected_code=404 ) # 403 for an approved but expired key that is inside of the 2 week window. self.getResponse( "key_server.get_service_key", service="sample_service", kid="kid6", expected_code=403 ) # 404 for an approved, expired key that is outside of the 2 week window. self.getResponse( "key_server.get_service_key", service="sample_service", kid="kid7", expected_code=404 ) def test_put_service_key(self): # No Authorization header should yield a 400 self.putResponse( "key_server.put_service_key", service="sample_service", kid="kid420", headers={ "Content-Type": "application/json", }, data={}, expected_code=400, ) # Mint a JWT with our test payload jwk = JsonWebKey.generate_key("RSA", 2048, is_private=True) private_pem = jwk.get_private_key().private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) payload = self._get_test_jwt_payload() token = jwt.encode(payload, private_pem, "RS256") # Invalid service name should yield a 400. self.putResponse( "key_server.put_service_key", service="sample service", kid="kid420", headers={ "Authorization": "Bearer %s" % token, "Content-Type": "application/json", }, data=jwk.as_dict(), expected_code=400, ) # Publish a new key with assert_action_logged("service_key_create"): self.putResponse( "key_server.put_service_key", service="sample_service", kid="kid420", headers={ "Authorization": "Bearer %s" % token, "Content-Type": "application/json", }, data=jwk.as_dict(), expected_code=202, ) # Ensure that the key exists but is unapproved. self.getResponse( "key_server.get_service_key", service="sample_service", kid="kid420", expected_code=409 ) # Attempt to rotate the key. Since not approved, it will fail. token = jwt.encode(payload, private_pem, "RS256", headers={"kid": "kid420"}) self.putResponse( "key_server.put_service_key", service="sample_service", kid="kid6969", headers={ "Authorization": "Bearer %s" % token, "Content-Type": "application/json", }, data=jwk.as_dict(), expected_code=403, ) # Approve the key. model.service_keys.approve_service_key( "kid420", ServiceKeyApprovalType.SUPERUSER, approver=1 ) # Rotate that new key with assert_action_logged("service_key_rotate"): token = jwt.encode(payload, private_pem, "RS256", headers={"kid": "kid420"}) self.putResponse( "key_server.put_service_key", service="sample_service", kid="kid6969", headers={ "Authorization": "Bearer %s" % token, "Content-Type": "application/json", }, data=jwk.as_dict(), expected_code=200, ) # Rotation should only work when signed by the previous key jwk = JsonWebKey.generate_key("RSA", 2048, is_private=True) private_pem = jwk.get_private_key().private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) token = jwt.encode(payload, private_pem, "RS256", headers={"kid": "kid420"}) self.putResponse( "key_server.put_service_key", service="sample_service", kid="kid6969", headers={ "Authorization": "Bearer %s" % token, "Content-Type": "application/json", }, data=jwk.as_dict(), expected_code=403, ) def test_attempt_delete_service_key_with_no_kid_signer(self): # Generate two keys, approving the first. private_key, _ = model.service_keys.generate_service_key( "sample_service", None, kid="first" ) # Mint a JWT with our test payload but *no kid*. token = jwt.encode( self._get_test_jwt_payload(), private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), "RS256", headers={}, ) # Using the credentials of our key, attempt to delete our unapproved key self.deleteResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=400, service="sample_service", kid="first", ) def test_attempt_delete_service_key_with_expired_key(self): # Generate two keys, approving the first. private_key, _ = model.service_keys.generate_service_key( "sample_service", None, kid="first" ) model.service_keys.approve_service_key( "first", ServiceKeyApprovalType.SUPERUSER, approver=1 ) model.service_keys.generate_service_key("sample_service", None, kid="second") # Mint a JWT with our test payload token = jwt.encode( self._get_test_jwt_payload(), private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), "RS256", headers={"kid": "first"}, ) # Set the expiration of the first to now - some time. model.service_keys.set_key_expiration("first", datetime.utcnow() - timedelta(seconds=100)) # Using the credentials of our second key, attempt to delete our unapproved key self.deleteResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=403, service="sample_service", kid="second", ) # Set the expiration to the future and delete the key. model.service_keys.set_key_expiration("first", datetime.utcnow() + timedelta(seconds=100)) with assert_action_logged("service_key_delete"): self.deleteEmptyResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=204, service="sample_service", kid="second", ) def test_delete_unapproved_service_key(self): # No Authorization header should yield a 400 self.deleteResponse( "key_server.delete_service_key", expected_code=400, service="sample_service", kid="kid1" ) # Generate an unapproved key. private_key, _ = model.service_keys.generate_service_key( "sample_service", None, kid="unapprovedkeyhere" ) # Mint a JWT with our test payload token = jwt.encode( self._get_test_jwt_payload(), private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), "RS256", headers={"kid": "unapprovedkeyhere"}, ) # Delete our unapproved key with itself. with assert_action_logged("service_key_delete"): self.deleteEmptyResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=204, service="sample_service", kid="unapprovedkeyhere", ) def test_delete_chained_service_key(self): # No Authorization header should yield a 400 self.deleteResponse( "key_server.delete_service_key", expected_code=400, service="sample_service", kid="kid1" ) # Generate two keys. private_key, _ = model.service_keys.generate_service_key( "sample_service", None, kid="kid123" ) model.service_keys.generate_service_key("sample_service", None, kid="kid321") # Mint a JWT with our test payload token = jwt.encode( self._get_test_jwt_payload(), private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), "RS256", headers={"kid": "kid123"}, ) # Using the credentials of our second key, attempt tp delete our unapproved key self.deleteResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=403, service="sample_service", kid="kid321", ) # Approve the second key. model.service_keys.approve_service_key( "kid123", ServiceKeyApprovalType.SUPERUSER, approver=1 ) # Using the credentials of our approved key, delete our unapproved key with assert_action_logged("service_key_delete"): self.deleteEmptyResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=204, service="sample_service", kid="kid321", ) # Attempt to delete a key signed by a key from a different service bad_token = jwt.encode( self._get_test_jwt_payload(), private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ), "RS256", headers={"kid": "kid5"}, ) self.deleteResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % bad_token}, expected_code=403, service="sample_service", kid="kid123", ) # Delete a self-signed, approved key with assert_action_logged("service_key_delete"): self.deleteEmptyResponse( "key_server.delete_service_key", headers={"Authorization": "Bearer %s" % token}, expected_code=204, service="sample_service", kid="kid123", ) class AssignOauthAppTestCase(EndpointTestCase): def test_assign_user(self): self.login("devtable", "password") assigned_user = get_user("randomuser") org = get_organization("buynlarge") app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") assigned_auths = model.oauth.list_assigned_authorizations_for_user(assigned_user) assert len(assigned_auths) == 0 notifications = list_notifications(assigned_user) assert len(notifications) == 0 response = self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=200, client_id=app.client_id, redirect_uri=app.redirect_uri, scope="user:admin", username="randomuser", response_type="token", ) assert "Token assigned successfully" in response.data.decode("utf-8") assigned_auths = model.oauth.list_assigned_authorizations_for_user(assigned_user) assert len(assigned_auths) == 1 assert assigned_auths[0].application == app notifications = list_notifications(assigned_user) assert len(notifications) == 1 assert ( notifications[0].kind == NotificationKind.select() .where(NotificationKind.name == "assigned_authorization") .get() ) def test_assign_user_unauthenticated(self): org = get_organization("buynlarge") app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=401, client_id=app.client_id, redirect_uri=app.redirect_uri, scope="user:admin", username="randomuser", response_type="token", ) def test_assign_user_user_does_not_exist(self): self.login("devtable", "password") org = get_organization("buynlarge") app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=404, client_id=app.client_id, redirect_uri=app.redirect_uri, scope="user:admin", username="doesnotexist", response_type="token", ) def test_assign_user_app_does_not_exist(self): self.login("devtable", "password") self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=404, client_id="doesnotexist", redirect_uri="http://foo/bar/baz", scope="user:admin", username="randomuser", response_type="token", ) def test_assign_user_not_org_admin(self): self.login("devtable", "password") user = get_user("randomuser") org = create_organization("neworg", "neworg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=403, client_id=app.client_id, redirect_uri=app.redirect_uri, scope="user:admin", username="freshuser", response_type="token", ) def test_assign_user_disabled(self): self.login("devtable", "password") user = get_user("randomuser") org = create_organization("neworg", "neworg@devtable.com", user) app = model.oauth.create_application(org, "test", "http://foo/bar", "http://foo/bar/baz") with toggle_feature("ASSIGN_OAUTH_TOKEN", False): self.postResponse( "web.assign_user_to_app", with_csrf=True, expected_code=404, client_id=app.client_id, redirect_uri=app.redirect_uri, scope="user:admin", username="freshuser", response_type="token", ) if __name__ == "__main__": unittest.main()