1
0
mirror of https://github.com/quay/quay.git synced 2026-01-26 06:21:37 +03:00
Files
quay/workers/reconciliationworker.py
Jordi Piriz 42e1941229 refactor: Optimize reconciliation worker API calls and improve metrics (PROJQUAY-8960) (#4380)
Improve reconciliation worker lookup_subscription api calls
2025-10-21 13:12:17 -04:00

297 lines
11 KiB
Python

import logging
import time
from contextlib import contextmanager
from prometheus_client import Counter, Gauge, Histogram
import features
from app import app
from app import billing as stripe
from app import marketplace_subscriptions, marketplace_users
from data import model
from data.billing import RH_SKUS, get_plan
from util.locking import GlobalLock, LockNotAcquiredException
from util.marketplace import MarketplaceApiException
from workers.gunicorn_worker import GunicornWorker
from workers.namespacegcworker import LOCK_TIMEOUT_PADDING
from workers.worker import Worker
logger = logging.getLogger(__name__)
RECONCILIATION_TIMEOUT = 5 * 60 # 5min
RECONCILIATION_FREQUENCY = 5 * 60 # run reconciliation every 5 min
MILLISECONDS_IN_SECONDS = 1000
SECONDS_IN_DAYS = 86400
ONE_MONTH = 30 * SECONDS_IN_DAYS * MILLISECONDS_IN_SECONDS
FREE_TIER_SKU = "MW04192"
# Prometheus metrics
# Job execution metrics
reconciliation_runs_total = Counter(
"quay_reconciliation_runs_total",
"total number of reconciliation runs completed",
labelnames=["status"], # success, failed
)
reconciliation_duration_seconds = Gauge(
"quay_reconciliation_duration_seconds", "duration of last reconciliation run in seconds"
)
reconciliation_last_success_timestamp = Gauge(
"quay_reconciliation_last_success_timestamp", "timestamp of last successful reconciliation"
)
# User state metrics
reconciliation_users_total = Gauge(
"quay_reconciliation_users_total", "total number of active users in the database"
)
reconciliation_users_processed = Counter(
"quay_reconciliation_users_processed_total",
"total number of users successfully processed across all reconciliation runs",
)
reconciliation_users_not_processed = Counter(
"quay_reconciliation_users_not_processed_total",
"total number of users not processed across all reconciliation runs",
labelnames=["reason"],
)
# API call metrics
reconciliation_api_calls = Counter(
"quay_reconciliation_api_calls_total",
"number of API calls made during reconciliation",
labelnames=["api", "operation"],
)
reconciliation_api_call_duration = Histogram(
"quay_reconciliation_api_call_duration_seconds",
"time taken for API calls during reconciliation",
labelnames=["api", "operation"],
buckets=(1.0, 5.0, 20.0), # Track fast (<1s), slow (1-5s), very slow/timeout (>5s)
)
reconciliation_api_call_errors = Counter(
"quay_reconciliation_api_call_errors_total",
"number of API call errors during reconciliation",
labelnames=["api", "operation"],
)
@contextmanager
def track_api_call(api, operation):
"""
Context manager to track API call count, duration, and errors.
Usage:
with track_api_call("marketplace_user", "lookup_customer_id"):
result = user_api.lookup_customer_id(email)
"""
reconciliation_api_calls.labels(api=api, operation=operation).inc()
start_time = time.time()
try:
yield
except Exception:
reconciliation_api_call_errors.labels(api=api, operation=operation).inc()
raise
finally:
duration = time.time() - start_time
reconciliation_api_call_duration.labels(api=api, operation=operation).observe(duration)
def fetch_stripe_customer_plan(customer_id):
"""
Fetch plan details for a Stripe customer.
Args:
customer_id: Stripe customer ID
Returns:
dict: Plan details (including 'rh_sku') or None if customer has no subscription
Raises:
stripe.error.APIConnectionError: Cannot connect to Stripe
stripe.error.InvalidRequestError: Invalid customer ID
"""
customer = stripe.Customer.retrieve(customer_id)
# Check if customer has a subscription
if not hasattr(customer, "subscription") or customer.subscription is None:
return None
# Check if subscription has a plan
if not hasattr(customer.subscription, "plan") or customer.subscription.plan is None:
return None
return get_plan(customer.subscription.plan.id)
class ReconciliationWorker(Worker):
def __init__(self):
super(ReconciliationWorker, self).__init__()
self.add_operation(
self._reconcile_entitlements,
app.config.get("RECONCILIATION_FREQUENCY", RECONCILIATION_FREQUENCY),
)
def _reconcile_entitlements(self, skip_lock_for_testing=False):
"""
Performs reconciliation for user entitlements
"""
# try to acquire lock
if skip_lock_for_testing:
self._perform_reconciliation(
user_api=marketplace_users, marketplace_api=marketplace_subscriptions
)
else:
try:
with GlobalLock(
"RECONCILIATION_WORKER",
lock_ttl=app.config.get("RECONCILIATION_FREQUENCY", RECONCILIATION_FREQUENCY)
+ LOCK_TIMEOUT_PADDING,
):
self._perform_reconciliation(
user_api=marketplace_users, marketplace_api=marketplace_subscriptions
)
except LockNotAcquiredException:
logger.debug("Could not acquire global lock for entitlement reconciliation")
print(str(LockNotAcquiredException))
def _perform_reconciliation(self, user_api, marketplace_api):
"""
Core reconciliation logic.
"""
start_time = time.time()
try:
logger.info("Reconciliation worker looking to create new subscriptions...")
users = model.user.get_active_users(include_orgs=True)
# Set gauge to current user count
reconciliation_users_total.set(len(users))
for user in users:
if user.email is None or user.email == "":
reconciliation_users_not_processed.labels(reason="missing_email").inc()
logger.info("Email missing or empty for user %s", user.username)
continue
customer_ids = []
try:
with track_api_call("marketplace_user", "lookup_customer_id"):
customer_ids = user_api.lookup_customer_id(user.email, raise_exception=True)
except MarketplaceApiException as e:
logger.error("Failed to lookup customer ID for %s: %s", user.email, str(e))
if not customer_ids:
reconciliation_users_not_processed.labels(reason="no_customer_ids").inc()
logger.info("No web customer ids found for %s", user.email)
continue
# Check if user has a Stripe subscription with a valid plan
plan = None
if user.stripe_id:
try:
plan = fetch_stripe_customer_plan(user.stripe_id)
except stripe.error.StripeError as e:
logger.error("Stripe error for user %s: %s", user.username, str(e))
reconciliation_users_not_processed.labels(reason="stripe_error").inc()
reconciliation_api_calls.labels(
api="stripe", operation="fetch_customer_plan"
).inc()
continue
if plan:
self._reconcile_paying_user(user, customer_ids, plan, marketplace_api)
else:
self._reconcile_free_user(user, customer_ids, marketplace_api)
reconciliation_users_processed.inc()
logger.debug("Finished work for user %s", user.username)
logger.info("Reconciliation worker is done")
# Mark success
reconciliation_runs_total.labels(status="success").inc()
reconciliation_last_success_timestamp.set(time.time())
except Exception:
logger.exception("Reconciliation run failed")
reconciliation_runs_total.labels(status="failed").inc()
raise
finally:
duration = time.time() - start_time
reconciliation_duration_seconds.set(duration)
def _reconcile_paying_user(self, user, customer_ids, plan, marketplace_api):
"""
Given a user with a paid Stripe plan, create any missing entitlements.
Args:
user: User object
customer_ids: List of marketplace customer IDs
plan: Plan dict containing 'rh_sku' key
marketplace_api: Marketplace API client
"""
for customer_id in customer_ids:
rh_sku = plan.get("rh_sku")
try:
if not self._customer_id_has_sku(customer_id, rh_sku, marketplace_api):
self._create_entitlement(customer_id, rh_sku, marketplace_api)
except MarketplaceApiException as e:
logger.error(
"Failed to reconcile customer id (Free user). User:%s, customer_id: %s, error: %s",
user.username,
customer_id,
str(e),
)
# Not-implemented: Remove free tier sku entitlement for paying customers
def _reconcile_free_user(self, user, customer_ids, marketplace_api):
"""
Given a non-paying user, ensure they have the free-tier entitlement
"""
for customer_id in customer_ids:
try:
if not self._customer_id_has_sku(customer_id, FREE_TIER_SKU, marketplace_api):
self._create_entitlement(customer_id, FREE_TIER_SKU, marketplace_api)
except MarketplaceApiException as e:
logger.error(
"Failed to reconcile customer id. User:%s, customer_id: %s, error: %s",
user.username,
customer_id,
str(e),
)
# Not-implemented: Remove duplicated free tier sku entitlements.
def _create_entitlement(self, customer_id, sku, marketplace_api):
with track_api_call("marketplace_subscription", "create_entitlement"):
result = marketplace_api.create_entitlement(customer_id, sku, raise_exception=True)
if result and result.ok:
logger.info("Entitlement created. customer_id: %s, sku: %s", customer_id, sku)
else:
logger.error(
"Failed to create Entitlement. customer_id: %s, sku: %s", customer_id, sku
)
def _customer_id_has_sku(self, customer_id, sku, marketplace_api):
with track_api_call("marketplace_subscription", "lookup_subscription"):
subs = marketplace_api.lookup_subscription(customer_id, sku)
return subs is not None and len(subs) > 0
def create_gunicorn_worker():
"""
Follows the gunicorn application factory pattern, enabling
a quay worker to run as a gunicorn worker thread
"""
worker = GunicornWorker(
__name__, app, ReconciliationWorker(), features.ENTITLEMENT_RECONCILIATION
)
return worker
if __name__ == "__main__":
if not features.ENTITLEMENT_RECONCILIATION:
logger.debug("Reconciliation worker disabled; skipping")
while True:
time.sleep(1000)
GlobalLock.configure(app.config)
logger.debug("Starting reconciliation worker")
worker = ReconciliationWorker()
worker.start()