1
0
mirror of https://github.com/matrix-org/matrix-authentication-service.git synced 2025-11-20 12:02:22 +03:00

Proactively provision devices & delete them when using the compat Matrix API

This commit is contained in:
Quentin Gliech
2023-04-04 18:15:02 +02:00
parent 8a2be43fe7
commit 1974786209
5 changed files with 192 additions and 24 deletions

View File

@@ -26,15 +26,15 @@ use mas_axum_utils::axum::{
headers::{Authorization, HeaderMapExt},
http::{Request, StatusCode},
};
use mas_http::HttpServiceExt;
use mas_http::{EmptyBody, HttpServiceExt};
use mas_storage::{
job::{JobWithSpanContext, ProvisionUserJob},
job::{DeleteDeviceJob, JobWithSpanContext, ProvisionDeviceJob, ProvisionUserJob},
user::{UserEmailRepository, UserRepository},
RepositoryAccess,
};
use serde::{Deserialize, Serialize};
use tower::{Service, ServiceExt};
use tracing::{info, info_span, Instrument};
use tracing::info;
use url::Url;
use crate::{layers::TracingLayer, JobContextExt, State};
@@ -85,6 +85,9 @@ struct UserRequest {
pub external_ids: Vec<ExternalID>,
}
/// Job to provision a user on the Matrix homeserver.
/// This works by doing a PUT request to the /_synapse/admin/v2/users/{user_id}
/// endpoint.
#[tracing::instrument(
name = "job.provision_user"
fields(user.id = %job.user_id()),
@@ -98,7 +101,7 @@ async fn provision_user(
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("provision-matrix-user")
.http_client("matrx.provision_user")
.await?
.request_bytes_to_body()
.json_request();
@@ -110,7 +113,12 @@ async fn provision_user(
.await?
.context("User not found")?;
let mxid = format!("@{}:{}", user.username, matrix.homeserver);
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let three_pids = repo
.user_email()
@@ -142,21 +150,15 @@ async fn provision_user(
repo.cancel().await?;
let mut req = Request::put(
matrix
.endpoint
.join("_synapse/admin/v2/users/")?
.join(&mxid)?
.as_str(),
);
let path = format!("_synapse/admin/v2/users/{user_id}", user_id = mxid,);
let mut req = Request::put(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req.body(body).context("Failed to build request")?;
let span = info_span!("matrix.provision_user", %mxid);
let response = client.ready().await?.call(req).instrument(span).await?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::CREATED => info!(%user.id, %mxid, "User created"),
@@ -168,6 +170,130 @@ async fn provision_user(
Ok(())
}
#[derive(Serialize, Deserialize)]
struct DeviceRequest {
device_id: String,
}
/// Job to provision a device on the Matrix homeserver.
/// This works by doing a POST request to the
/// /_synapse/admin/v2/users/{user_id}/devices endpoint.
#[tracing::instrument(
name = "job.provision_device"
fields(
user.id = %job.user_id(),
device.id = %job.device_id(),
),
skip_all,
err(Debug),
)]
async fn provision_device(
job: JobWithSpanContext<ProvisionDeviceJob>,
ctx: JobContext,
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state
.http_client("matrix.provision_device")
.await?
.request_bytes_to_body()
.json_request();
let mut repo = state.repository().await?;
let user = repo
.user()
.lookup(job.user_id())
.await?
.context("User not found")?;
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let path = format!("_synapse/admin/v2/users/{user_id}/devices", user_id = mxid);
let mut req = Request::post(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req
.body(DeviceRequest {
device_id: job.device_id().to_owned(),
})
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::CREATED => {
info!(%user.id, %mxid, device.id = job.device_id(), "Device created")
}
code => anyhow::bail!("Failed to provision device. Status code: {code}"),
}
Ok(())
}
/// Job to delete a device from a user's account.
/// This works by doing a DELETE request to the
/// /_synapse/admin/v2/users/{user_id}/devices/{device_id} endpoint.
#[tracing::instrument(
name = "job.delete_device"
fields(
user.id = %job.user_id(),
device.id = %job.device_id(),
),
skip_all,
err(Debug),
)]
async fn delete_device(
job: JobWithSpanContext<DeleteDeviceJob>,
ctx: JobContext,
) -> Result<(), anyhow::Error> {
let state = ctx.state();
let matrix = state.matrix_connection();
let mut client = state.http_client("matrix.delete_device").await?;
let mut repo = state.repository().await?;
let user = repo
.user()
.lookup(job.user_id())
.await?
.context("User not found")?;
// XXX: there is a lot that could go wrong in terms of encoding here
let mxid = format!(
"@{localpart}:{homeserver}",
localpart = user.username,
homeserver = matrix.homeserver
);
let path = format!(
"_synapse/admin/v2/users/{mxid}/devices/{device_id}",
device_id = job.device_id()
);
let mut req = Request::delete(matrix.endpoint.join(&path)?.as_str());
req.headers_mut()
.context("Failed to get headers")?
.typed_insert(Authorization::bearer(&matrix.access_token)?);
let req = req
.body(EmptyBody::new())
.context("Failed to build request")?;
let response = client.ready().await?.call(req).await?;
match response.status() {
StatusCode::OK => info!(%user.id, %mxid, "Device deleted"),
code => anyhow::bail!("Failed to delete device. Status code: {code}"),
};
Ok(())
}
pub(crate) fn register(
suffix: &str,
monitor: Monitor<TokioExecutor>,
@@ -175,10 +301,30 @@ pub(crate) fn register(
) -> Monitor<TokioExecutor> {
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = ProvisionUserJob::NAME);
let worker = WorkerBuilder::new(worker_name)
let provision_user_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_user));
monitor.register(worker)
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = ProvisionDeviceJob::NAME);
let provision_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(provision_device));
let storage = state.store();
let worker_name = format!("{job}-{suffix}", job = DeleteDeviceJob::NAME);
let delete_device_worker = WorkerBuilder::new(worker_name)
.layer(state.inject())
.layer(TracingLayer::new())
.with_storage(storage)
.build(job_fn(delete_device));
monitor
.register(provision_user_worker)
.register(provision_device_worker)
.register(delete_device_worker)
}