1
0
mirror of https://github.com/quay/quay.git synced 2025-07-30 07:43:13 +03:00

api: /v1/user/initialize to create first user (PROJQUAY-1926) (#771)

Add an unauthenticated API endpoint to create the initial user in the database. Usage is primarily intended for deployment automation.
This commit is contained in:
thomasmckay
2021-07-23 12:03:47 -04:00
committed by GitHub
parent fa0e861849
commit 6470248be1
11 changed files with 196 additions and 15 deletions

View File

@ -97,9 +97,7 @@ def test_valid_token(app):
def test_valid_oauth(app):
user = model.user.get_user("devtable")
app = model.oauth.list_applications_for_org(model.user.get_user_or_org("buynlarge"))[0]
oauth_token, code = model.oauth.create_access_token_for_testing(
user, app.client_id, "repo:read"
)
oauth_token, code = model.oauth.create_user_access_token(user, app.client_id, "repo:read")
token = _token(OAUTH_TOKEN_USERNAME, code)
result = validate_basic_auth(token)
assert result == ValidateResult(AuthKind.basic, oauthtoken=oauth_token)

View File

@ -48,9 +48,7 @@ def test_valid_token(app):
def test_valid_oauth(app):
user = model.user.get_user("devtable")
app = model.oauth.list_applications_for_org(model.user.get_user_or_org("buynlarge"))[0]
oauth_token, code = model.oauth.create_access_token_for_testing(
user, app.client_id, "repo:read"
)
oauth_token, code = model.oauth.create_user_access_token(user, app.client_id, "repo:read")
result, kind = validate_credentials(OAUTH_TOKEN_USERNAME, code)
assert kind == CredentialKind.oauth_token
assert result == ValidateResult(AuthKind.oauth, oauthtoken=oauth_token)

View File

@ -28,7 +28,7 @@ def test_valid_oauth(app):
user = model.user.get_user("devtable")
app = model.oauth.list_applications_for_org(model.user.get_user_or_org("buynlarge"))[0]
token_string = "%s%s" % ("a" * 20, "b" * 20)
oauth_token, _ = model.oauth.create_access_token_for_testing(
oauth_token, _ = model.oauth.create_user_access_token(
user, app.client_id, "repo:read", access_token=token_string
)
result = validate_bearer_auth("bearer " + token_string)
@ -40,7 +40,7 @@ def test_valid_oauth(app):
def test_disabled_user_oauth(app):
user = model.user.get_user("disabled")
token_string = "%s%s" % ("a" * 20, "b" * 20)
oauth_token, _ = model.oauth.create_access_token_for_testing(
oauth_token, _ = model.oauth.create_user_access_token(
user, "deadbeef", "repo:admin", access_token=token_string
)
@ -54,7 +54,7 @@ def test_disabled_user_oauth(app):
def test_expired_token(app):
user = model.user.get_user("devtable")
token_string = "%s%s" % ("a" * 20, "b" * 20)
oauth_token, _ = model.oauth.create_access_token_for_testing(
oauth_token, _ = model.oauth.create_user_access_token(
user, "deadbeef", "repo:admin", access_token=token_string, expires_in=-1000
)

View File

@ -777,3 +777,6 @@ class DefaultConfig(ImmutableConfig):
# Account recovery mode
ACCOUNT_RECOVERY_MODE = False
# Feature Flag: If set to true, the first User account may be created via API /api/v1/user/initialize
FEATURE_USER_INITIALIZE = False

View File

@ -382,7 +382,7 @@ def list_applications_for_org(org):
return query
def create_access_token_for_testing(user_obj, client_id, scope, access_token=None, expires_in=9000):
def create_user_access_token(user_obj, client_id, scope, access_token=None, expires_in=9000):
access_token = access_token or random_string_generator(length=40)()
token_name = access_token[:ACCESS_TOKEN_PREFIX_LENGTH]
token_code = access_token[ACCESS_TOKEN_PREFIX_LENGTH:]

View File

@ -2,9 +2,10 @@ import pytest
from mock import patch
from flask import url_for
from endpoints.api.test.shared import conduct_api_call
from endpoints.api.user import User
from endpoints.test.shared import client_with_identity
from endpoints.test.shared import client_with_identity, conduct_call
from features import FeatureNameValue
from test.fixtures import *
@ -40,3 +41,96 @@ def test_user_metadata_update(client):
# The location field should be unchanged.
assert user.get("location") == location
@pytest.mark.parametrize(
"user_count, expected_code, feature_mailing, feature_user_initialize, metadata",
[
# Non-empty database fails
(
1,
400,
True,
True,
{
"username": "nonemptydb",
"password": "password",
"email": "someone@somewhere.com",
},
),
# Empty database with mailing succeeds
(
0,
200,
True,
True,
{
"username": "emptydbemail",
"password": "password",
"email": "someone@somewhere.com",
},
),
# Empty database without mailing succeeds
(
0,
200,
False,
True,
{
"username": "emptydbnoemail",
"password": "password",
},
),
# Empty database with mailing missing email fails
(
0,
400,
True,
True,
{
"username": "emptydbbademail",
"password": "password",
},
),
# Empty database with access token
(
0,
200,
False,
True,
{"username": "emptydbtoken", "password": "password", "access_token": "true"},
),
],
)
def test_initialize_user(
user_count, expected_code, feature_mailing, feature_user_initialize, metadata, client
):
with patch("endpoints.web.has_users") as mock_user_count:
with patch("features.MAILING", FeatureNameValue("MAILING", feature_mailing)):
with patch(
"features.USER_INITIALIZE",
FeatureNameValue("USER_INITIALIZE", feature_user_initialize),
):
mock_user_count.return_value = user_count
user = conduct_call(
client,
"web.user_initialize",
url_for,
"POST",
{},
body=metadata,
expected_code=expected_code,
headers={"Content-Type": "application/json"},
)
if expected_code == 200:
assert user.json["username"] == metadata["username"]
if feature_mailing:
assert user.json["email"] == metadata["email"]
else:
assert user.json["email"] is None
assert user.json.get("encrypted_password", None)
if metadata.get("access_token"):
assert 40 == len(user.json.get("access_token", ""))
else:
assert not user.json.get("access_token")

View File

@ -32,6 +32,7 @@ from app import (
get_app_url,
instance_keys,
storage,
authentication,
)
from auth import scopes
from auth.auth_context import get_authenticated_user
@ -50,7 +51,7 @@ from buildtrigger.bitbuckethandler import BitbucketBuildTrigger
from buildtrigger.customhandler import CustomBuildTrigger
from buildtrigger.triggerutil import TriggerProviderException
from data import model
from data.database import db, RepositoryTag, TagToRepositoryTag
from data.database import db, RepositoryTag, TagToRepositoryTag, random_string_generator, User
from endpoints.api.discovery import swagger_route_data
from endpoints.common import common_login, render_page_template
from endpoints.csrf import csrf_protect, generate_csrf_token, verify_csrf
@ -897,3 +898,82 @@ def redirect_to_namespace(namespace):
return redirect(url_for("web.org_view", path=namespace))
else:
return redirect(url_for("web.user_view", path=namespace))
def has_users():
"""
Return false if no users in database yet
"""
return bool(User.select().limit(1))
@web.route("/api/v1/user/initialize", methods=["POST"])
@route_show_if(features.USER_INITIALIZE)
def user_initialize():
"""
Create initial user in an empty database
"""
# Ensure that we are using database auth.
if not features.USER_INITIALIZE:
response = jsonify({"message": "Cannot initialize user, FEATURE_USER_INITIALIZE is False"})
response.status_code = 400
return response
# Ensure that we are using database auth.
if app.config["AUTHENTICATION_TYPE"] != "Database":
response = jsonify({"message": "Cannot initialize user in a non-database auth system"})
response.status_code = 400
return response
if has_users():
response = jsonify({"message": "Cannot initialize user in a non-empty database"})
response.status_code = 400
return response
user_data = request.get_json()
try:
prompts = model.user.get_default_user_prompts(features)
new_user = model.user.create_user(
user_data["username"],
user_data["password"],
user_data.get("email"),
auto_verify=True,
email_required=features.MAILING,
is_possible_abuser=False,
prompts=prompts,
)
success, headers = common_login(new_user.uuid)
if not success:
response = jsonify({"message": "Could not login. Failed to initialize user"})
response.status_code = 403
return response
result = {
"username": user_data["username"],
"email": user_data.get("email"),
"encrypted_password": authentication.encrypt_user_password(
user_data["password"]
).decode("ascii"),
}
if user_data.get("access_token"):
model.oauth.create_application(
new_user,
"automation",
"",
"",
client_id=user_data["username"],
description="Application token generated via /api/v1/user/initialize",
)
scope = "org:admin repo:admin repo:create repo:read repo:write super:user user:admin user:read"
created, access_token = model.oauth.create_user_access_token(
new_user, user_data["username"], scope
)
result["access_token"] = access_token
return (result, 200, headers)
except model.user.DataModelException as ex:
response = jsonify({"message": "Failed to initialize user: " + str(ex)})
response.status_code = 400
return response

View File

@ -888,7 +888,7 @@ def populate_database(minimal=False):
description="This is another test application",
)
model.oauth.create_access_token_for_testing(
model.oauth.create_user_access_token(
new_user_1, "deadbeef", "repo:admin", access_token="%s%s" % ("b" * 40, "c" * 40)
)

View File

@ -1265,7 +1265,7 @@ class TestCreateOrganization(ApiTestCase):
# Attempt with auth with invalid scope.
dt_user = model.user.get_user(ADMIN_ACCESS_USER)
token, code = model.oauth.create_access_token_for_testing(dt_user, "deadbeef", "repo:read")
token, code = model.oauth.create_user_access_token(dt_user, "deadbeef", "repo:read")
self.postResponse(
OrganizationList,
data=dict(name="neworg", email="testorg@example.com"),
@ -1274,7 +1274,7 @@ class TestCreateOrganization(ApiTestCase):
)
# Create OAuth token with user:admin scope.
token, code = model.oauth.create_access_token_for_testing(
token, code = model.oauth.create_user_access_token(
dt_user, "deadbeef", "user:admin", access_token="d" * 40
)
data = self.postResponse(

View File

@ -112,3 +112,5 @@ class TestConfig(DefaultConfig):
FEATURE_REPO_MIRROR = True
FEATURE_GENERAL_OCI_SUPPORT = True
OCI_NAMESPACE_WHITELIST = []
FEATURE_USER_INITIALIZE = True

View File

@ -1173,5 +1173,11 @@ CONFIG_SCHEMA = {
"description": "Whether new push to a non-existent organization creates it. Defaults to False.",
"x-example": False,
},
# Allow first user to be initialized via API
"FEATURE_USER_INITIALIZE": {
"type": "boolean",
"description": "If set to true, the first User account may be created via API /api/v1/user/initialize",
"x-example": False,
},
},
}