mirror of
https://github.com/quay/quay.git
synced 2026-01-27 18:42:52 +03:00
* chore: drop deprecated tables and remove unused code * isort imports * migration: check for table existence before drop
207 lines
6.3 KiB
Python
207 lines
6.3 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from abc import ABCMeta, abstractmethod
|
|
from collections import namedtuple
|
|
from threading import Lock, Thread
|
|
from typing import Dict
|
|
|
|
import geoip2.database
|
|
import geoip2.errors
|
|
import requests
|
|
from cachetools.func import lru_cache, ttl_cache
|
|
from netaddr import AddrFormatError, IPAddress, IPNetwork, IPSet
|
|
from six import add_metaclass
|
|
|
|
from util.abchelpers import nooper
|
|
|
|
ResolvedLocation = namedtuple(
|
|
"ResolvedLocation",
|
|
["provider", "service", "sync_token", "country_iso_code", "aws_region", "continent"],
|
|
)
|
|
|
|
AWS_SERVICES = {"EC2", "CODEBUILD"}
|
|
|
|
# https://support.maxmind.com/hc/en-us/articles/4414877149467-IP-Geolocation-Data#h_01FRRGRYTGZB29ERDBZCX3MR8Q
|
|
GEOIP_CONTINENT_CODES = ["AF", "AN", "AS", "EU", "NA", "OC", "SA"]
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_aws_ip_ranges():
|
|
try:
|
|
# resolve absolute path to file
|
|
path = os.path.dirname(os.path.abspath(__file__))
|
|
file_path = os.path.join(path, "aws-ip-ranges.json")
|
|
with open(file_path, "r") as f:
|
|
return json.loads(f.read())
|
|
except IOError:
|
|
logger.exception("Could not load AWS IP Ranges")
|
|
return None
|
|
except ValueError:
|
|
logger.exception("Could not load AWS IP Ranges")
|
|
return None
|
|
except TypeError:
|
|
logger.exception("Could not load AWS IP Ranges")
|
|
return None
|
|
|
|
|
|
@add_metaclass(ABCMeta)
|
|
class IPResolverInterface(object):
|
|
"""
|
|
Helper class for resolving information about an IP address.
|
|
"""
|
|
|
|
@abstractmethod
|
|
def resolve_ip(self, ip_address):
|
|
"""
|
|
Attempts to return resolved information about the specified IP Address.
|
|
|
|
If such an attempt fails, returns None.
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def is_ip_possible_threat(self, ip_address):
|
|
"""
|
|
Attempts to return whether the given IP address is a possible abuser or spammer.
|
|
|
|
Returns False if the IP address information could not be looked up.
|
|
"""
|
|
pass
|
|
|
|
|
|
@nooper
|
|
class NoopIPResolver(IPResolverInterface):
|
|
"""
|
|
No-op version of the security scanner API.
|
|
"""
|
|
|
|
pass
|
|
|
|
|
|
class IPResolver(IPResolverInterface):
|
|
def __init__(self, app):
|
|
self.app = app
|
|
|
|
# resolve absolute path to file
|
|
path = os.path.dirname(os.path.abspath(__file__))
|
|
file_path = os.path.join(path, "GeoLite2-Country.mmdb")
|
|
self.geoip_db = geoip2.database.Reader(file_path)
|
|
|
|
self.amazon_ranges: Dict[str, IPSet] = None
|
|
self.sync_token = None
|
|
|
|
logger.info("Loading AWS IP ranges from disk")
|
|
aws_ip_ranges_data = _get_aws_ip_ranges()
|
|
if aws_ip_ranges_data is not None and aws_ip_ranges_data.get("syncToken"):
|
|
logger.debug("Building AWS IP ranges")
|
|
self.amazon_ranges = IPResolver._parse_amazon_ranges(aws_ip_ranges_data)
|
|
self.sync_token = aws_ip_ranges_data["syncToken"]
|
|
logger.debug("Finished building AWS IP ranges")
|
|
|
|
@ttl_cache(maxsize=100, ttl=600)
|
|
def is_ip_possible_threat(self, ip_address):
|
|
if self.app.config.get("THREAT_NAMESPACE_MAXIMUM_BUILD_COUNT") is None:
|
|
return False
|
|
|
|
if self.app.config.get("IP_DATA_API_KEY") is None:
|
|
return False
|
|
|
|
if not ip_address:
|
|
return False
|
|
|
|
api_key = self.app.config["IP_DATA_API_KEY"]
|
|
|
|
try:
|
|
logger.debug("Requesting IP data for IP %s", ip_address)
|
|
r = requests.get(
|
|
"https://api.ipdata.co/%s/threat?api-key=%s" % (ip_address, api_key), timeout=1
|
|
)
|
|
if r.status_code != 200:
|
|
logger.debug("Got non-200 response for IP %s: %s", ip_address, r.status_code)
|
|
return False
|
|
|
|
logger.debug("Got IP data for IP %s: %s => %s", ip_address, r.status_code, r.json())
|
|
threat_data = r.json()
|
|
return threat_data.get("is_threat", False) or threat_data.get("is_bogon", False)
|
|
except requests.RequestException:
|
|
logger.exception("Got exception when trying to lookup IP Address")
|
|
except ValueError:
|
|
logger.exception("Got exception when trying to lookup IP Address")
|
|
except Exception:
|
|
logger.exception("Got exception when trying to lookup IP Address")
|
|
|
|
return False
|
|
|
|
def resolve_ip(self, ip_address):
|
|
"""
|
|
Attempts to return resolved information about the specified IP Address.
|
|
|
|
If such an attempt fails, returns None.
|
|
"""
|
|
if not ip_address:
|
|
return None
|
|
|
|
try:
|
|
parsed_ip = IPAddress(ip_address)
|
|
except AddrFormatError:
|
|
return ResolvedLocation("invalid_ip", None, self.sync_token, None, None, None)
|
|
|
|
# Try geoip classification
|
|
try:
|
|
geoinfo = self.geoip_db.country(ip_address)
|
|
except geoip2.errors.AddressNotFoundError:
|
|
geoinfo = None
|
|
|
|
aws_region = self.get_aws_ip_region(parsed_ip)
|
|
|
|
# Not an AWS IP
|
|
if not aws_region:
|
|
if geoinfo:
|
|
return ResolvedLocation(
|
|
"internet",
|
|
geoinfo.country.iso_code,
|
|
self.sync_token,
|
|
geoinfo.country.iso_code,
|
|
None,
|
|
geoinfo.continent.code,
|
|
)
|
|
|
|
return ResolvedLocation("internet", None, self.sync_token, None, None, None)
|
|
|
|
return ResolvedLocation(
|
|
"aws",
|
|
None,
|
|
self.sync_token,
|
|
geoinfo.country.iso_code if geoinfo else None,
|
|
aws_region,
|
|
geoinfo.continent.code if geoinfo else None,
|
|
)
|
|
|
|
def get_aws_ip_region(self, ip_address: IPAddress):
|
|
if not self.amazon_ranges:
|
|
return None
|
|
|
|
for region in self.amazon_ranges:
|
|
ipset = self.amazon_ranges[region]
|
|
if ip_address in ipset:
|
|
return region
|
|
return None
|
|
|
|
@staticmethod
|
|
def _parse_amazon_ranges(ranges):
|
|
all_amazon = {}
|
|
for service_description in ranges["prefixes"]:
|
|
region = service_description["region"]
|
|
service = service_description["service"]
|
|
|
|
if region not in all_amazon:
|
|
all_amazon[region] = IPSet()
|
|
|
|
if service in AWS_SERVICES:
|
|
all_amazon[region].add(IPNetwork(service_description["ip_prefix"]))
|
|
|
|
return all_amazon
|