1
0
mirror of https://github.com/quay/quay.git synced 2025-04-18 10:44:06 +03:00
quay/notifications/notificationmethod.py
Ivan Bazulic 176e1c934d
webhook: Fix failure in sending webhook POST requests (PROJQUAY-7468) (#3020)
`urllib` requires that certs be parsed as tuple, not as a list. This small fix addresses that issue.
2024-07-11 17:04:53 -04:00

521 lines
18 KiB
Python

import json
import logging
import os.path
import re
from html.parser import HTMLParser
from unittest import mock
from urllib.parse import urlparse
import requests
from flask_mail import Message
import features
from app import OVERRIDE_CONFIG_DIRECTORY, app, mail
from data import model
from util.fips import login_fips_safe
from util.jsontemplate import JSONTemplate, JSONTemplateParseException
from workers.queueworker import JobException
logger = logging.getLogger(__name__)
METHOD_TIMEOUT = app.config.get("NOTIFICATION_SEND_TIMEOUT", 10) # Seconds
HOSTNAME_BLACKLIST = ["localhost", "127.0.0.1"]
HOSTNAME_BLACKLIST.extend(app.config.get("WEBHOOK_HOSTNAME_BLACKLIST", []))
MAIL_DEFAULT_SENDER = app.config.get("MAIL_DEFAULT_SENDER", "admin@example.com")
SSL_FILENAMES = ["ssl.cert", "ssl.key"]
class InvalidNotificationMethodException(Exception):
pass
class CannotValidateNotificationMethodException(Exception):
pass
class NotificationMethodPerformException(JobException):
pass
def _ssl_cert():
if app.config["PREFERRED_URL_SCHEME"] == "https":
cert_files = tuple([OVERRIDE_CONFIG_DIRECTORY + f for f in SSL_FILENAMES])
cert_exists = all([os.path.isfile(f) for f in cert_files])
return cert_files if cert_exists else None
return None
class NotificationMethod(object):
def __init__(self):
pass
@classmethod
def method_name(cls):
"""
Particular method implemented by subclasses.
"""
raise NotImplementedError
def validate(self, namespace_name, repo_name, config_data):
"""
Validates that the notification can be created with the given data.
Throws a CannotValidateNotificationMethodException on failure.
"""
raise NotImplementedError
def perform(self, notification_obj, event_handler, notification_data):
"""
Performs the notification method.
notification_obj: The notification namedtuple.
event_handler: The NotificationEvent handler.
notification_data: The dict of notification data placed in the queue.
"""
raise NotImplementedError
@classmethod
def get_method(cls, methodname):
for subc in cls.__subclasses__():
if subc.method_name() == methodname:
return subc()
raise InvalidNotificationMethodException("Unable to find method: %s" % methodname)
class QuayNotificationMethod(NotificationMethod):
@classmethod
def method_name(cls):
return "quay_notification"
def validate(self, namespace_name, repo_name, config_data):
_, err_message, _ = self.find_targets(namespace_name, config_data)
if err_message:
raise CannotValidateNotificationMethodException(err_message)
def find_targets(self, namespace_name, config_data):
target_info = config_data.get("target", None)
if not target_info or not target_info.get("kind"):
return (True, "Missing target", [])
if target_info["kind"] == "user":
target = model.user.get_nonrobot_user(target_info["name"])
if not target:
# Just to be safe.
return (True, "Unknown user %s" % target_info["name"], [])
return (True, None, [target])
elif target_info["kind"] == "org":
try:
target = model.organization.get_organization(target_info["name"])
except model.organization.InvalidOrganizationException:
return (True, "Unknown organization %s" % target_info["name"], None)
# Only repositories under the organization can cause notifications to that org.
if target_info["name"] != namespace_name:
return (False, "Organization name must match repository namespace")
return (True, None, [target])
elif target_info["kind"] == "team":
# Lookup the team.
org_team = None
try:
org_team = model.team.get_organization_team(namespace_name, target_info["name"])
except model.InvalidTeamException:
# Probably deleted.
return (True, "Unknown team %s" % target_info["name"], None)
# Lookup the team's members
return (True, None, model.organization.get_organization_team_members(org_team.id))
def perform(self, notification_obj, event_handler, notification_data):
repository = notification_obj.repository
if not repository:
# Probably deleted.
return
# Lookup the target user or team to which we'll send the notification.
config_data = notification_obj.method_config_dict
status, err_message, target_users = self.find_targets(
repository.namespace_name, config_data
)
if not status:
raise NotificationMethodPerformException(err_message)
# For each of the target users, create a notification.
for target_user in set(target_users or []):
model.notification.create_notification(
event_handler.event_name(), target_user, metadata=notification_data["event_data"]
)
class EmailMethod(NotificationMethod):
@classmethod
def method_name(cls):
return "email"
def validate(self, namespace_name, repo_name, config_data):
email = config_data.get("email", "")
if not email:
raise CannotValidateNotificationMethodException("Missing e-mail address")
record = model.repository.get_email_authorized_for_repo(namespace_name, repo_name, email)
if not record or not record.confirmed:
raise CannotValidateNotificationMethodException(
"The specified e-mail address "
"is not authorized to receive "
"notifications for this repository"
)
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
email = config_data.get("email", "")
if not email:
return
with app.app_context():
msg = Message(
event_handler.get_summary(notification_data["event_data"], notification_data),
recipients=[email],
)
msg.html = event_handler.get_message(notification_data["event_data"], notification_data)
try:
if features.FIPS:
assert app.config[
"MAIL_USE_TLS"
], "MAIL_USE_TLS must be enabled to use SMTP in FIPS mode."
with mock.patch("smtplib.SMTP.login", login_fips_safe):
mail.send(msg)
else:
mail.send(msg)
except Exception as ex:
logger.exception("Email was unable to be sent")
raise NotificationMethodPerformException(str(ex))
class WebhookMethod(NotificationMethod):
@classmethod
def method_name(cls):
return "webhook"
def validate(self, namespace_name, repo_name, config_data):
# Validate the URL.
url = config_data.get("url", "")
if not url:
raise CannotValidateNotificationMethodException("Missing webhook URL")
parsed = urlparse(url)
if parsed.scheme != "https" and parsed.scheme != "http":
raise CannotValidateNotificationMethodException("Invalid webhook URL")
if parsed.hostname.lower() in HOSTNAME_BLACKLIST:
raise CannotValidateNotificationMethodException("Invalid webhook URL")
# If a template was specified, ensure it is a valid template.
template = config_data.get("template")
if template:
# Validate template.
try:
JSONTemplate(template)
except JSONTemplateParseException as jtpe:
raise CannotValidateNotificationMethodException(str(jtpe))
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
url = config_data.get("url", "")
if not url:
return
parsed = urlparse(url)
if parsed.scheme != "https" and parsed.scheme != "http":
logger.error("Invalid webhook URL: %s", url)
return
if parsed.hostname.lower() in HOSTNAME_BLACKLIST:
logger.error("Invalid webhook URL: %s", url)
return
payload = notification_data["event_data"]
template = config_data.get("template")
if template:
try:
jt = JSONTemplate(template)
payload = jt.apply(payload)
except JSONTemplateParseException as jtpe:
logger.exception("Got exception when trying to process template `%s`", template)
raise NotificationMethodPerformException(str(jtpe))
headers = {"Content-type": "application/json"}
try:
resp = requests.post(
url,
data=json.dumps(payload),
headers=headers,
cert=_ssl_cert(),
timeout=METHOD_TIMEOUT,
)
if resp.status_code // 100 != 2:
error_message = "%s response for webhook to url: %s" % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception("Webhook was unable to be sent")
raise NotificationMethodPerformException(str(ex))
class FlowdockMethod(NotificationMethod):
"""
Method for sending notifications to Flowdock via the Team Inbox API:
https://www.flowdock.com/api/team-inbox
"""
@classmethod
def method_name(cls):
return "flowdock"
def validate(self, namespace_name, repo_name, config_data):
token = config_data.get("flow_api_token", "")
if not token:
raise CannotValidateNotificationMethodException("Missing Flowdock API Token")
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
token = config_data.get("flow_api_token", "")
if not token:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
url = "https://api.flowdock.com/v1/messages/team_inbox/%s" % token
headers = {"Content-type": "application/json"}
payload = {
"source": "Quay",
"from_address": MAIL_DEFAULT_SENDER,
"subject": event_handler.get_summary(
notification_data["event_data"], notification_data
),
"content": event_handler.get_message(
notification_data["event_data"], notification_data
),
"from_name": owner.username,
"project": (
notification_obj.repository.namespace_name + " " + notification_obj.repository.name
),
"tags": ["#" + event_handler.event_name()],
"link": notification_data["event_data"]["homepage"],
}
try:
resp = requests.post(
url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT
)
if resp.status_code // 100 != 2:
error_message = "%s response for flowdock to url: %s" % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception("Flowdock method was unable to be sent")
raise NotificationMethodPerformException(str(ex))
class HipchatMethod(NotificationMethod):
"""
Method for sending notifications to Hipchat via the API:
https://www.hipchat.com/docs/apiv2/method/send_room_notification
"""
@classmethod
def method_name(cls):
return "hipchat"
def validate(self, namespace_name, repo_name, config_data):
if not config_data.get("notification_token", ""):
raise CannotValidateNotificationMethodException(
"Missing Hipchat Room Notification Token"
)
if not config_data.get("room_id", ""):
raise CannotValidateNotificationMethodException("Missing Hipchat Room ID")
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
token = config_data.get("notification_token", "")
room_id = config_data.get("room_id", "")
if not token or not room_id:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
url = "https://api.hipchat.com/v2/room/%s/notification?auth_token=%s" % (room_id, token)
level = event_handler.get_level(notification_data["event_data"], notification_data)
color = {
"info": "gray",
"warning": "yellow",
"error": "red",
"success": "green",
"primary": "purple",
}.get(level, "gray")
headers = {"Content-type": "application/json"}
payload = {
"color": color,
"message": event_handler.get_message(
notification_data["event_data"], notification_data
),
"notify": level == "error",
"message_format": "html",
}
try:
resp = requests.post(
url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT
)
if resp.status_code // 100 != 2:
error_message = "%s response for hipchat to url: %s" % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception("Hipchat method was unable to be sent")
raise NotificationMethodPerformException(str(ex))
class SlackAdjuster(HTMLParser):
def __init__(self):
super().__init__()
self.reset()
self.result = []
def handle_data(self, d):
self.result.append(d)
def get_attr(self, attrs, name):
for attr in attrs:
if attr[0] == name:
return attr[1]
return ""
def handle_starttag(self, tag, attrs):
if tag == "a":
self.result.append("<%s|" % (self.get_attr(attrs, "href"),))
if tag == "i":
self.result.append("_")
if tag == "b" or tag == "strong":
self.result.append("*")
if tag == "img":
self.result.append("")
def handle_endtag(self, tag):
if tag == "a":
self.result.append(">")
if tag == "b" or tag == "strong":
self.result.append("*")
if tag == "i":
self.result.append("_")
def get_data(self):
return "".join(self.result)
def adjust_tags(html):
s = SlackAdjuster()
s.feed(html)
return s.get_data()
class SlackMethod(NotificationMethod):
"""
Method for sending notifications to Slack via the API:
https://api.slack.com/docs/attachments
"""
@classmethod
def method_name(cls):
return "slack"
def validate(self, namespace_name, repo_name, config_data):
if not config_data.get("url", ""):
raise CannotValidateNotificationMethodException("Missing Slack Callback URL")
def format_for_slack(self, message):
message = message.replace("\n", "")
message = re.sub(r"\s+", " ", message)
message = message.replace("<br>", "\n")
return adjust_tags(message)
def perform(self, notification_obj, event_handler, notification_data):
config_data = notification_obj.method_config_dict
url = config_data.get("url", "")
if not url:
return
owner = model.user.get_user_or_org(notification_obj.repository.namespace_name)
if not owner:
# Something went wrong.
return
level = event_handler.get_level(notification_data["event_data"], notification_data)
color = {
"info": "#ffffff",
"warning": "warning",
"error": "danger",
"success": "good",
"primary": "good",
}.get(level, "#ffffff")
summary = event_handler.get_summary(notification_data["event_data"], notification_data)
message = event_handler.get_message(notification_data["event_data"], notification_data)
headers = {"Content-type": "application/json"}
payload = {
"text": summary,
"username": "quayiobot",
"attachments": [
{
"fallback": summary,
"text": self.format_for_slack(message),
"color": color,
"mrkdwn_in": ["text"],
}
],
}
try:
resp = requests.post(
url, data=json.dumps(payload), headers=headers, timeout=METHOD_TIMEOUT
)
if resp.status_code // 100 != 2:
error_message = "%s response for Slack to url: %s" % (resp.status_code, url)
logger.error(error_message)
logger.error(resp.content)
raise NotificationMethodPerformException(error_message)
except requests.exceptions.RequestException as ex:
logger.exception("Slack method was unable to be sent: %s", str(ex))
raise NotificationMethodPerformException(str(ex))