mirror of
https://github.com/quay/quay.git
synced 2026-01-26 06:21:37 +03:00
184 lines
6.9 KiB
Python
184 lines
6.9 KiB
Python
import logging
|
|
|
|
from oauth.base import OAuthEndpoint
|
|
from oauth.login import OAuthLoginService, OAuthLoginException
|
|
from util import slash_join
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GithubOAuthService(OAuthLoginService):
|
|
def __init__(self, config, key_name):
|
|
super(GithubOAuthService, self).__init__(config, key_name)
|
|
|
|
def login_enabled(self, config):
|
|
return config.get("FEATURE_GITHUB_LOGIN", False)
|
|
|
|
def service_id(self):
|
|
return "github"
|
|
|
|
def service_name(self):
|
|
if self.is_enterprise():
|
|
return "GitHub Enterprise"
|
|
|
|
return "GitHub"
|
|
|
|
def get_icon(self):
|
|
return "fa-github"
|
|
|
|
def get_login_scopes(self):
|
|
if self.config.get("ORG_RESTRICT"):
|
|
return ["user:email", "read:org"]
|
|
|
|
return ["user:email"]
|
|
|
|
def allowed_organizations(self):
|
|
if not self.config.get("ORG_RESTRICT", False):
|
|
return None
|
|
|
|
allowed = self.config.get("ALLOWED_ORGANIZATIONS", None)
|
|
if allowed is None:
|
|
return None
|
|
|
|
return [org.lower() for org in allowed]
|
|
|
|
def get_public_url(self, suffix):
|
|
return slash_join(self._endpoint(), suffix)
|
|
|
|
def _endpoint(self):
|
|
return self.config.get("GITHUB_ENDPOINT", "https://github.com")
|
|
|
|
def is_enterprise(self):
|
|
return self._api_endpoint().find(".github.com") < 0
|
|
|
|
def authorize_endpoint(self):
|
|
return OAuthEndpoint(slash_join(self._endpoint(), "/login/oauth/authorize"))
|
|
|
|
def token_endpoint(self):
|
|
return OAuthEndpoint(slash_join(self._endpoint(), "/login/oauth/access_token"))
|
|
|
|
def user_endpoint(self):
|
|
return OAuthEndpoint(slash_join(self._api_endpoint(), "user"))
|
|
|
|
def _api_endpoint(self):
|
|
return self.config.get("API_ENDPOINT", slash_join(self._endpoint(), "/api/v3/"))
|
|
|
|
def api_endpoint(self):
|
|
endpoint = self._api_endpoint()
|
|
if endpoint.endswith("/"):
|
|
return endpoint[0:-1]
|
|
|
|
return endpoint
|
|
|
|
def email_endpoint(self):
|
|
return slash_join(self._api_endpoint(), "user/emails")
|
|
|
|
def orgs_endpoint(self):
|
|
return slash_join(self._api_endpoint(), "user/orgs")
|
|
|
|
def validate_client_id_and_secret(self, http_client, url_scheme_and_hostname):
|
|
# First: Verify that the github endpoint is actually Github by checking for the
|
|
# X-GitHub-Request-Id here.
|
|
api_endpoint = self._api_endpoint()
|
|
result = http_client.get(
|
|
api_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5
|
|
)
|
|
if not "X-GitHub-Request-Id" in result.headers:
|
|
raise Exception("Endpoint is not a Github (Enterprise) installation")
|
|
|
|
# Next: Verify the client ID and secret.
|
|
# Note: The following code is a hack until such time as Github officially adds an API endpoint
|
|
# for verifying a {client_id, client_secret} pair. This workaround was given to us
|
|
# *by a Github Engineer* (Jan 8, 2015).
|
|
#
|
|
# TODO: Replace with the real API call once added.
|
|
#
|
|
# Hitting the endpoint applications/{client_id}/tokens/foo will result in the following
|
|
# behavior IF the client_id is given as the HTTP username and the client_secret as the HTTP
|
|
# password:
|
|
# - If the {client_id, client_secret} pair is invalid in some way, we get a 401 error.
|
|
# - If the pair is valid, then we get a 404 because the 'foo' token does not exists.
|
|
validate_endpoint = slash_join(
|
|
api_endpoint, "applications/%s/tokens/foo" % self.client_id()
|
|
)
|
|
result = http_client.get(
|
|
validate_endpoint, auth=(self.client_id(), self.client_secret()), timeout=5
|
|
)
|
|
return result.status_code == 404
|
|
|
|
def validate_organization(self, organization_id, http_client):
|
|
org_endpoint = slash_join(self._api_endpoint(), "orgs/%s" % organization_id.lower())
|
|
|
|
result = http_client.get(
|
|
org_endpoint, headers={"Accept": "application/vnd.github.moondragon+json"}, timeout=5
|
|
)
|
|
|
|
return result.status_code == 200
|
|
|
|
def get_public_config(self):
|
|
return {
|
|
"CLIENT_ID": self.client_id(),
|
|
"AUTHORIZE_ENDPOINT": self.authorize_endpoint().to_url(),
|
|
"GITHUB_ENDPOINT": self._endpoint(),
|
|
"ORG_RESTRICT": self.config.get("ORG_RESTRICT", False),
|
|
}
|
|
|
|
def get_login_service_id(self, user_info):
|
|
return user_info["id"]
|
|
|
|
def get_login_service_username(self, user_info):
|
|
return user_info["login"]
|
|
|
|
def get_verified_user_email(self, app_config, http_client, token, user_info):
|
|
v3_media_type = {
|
|
"Accept": "application/vnd.github.v3",
|
|
"Authorization": "token %s" % token,
|
|
}
|
|
|
|
# Find the e-mail address for the user: we will accept any email, but we prefer the primary
|
|
get_email = http_client.get(self.email_endpoint(), headers=v3_media_type)
|
|
if get_email.status_code // 100 != 2:
|
|
raise OAuthLoginException(
|
|
"Got non-2XX status code for emails endpoint: %s" % get_email.status_code
|
|
)
|
|
|
|
verified_emails = [email for email in get_email.json() if email["verified"]]
|
|
primary_emails = [email for email in get_email.json() if email["primary"]]
|
|
|
|
# Special case: We don't care about whether an e-mail address is "verified" under GHE.
|
|
if self.is_enterprise() and not verified_emails:
|
|
verified_emails = primary_emails
|
|
|
|
allowed_emails = primary_emails or verified_emails or []
|
|
return allowed_emails[0]["email"] if len(allowed_emails) > 0 else None
|
|
|
|
def service_verify_user_info_for_login(self, app_config, http_client, token, user_info):
|
|
# Retrieve the user's organizations (if organization filtering is turned on)
|
|
if self.allowed_organizations() is None:
|
|
return
|
|
|
|
moondragon_media_type = {
|
|
"Accept": "application/vnd.github.moondragon+json",
|
|
"Authorization": "token %s" % token,
|
|
}
|
|
|
|
get_orgs = http_client.get(self.orgs_endpoint(), headers=moondragon_media_type)
|
|
|
|
if get_orgs.status_code // 100 != 2:
|
|
logger.debug("get_orgs response: %s", get_orgs.json())
|
|
raise OAuthLoginException(
|
|
"Got non-2XX response for org lookup: %s" % get_orgs.status_code
|
|
)
|
|
|
|
organizations = set([org.get("login").lower() for org in get_orgs.json()])
|
|
matching_organizations = organizations & set(self.allowed_organizations())
|
|
if not matching_organizations:
|
|
logger.debug(
|
|
"Found organizations %s, but expected one of %s",
|
|
organizations,
|
|
self.allowed_organizations(),
|
|
)
|
|
err = """You are not a member of an allowed GitHub organization.
|
|
Please contact your system administrator if you believe this is in error."""
|
|
raise OAuthLoginException(err)
|