You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-07-29 22:01:14 +03:00
WIP: upstream OIDC provider support
This commit is contained in:
@ -27,8 +27,8 @@ serde_json = "1.0.89"
|
||||
serde_urlencoded = "0.7.1"
|
||||
thiserror = "1.0.37"
|
||||
tokio = { version = "1.22.0", features = ["sync", "parking_lot"], optional = true }
|
||||
tower = { version = "0.4.13", features = ["timeout", "limit"] }
|
||||
tower-http = { version = "0.3.5", features = ["follow-redirect", "decompression-full", "set-header", "compression-full", "cors", "util"] }
|
||||
tower = { version = "0.4.13", features = ["limit"] }
|
||||
tower-http = { version = "0.3.5", features = ["timeout", "follow-redirect", "decompression-full", "set-header", "compression-full", "cors", "util"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-opentelemetry = "0.18.0"
|
||||
webpki = { version = "0.22.0", optional = true }
|
||||
|
@ -16,7 +16,6 @@ use std::{convert::Infallible, net::SocketAddr};
|
||||
|
||||
use bytes::Bytes;
|
||||
use http::{Request, Response};
|
||||
use http_body::{combinators::BoxBody, Body};
|
||||
use hyper::{
|
||||
client::{
|
||||
connect::dns::{GaiResolver, Name},
|
||||
@ -26,14 +25,11 @@ use hyper::{
|
||||
};
|
||||
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
|
||||
use thiserror::Error;
|
||||
use tower::{
|
||||
util::{MapErrLayer, MapResponseLayer},
|
||||
Layer, Service,
|
||||
};
|
||||
use tower::{Layer, Service};
|
||||
|
||||
use crate::{
|
||||
layers::{
|
||||
client::{ClientLayer, ClientResponse},
|
||||
client::ClientLayer,
|
||||
otel::{TraceDns, TraceLayer},
|
||||
},
|
||||
BoxCloneSyncService, BoxError,
|
||||
@ -229,32 +225,20 @@ where
|
||||
}
|
||||
|
||||
/// Create a traced HTTP client, with a default timeout, which follows redirects
|
||||
/// and handles compression
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if it failed to initialize
|
||||
pub async fn client<B, E>(
|
||||
operation: &'static str,
|
||||
) -> Result<
|
||||
BoxCloneSyncService<Request<B>, Response<BoxBody<bytes::Bytes, ClientError>>, ClientError>,
|
||||
ClientInitError,
|
||||
>
|
||||
) -> Result<BoxCloneSyncService<Request<B>, Response<hyper::Body>, hyper::Error>, ClientInitError>
|
||||
where
|
||||
B: http_body::Body<Data = Bytes, Error = E> + Default + Send + 'static,
|
||||
E: Into<BoxError> + 'static,
|
||||
{
|
||||
let client = make_traced_client().await?;
|
||||
|
||||
let layer = (
|
||||
// Convert the errors to ClientError to help dealing with them
|
||||
MapErrLayer::new(ClientError::from),
|
||||
MapResponseLayer::new(|r: ClientResponse<hyper::Body>| {
|
||||
r.map(|body| body.map_err(ClientError::from).boxed())
|
||||
}),
|
||||
ClientLayer::new(operation),
|
||||
);
|
||||
let client = BoxCloneSyncService::new(layer.layer(client));
|
||||
let client = ClientLayer::new(operation).layer(client);
|
||||
|
||||
Ok(client)
|
||||
Ok(BoxCloneSyncService::new(client))
|
||||
}
|
||||
|
@ -38,6 +38,14 @@ impl<S, B> Error<S, B> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> Error<E, E> {
|
||||
pub fn unify(self) -> E {
|
||||
match self {
|
||||
Self::Service { inner } | Self::Body { inner } => inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BodyToBytesResponse<S> {
|
||||
inner: S,
|
||||
|
@ -17,13 +17,12 @@ use std::{marker::PhantomData, time::Duration};
|
||||
use http::{header::USER_AGENT, HeaderValue, Request, Response};
|
||||
use tower::{
|
||||
limit::{ConcurrencyLimit, ConcurrencyLimitLayer},
|
||||
timeout::{Timeout, TimeoutLayer},
|
||||
Layer, Service,
|
||||
};
|
||||
use tower_http::{
|
||||
decompression::{Decompression, DecompressionBody, DecompressionLayer},
|
||||
follow_redirect::{FollowRedirect, FollowRedirectLayer},
|
||||
set_header::{SetRequestHeader, SetRequestHeaderLayer},
|
||||
timeout::{Timeout, TimeoutLayer},
|
||||
};
|
||||
|
||||
use super::otel::TraceLayer;
|
||||
@ -48,9 +47,6 @@ impl<B> ClientLayer<B> {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub type ClientResponse<B> = Response<DecompressionBody<B>>;
|
||||
|
||||
impl<ReqBody, ResBody, S, E> Layer<S> for ClientLayer<ReqBody>
|
||||
where
|
||||
S: Service<Request<ReqBody>, Response = Response<ResBody>, Error = E>
|
||||
@ -63,21 +59,14 @@ where
|
||||
S::Future: Send + 'static,
|
||||
E: Into<BoxError>,
|
||||
{
|
||||
type Service = Decompression<
|
||||
SetRequestHeader<
|
||||
TraceHttpClient<ConcurrencyLimit<FollowRedirect<TraceHttpClient<Timeout<S>>>>>,
|
||||
HeaderValue,
|
||||
>,
|
||||
type Service = SetRequestHeader<
|
||||
TraceHttpClient<ConcurrencyLimit<FollowRedirect<TraceHttpClient<Timeout<S>>>>>,
|
||||
HeaderValue,
|
||||
>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
// Note that most layers here just forward the error type. Two notables
|
||||
// exceptions are:
|
||||
// - the TimeoutLayer
|
||||
// - the DecompressionLayer
|
||||
// Those layers do type erasure of the error.
|
||||
// Note that all layers here just forward the error type.
|
||||
(
|
||||
DecompressionLayer::new(),
|
||||
SetRequestHeaderLayer::overriding(USER_AGENT, MAS_USER_AGENT.clone()),
|
||||
// A trace that has the whole operation, with all the redirects, timeouts and rate
|
||||
// limits in it
|
||||
|
Reference in New Issue
Block a user