diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs index 7284e6197..c181cd15c 100644 --- a/crates/cli/src/util.rs +++ b/crates/cli/src/util.rs @@ -17,7 +17,7 @@ use mas_data_model::{SessionExpirationConfig, SiteConfig}; use mas_email::{MailTransport, Mailer}; use mas_handlers::passwords::PasswordManager; use mas_matrix::{HomeserverConnection, ReadOnlyHomeserverConnection}; -use mas_matrix_synapse::SynapseConnection; +use mas_matrix_synapse::{LegacySynapseConnection, SynapseConnection}; use mas_policy::PolicyFactory; use mas_router::UrlBuilder; use mas_storage::{BoxRepositoryFactory, RepositoryAccess, RepositoryFactory}; @@ -469,14 +469,22 @@ pub fn homeserver_connection_from_config( http_client: reqwest::Client, ) -> Arc { match config.kind { - HomeserverKind::Synapse => Arc::new(SynapseConnection::new( + HomeserverKind::Synapse | HomeserverKind::SynapseLegacy => { + Arc::new(LegacySynapseConnection::new( + config.homeserver.clone(), + config.endpoint.clone(), + config.secret.clone(), + http_client, + )) + } + HomeserverKind::SynapseModern => Arc::new(SynapseConnection::new( config.homeserver.clone(), config.endpoint.clone(), config.secret.clone(), http_client, )), HomeserverKind::SynapseReadOnly => { - let connection = SynapseConnection::new( + let connection = LegacySynapseConnection::new( config.homeserver.clone(), config.endpoint.clone(), config.secret.clone(), diff --git a/crates/config/src/sections/matrix.rs b/crates/config/src/sections/matrix.rs index 1cead9ffd..e035b7d79 100644 --- a/crates/config/src/sections/matrix.rs +++ b/crates/config/src/sections/matrix.rs @@ -27,15 +27,25 @@ fn default_endpoint() -> Url { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)] #[serde(rename_all = "snake_case")] pub enum HomeserverKind { - /// Homeserver is Synapse + /// Homeserver is Synapse, using the legacy API + /// + /// This will switch to using the modern API in a few releases. #[default] Synapse, - /// Homeserver is Synapse, in read-only mode + /// Homeserver is Synapse, using the legacy API, in read-only mode /// /// This is meant for testing rolling out Matrix Authentication Service with /// no risk of writing data to the homeserver. + /// + /// This will switch to using the modern API in a few releases. SynapseReadOnly, + + /// Homeserver is Synapse, using the legacy API, + SynapseLegacy, + + /// Homeserver is Synapse, with the modern API available + SynapseModern, } /// Configuration related to the Matrix homeserver diff --git a/crates/handlers/src/admin/v1/users/add.rs b/crates/handlers/src/admin/v1/users/add.rs index 299012d91..d67737526 100644 --- a/crates/handlers/src/admin/v1/users/add.rs +++ b/crates/handlers/src/admin/v1/users/add.rs @@ -166,10 +166,7 @@ pub async fn handler( let user = repo.user().add(&mut rng, &clock, params.username).await?; homeserver - .provision_user(&ProvisionRequest::new( - homeserver.mxid(&user.username), - &user.sub, - )) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .map_err(RouteError::Homeserver)?; @@ -222,8 +219,7 @@ mod tests { assert_eq!(user.username, "alice"); // Check that the user was created on the homeserver - let mxid = state.homeserver_connection.mxid("alice"); - let result = state.homeserver_connection.query_user(&mxid).await; + let result = state.homeserver_connection.query_user("alice").await; assert!(result.is_ok()); } diff --git a/crates/handlers/src/admin/v1/users/reactivate.rs b/crates/handlers/src/admin/v1/users/reactivate.rs index 37b38c6b6..0be687a39 100644 --- a/crates/handlers/src/admin/v1/users/reactivate.rs +++ b/crates/handlers/src/admin/v1/users/reactivate.rs @@ -83,9 +83,8 @@ pub async fn handler( .ok_or(RouteError::NotFound(id))?; // Call the homeserver synchronously to reactivate the user - let mxid = homeserver.mxid(&user.username); homeserver - .reactivate_user(&mxid) + .reactivate_user(&user.username) .await .map_err(RouteError::Homeserver)?; @@ -127,20 +126,23 @@ mod tests { // Provision and immediately deactivate the user on the homeserver, // because this endpoint will try to reactivate it - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(&mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); state .homeserver_connection - .delete_user(&mxid, true) + .delete_user(&user.username, true) .await .unwrap(); // The user should be deactivated on the homeserver - let mx_user = state.homeserver_connection.query_user(&mxid).await.unwrap(); + let mx_user = state + .homeserver_connection + .query_user(&user.username) + .await + .unwrap(); assert!(mx_user.deactivated); let request = Request::post(format!("/api/admin/v1/users/{}/reactivate", user.id)) @@ -176,10 +178,9 @@ mod tests { repo.save().await.unwrap(); // Provision the user on the homeserver - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(&mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); diff --git a/crates/handlers/src/admin/v1/users/unlock.rs b/crates/handlers/src/admin/v1/users/unlock.rs index 5584f4a69..944dd77f4 100644 --- a/crates/handlers/src/admin/v1/users/unlock.rs +++ b/crates/handlers/src/admin/v1/users/unlock.rs @@ -112,10 +112,9 @@ mod tests { // Also provision the user on the homeserver, because this endpoint will try to // reactivate it - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(&mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); @@ -149,21 +148,24 @@ mod tests { repo.save().await.unwrap(); // Provision the user on the homeserver - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(&mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); // but then deactivate it state .homeserver_connection - .delete_user(&mxid, true) + .delete_user(&user.username, true) .await .unwrap(); // The user should be deactivated on the homeserver - let mx_user = state.homeserver_connection.query_user(&mxid).await.unwrap(); + let mx_user = state + .homeserver_connection + .query_user(&user.username) + .await + .unwrap(); assert!(mx_user.deactivated); let request = Request::post(format!("/api/admin/v1/users/{}/unlock", user.id)) @@ -182,7 +184,11 @@ mod tests { body["data"]["attributes"]["deactivated_at"], serde_json::json!(state.clock.now()) ); - let mx_user = state.homeserver_connection.query_user(&mxid).await.unwrap(); + let mx_user = state + .homeserver_connection + .query_user(&user.username) + .await + .unwrap(); assert!(mx_user.deactivated); } diff --git a/crates/handlers/src/compat/login.rs b/crates/handlers/src/compat/login.rs index 75b96417e..d75b4f8bd 100644 --- a/crates/handlers/src/compat/login.rs +++ b/crates/handlers/src/compat/login.rs @@ -411,7 +411,11 @@ pub(crate) async fn post( // Now we can create the device on the homeserver, without holding the // transaction if let Err(err) = homeserver - .create_device(&user_id, device.as_str(), session.human_name.as_deref()) + .upsert_device( + &user.username, + device.as_str(), + session.human_name.as_deref(), + ) .await { // Something went wrong, let's end this session and schedule a device sync @@ -829,10 +833,9 @@ mod tests { .add(&mut rng, &state.clock, &user, version, hash, None) .await .unwrap(); - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); @@ -1133,10 +1136,9 @@ mod tests { .await .unwrap(); - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); @@ -1239,10 +1241,9 @@ mod tests { let user = repo.user().lock(&state.clock, user).await.unwrap(); repo.save().await.unwrap(); - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); diff --git a/crates/handlers/src/graphql/model/matrix.rs b/crates/handlers/src/graphql/model/matrix.rs index 08e583d74..7316c0d63 100644 --- a/crates/handlers/src/graphql/model/matrix.rs +++ b/crates/handlers/src/graphql/model/matrix.rs @@ -27,9 +27,9 @@ impl MatrixUser { conn: &C, user: &str, ) -> Result { - let mxid = conn.mxid(user); + let info = conn.query_user(user).await?; - let info = conn.query_user(&mxid).await?; + let mxid = conn.mxid(user); Ok(MatrixUser { mxid, diff --git a/crates/handlers/src/graphql/mutations/compat_session.rs b/crates/handlers/src/graphql/mutations/compat_session.rs index ec8993942..973d46f05 100644 --- a/crates/handlers/src/graphql/mutations/compat_session.rs +++ b/crates/handlers/src/graphql/mutations/compat_session.rs @@ -187,10 +187,9 @@ impl CompatSessionMutations { .await?; // Update the device on the homeserver side - let mxid = homeserver.mxid(&user.username); if let Some(device) = session.device.as_ref() { homeserver - .update_device_display_name(&mxid, device.as_str(), &input.human_name) + .update_device_display_name(&user.username, device.as_str(), &input.human_name) .await .context("Failed to provision device")?; } diff --git a/crates/handlers/src/graphql/mutations/matrix.rs b/crates/handlers/src/graphql/mutations/matrix.rs index 16c4916c3..f88668e2f 100644 --- a/crates/handlers/src/graphql/mutations/matrix.rs +++ b/crates/handlers/src/graphql/mutations/matrix.rs @@ -93,7 +93,6 @@ impl MatrixMutations { repo.cancel().await?; let conn = state.homeserver_connection(); - let mxid = conn.mxid(&user.username); if let Some(display_name) = &input.display_name { // Let's do some basic validation on the display name @@ -105,11 +104,11 @@ impl MatrixMutations { return Ok(SetDisplayNamePayload::Invalid); } - conn.set_displayname(&mxid, display_name) + conn.set_displayname(&user.username, display_name) .await .context("Failed to set display name")?; } else { - conn.unset_displayname(&mxid) + conn.unset_displayname(&user.username) .await .context("Failed to unset display name")?; } diff --git a/crates/handlers/src/graphql/mutations/oauth2_session.rs b/crates/handlers/src/graphql/mutations/oauth2_session.rs index 0de5b16a2..55723efc5 100644 --- a/crates/handlers/src/graphql/mutations/oauth2_session.rs +++ b/crates/handlers/src/graphql/mutations/oauth2_session.rs @@ -212,11 +212,10 @@ impl OAuth2SessionMutations { repo.user().acquire_lock_for_sync(&user).await?; // Look for devices to provision - let mxid = homeserver.mxid(&user.username); for scope in &*session.scope { if let Some(device) = Device::from_scope_token(scope) { homeserver - .create_device(&mxid, device.as_str(), None) + .upsert_device(&user.username, device.as_str(), None) .await .context("Failed to provision device")?; } @@ -331,11 +330,10 @@ impl OAuth2SessionMutations { .await?; // Update the device on the homeserver side - let mxid = homeserver.mxid(&user.username); for scope in &*session.scope { if let Some(device) = Device::from_scope_token(scope) { homeserver - .update_device_display_name(&mxid, device.as_str(), &input.human_name) + .update_device_display_name(&user.username, device.as_str(), &input.human_name) .await .context("Failed to provision device")?; } diff --git a/crates/handlers/src/graphql/mutations/user.rs b/crates/handlers/src/graphql/mutations/user.rs index 26352db81..f9f5696e7 100644 --- a/crates/handlers/src/graphql/mutations/user.rs +++ b/crates/handlers/src/graphql/mutations/user.rs @@ -586,8 +586,7 @@ impl UserMutations { }; // Call the homeserver synchronously to reactivate the user - let mxid = matrix.mxid(&user.username); - matrix.reactivate_user(&mxid).await?; + matrix.reactivate_user(&user.username).await?; // Now reactivate & unlock the user in our database let user = repo.user().reactivate(user).await?; @@ -654,9 +653,7 @@ impl UserMutations { }; let conn = state.homeserver_connection(); - let mxid = conn.mxid(&user.username); - - conn.allow_cross_signing_reset(&mxid) + conn.allow_cross_signing_reset(&user.username) .await .context("Failed to allow cross-signing reset")?; diff --git a/crates/handlers/src/graphql/tests.rs b/crates/handlers/src/graphql/tests.rs index df4b5b80b..bc5079924 100644 --- a/crates/handlers/src/graphql/tests.rs +++ b/crates/handlers/src/graphql/tests.rs @@ -529,10 +529,9 @@ async fn test_oauth2_client_credentials(pool: PgPool) { // XXX: we don't run the task worker here, so even though the addUser mutation // should have scheduled a job to provision the user, it won't run in the test, // so we need to do it manually - let mxid = state.homeserver_connection.mxid("alice"); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, user_id)) + .provision_user(&ProvisionRequest::new("alice", user_id)) .await .unwrap(); diff --git a/crates/handlers/src/oauth2/introspection.rs b/crates/handlers/src/oauth2/introspection.rs index b1a7a99ea..50c043b04 100644 --- a/crates/handlers/src/oauth2/introspection.rs +++ b/crates/handlers/src/oauth2/introspection.rs @@ -634,10 +634,9 @@ mod tests { .await .unwrap(); - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); @@ -835,10 +834,9 @@ mod tests { .await .unwrap(); - let mxid = state.homeserver_connection.mxid(&user.username); state .homeserver_connection - .provision_user(&ProvisionRequest::new(mxid, &user.sub)) + .provision_user(&ProvisionRequest::new(&user.username, &user.sub)) .await .unwrap(); diff --git a/crates/handlers/src/oauth2/token.rs b/crates/handlers/src/oauth2/token.rs index 1634e8f2e..768f79d35 100644 --- a/crates/handlers/src/oauth2/token.rs +++ b/crates/handlers/src/oauth2/token.rs @@ -575,11 +575,14 @@ async fn authorization_code_grant( .await?; // Look for device to provision - let mxid = homeserver.mxid(&browser_session.user.username); for scope in &*session.scope { if let Some(device) = Device::from_scope_token(scope) { homeserver - .create_device(&mxid, device.as_str(), Some(&device_name)) + .upsert_device( + &browser_session.user.username, + device.as_str(), + Some(&device_name), + ) .await .map_err(RouteError::ProvisionDeviceFailed)?; } @@ -951,11 +954,10 @@ async fn device_code_grant( .await?; // Look for device to provision - let mxid = homeserver.mxid(&browser_session.user.username); for scope in &*session.scope { if let Some(device) = Device::from_scope_token(scope) { homeserver - .create_device(&mxid, device.as_str(), None) + .upsert_device(&browser_session.user.username, device.as_str(), None) .await .map_err(RouteError::ProvisionDeviceFailed)?; } diff --git a/crates/matrix-synapse/src/error.rs b/crates/matrix-synapse/src/error.rs index 01f01a3ce..c1d98ccd1 100644 --- a/crates/matrix-synapse/src/error.rs +++ b/crates/matrix-synapse/src/error.rs @@ -9,6 +9,16 @@ use async_trait::async_trait; use serde::Deserialize; use thiserror::Error; +/// Encountered when trying to register a user ID which has been taken. +/// — +pub(crate) const M_USER_IN_USE: &str = "M_USER_IN_USE"; +/// Encountered when trying to register a user ID which is not valid. +/// — +pub(crate) const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME"; +/// Encountered when trying to register a user ID reserved by an appservice. +/// — +pub(crate) const M_EXCLUSIVE: &str = "M_EXCLUSIVE"; + /// Represents a Matrix error /// Ref: #[derive(Debug, Deserialize)] diff --git a/crates/matrix-synapse/src/legacy.rs b/crates/matrix-synapse/src/legacy.rs new file mode 100644 index 000000000..d07e6b5d5 --- /dev/null +++ b/crates/matrix-synapse/src/legacy.rs @@ -0,0 +1,683 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use std::{collections::HashSet, time::Duration}; + +use anyhow::{Context, bail}; +use http::{Method, StatusCode}; +use mas_http::RequestBuilderExt as _; +use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest}; +use serde::{Deserialize, Serialize}; +use tracing::debug; +use url::Url; + +use crate::error::{M_EXCLUSIVE, M_INVALID_USERNAME, M_USER_IN_USE, SynapseResponseExt}; + +static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated"; + +#[derive(Clone)] +pub struct SynapseConnection { + homeserver: String, + endpoint: Url, + access_token: String, + http_client: reqwest::Client, +} + +impl SynapseConnection { + #[must_use] + pub fn new( + homeserver: String, + endpoint: Url, + access_token: String, + http_client: reqwest::Client, + ) -> Self { + Self { + homeserver, + endpoint, + access_token, + http_client, + } + } + + fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder { + self.http_client + .request( + method, + self.endpoint + .join(url) + .map(String::from) + .unwrap_or_default(), + ) + .bearer_auth(&self.access_token) + } + + fn post(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::POST, url) + } + + fn get(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::GET, url) + } + + fn put(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::PUT, url) + } + + fn delete(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::DELETE, url) + } +} + +#[derive(Serialize, Deserialize)] +struct ExternalID { + auth_provider: String, + external_id: String, +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +enum ThreePIDMedium { + Email, + Msisdn, +} + +#[derive(Serialize, Deserialize)] +struct ThreePID { + medium: ThreePIDMedium, + address: String, +} + +#[derive(Default, Serialize, Deserialize)] +struct SynapseUser { + #[serde( + default, + rename = "displayname", + skip_serializing_if = "Option::is_none" + )] + display_name: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + avatar_url: Option, + + #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")] + three_pids: Option>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + external_ids: Option>, + + #[serde(default, skip_serializing_if = "Option::is_none")] + deactivated: Option, +} + +#[derive(Deserialize)] +struct SynapseDeviceListResponse { + devices: Vec, +} + +#[derive(Serialize, Deserialize)] +struct SynapseDevice { + device_id: String, + + #[serde(default, skip_serializing_if = "Option::is_none")] + dehydrated: Option, +} + +#[derive(Serialize)] +struct SynapseUpdateDeviceRequest<'a> { + display_name: Option<&'a str>, +} + +#[derive(Serialize)] +struct SynapseDeleteDevicesRequest { + devices: Vec, +} + +#[derive(Serialize)] +struct SetDisplayNameRequest<'a> { + displayname: &'a str, +} + +#[derive(Serialize)] +struct SynapseDeactivateUserRequest { + erase: bool, +} + +#[derive(Serialize)] +struct SynapseAllowCrossSigningResetRequest {} + +/// Response body of +/// `/_synapse/admin/v1/username_available?username={localpart}` +#[derive(Deserialize)] +struct UsernameAvailableResponse { + available: bool, +} + +#[async_trait::async_trait] +impl HomeserverConnection for SynapseConnection { + fn homeserver(&self) -> &str { + &self.homeserver + } + + #[tracing::instrument( + name = "homeserver.query_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn query_user(&self, localpart: &str) -> Result { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + + let response = self + .get(&format!("_synapse/admin/v2/users/{encoded_mxid}")) + .send_traced() + .await + .context("Failed to query user from Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while querying user from Synapse")?; + + let body: SynapseUser = response + .json() + .await + .context("Failed to deserialize response while querying user from Synapse")?; + + Ok(MatrixUser { + displayname: body.display_name, + avatar_url: body.avatar_url, + deactivated: body.deactivated.unwrap_or(false), + }) + } + + #[tracing::instrument( + name = "homeserver.is_localpart_available", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn is_localpart_available(&self, localpart: &str) -> Result { + // Synapse will give us a M_UNKNOWN error if the localpart is not ASCII, + // so we bail out early + if !localpart.is_ascii() { + return Ok(false); + } + + let localpart = urlencoding::encode(localpart); + + let response = self + .get(&format!( + "_synapse/admin/v1/username_available?username={localpart}" + )) + .send_traced() + .await + .context("Failed to query localpart availability from Synapse")?; + + match response.error_for_synapse_error().await { + Ok(resp) => { + let response: UsernameAvailableResponse = resp.json().await.context( + "Unexpected response while querying localpart availability from Synapse", + )?; + + Ok(response.available) + } + + Err(err) + if err.errcode() == Some(M_INVALID_USERNAME) + || err.errcode() == Some(M_USER_IN_USE) + || err.errcode() == Some(M_EXCLUSIVE) => + { + debug!( + error = &err as &dyn std::error::Error, + "Localpart is not available" + ); + Ok(false) + } + + Err(err) => Err(err).context("Failed to query localpart availability from Synapse"), + } + } + + #[tracing::instrument( + name = "homeserver.provision_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = request.localpart(), + user.id = request.sub(), + ), + err(Debug), + )] + async fn provision_user(&self, request: &ProvisionRequest) -> Result { + let mut body = SynapseUser { + external_ids: Some(vec![ExternalID { + auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(), + external_id: request.sub().to_owned(), + }]), + ..SynapseUser::default() + }; + + request + .on_displayname(|displayname| { + body.display_name = Some(displayname.unwrap_or_default().to_owned()); + }) + .on_avatar_url(|avatar_url| { + body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned()); + }) + .on_emails(|emails| { + body.three_pids = Some( + emails + .unwrap_or_default() + .iter() + .map(|email| ThreePID { + medium: ThreePIDMedium::Email, + address: email.clone(), + }) + .collect(), + ); + }); + + let mxid = self.mxid(request.localpart()); + let encoded_mxid = urlencoding::encode(&mxid); + let response = self + .put(&format!("_synapse/admin/v2/users/{encoded_mxid}")) + .json(&body) + .send_traced() + .await + .context("Failed to provision user in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while provisioning user in Synapse")?; + + match response.status() { + StatusCode::CREATED => Ok(true), + StatusCode::OK => Ok(false), + code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"), + } + } + + #[tracing::instrument( + name = "homeserver.upsert_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn upsert_device( + &self, + localpart: &str, + device_id: &str, + initial_display_name: Option<&str>, + ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + + let response = self + .post(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices")) + .json(&SynapseDevice { + device_id: device_id.to_owned(), + dehydrated: None, + }) + .send_traced() + .await + .context("Failed to create device in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while creating device in Synapse")?; + + if response.status() != StatusCode::CREATED { + bail!( + "Unexpected HTTP code while creating device in Synapse: {}", + response.status() + ); + } + + // It's annoying, but the POST endpoint doesn't let us set the display name + // of the device, so we have to do it manually. + if let Some(display_name) = initial_display_name { + self.update_device_display_name(localpart, device_id, display_name) + .await?; + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.update_device_display_name", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn update_device_display_name( + &self, + localpart: &str, + device_id: &str, + display_name: &str, + ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + let device_id = urlencoding::encode(device_id); + let response = self + .put(&format!( + "_synapse/admin/v2/users/{encoded_mxid}/devices/{device_id}" + )) + .json(&SynapseUpdateDeviceRequest { + display_name: Some(display_name), + }) + .send_traced() + .await + .context("Failed to update device display name in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while updating device display name in Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while updating device display name in Synapse: {}", + response.status() + ); + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.delete_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + let encoded_device_id = urlencoding::encode(device_id); + + let response = self + .delete(&format!( + "_synapse/admin/v2/users/{encoded_mxid}/devices/{encoded_device_id}" + )) + .send_traced() + .await + .context("Failed to delete device in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while deleting device in Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while deleting device in Synapse: {}", + response.status() + ); + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.sync_devices", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn sync_devices( + &self, + localpart: &str, + devices: HashSet, + ) -> Result<(), anyhow::Error> { + // Get the list of current devices + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + + let response = self + .get(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices")) + .send_traced() + .await + .context("Failed to query devices from Synapse")?; + + let response = response.error_for_synapse_error().await?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while querying devices from Synapse: {}", + response.status() + ); + } + + let body: SynapseDeviceListResponse = response + .json() + .await + .context("Failed to parse response while querying devices from Synapse")?; + + let existing_devices: HashSet = body + .devices + .into_iter() + .filter(|d| d.dehydrated != Some(true)) + .map(|d| d.device_id) + .collect(); + + // First, delete all the devices that are not needed anymore + let to_delete = existing_devices.difference(&devices).cloned().collect(); + + let response = self + .post(&format!( + "_synapse/admin/v2/users/{encoded_mxid}/delete_devices" + )) + .json(&SynapseDeleteDevicesRequest { devices: to_delete }) + .send_traced() + .await + .context("Failed to delete devices from Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while deleting devices from Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while deleting devices from Synapse: {}", + response.status() + ); + } + + // Then, create the devices that are missing. There is no batching API to do + // this, so we do this sequentially, which is fine as the API is idempotent. + for device_id in devices.difference(&existing_devices) { + self.upsert_device(localpart, device_id, None).await?; + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.delete_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + erase = erase, + ), + err(Debug), + )] + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + + let response = self + .post(&format!("_synapse/admin/v1/deactivate/{encoded_mxid}")) + .json(&SynapseDeactivateUserRequest { erase }) + // Deactivation can take a while, so we set a longer timeout + .timeout(Duration::from_secs(60 * 5)) + .send_traced() + .await + .context("Failed to deactivate user in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while deactivating user in Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while deactivating user in Synapse: {}", + response.status() + ); + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.reactivate_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + let response = self + .put(&format!("_synapse/admin/v2/users/{encoded_mxid}")) + .json(&SynapseUser { + deactivated: Some(false), + ..SynapseUser::default() + }) + .send_traced() + .await + .context("Failed to reactivate user in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while reactivating user in Synapse")?; + + match response.status() { + StatusCode::CREATED | StatusCode::OK => Ok(()), + code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",), + } + } + + #[tracing::instrument( + name = "homeserver.set_displayname", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.displayname = displayname, + ), + err(Debug), + )] + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + let response = self + .put(&format!( + "_matrix/client/v3/profile/{encoded_mxid}/displayname" + )) + .json(&SetDisplayNameRequest { displayname }) + .send_traced() + .await + .context("Failed to set displayname in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while setting displayname in Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while setting displayname in Synapse: {}", + response.status() + ); + } + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.unset_displayname", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Display), + )] + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error> { + self.set_displayname(localpart, "").await + } + + #[tracing::instrument( + name = "homeserver.allow_cross_signing_reset", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); + let encoded_mxid = urlencoding::encode(&mxid); + + let response = self + .post(&format!( + "_synapse/admin/v1/users/{encoded_mxid}/_allow_cross_signing_replacement_without_uia" + )) + .json(&SynapseAllowCrossSigningResetRequest {}) + .send_traced() + .await + .context("Failed to allow cross-signing reset in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?; + + if response.status() != StatusCode::OK { + bail!( + "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}", + response.status(), + ); + } + + Ok(()) + } +} diff --git a/crates/matrix-synapse/src/lib.rs b/crates/matrix-synapse/src/lib.rs index 28f27d76c..062ecaa75 100644 --- a/crates/matrix-synapse/src/lib.rs +++ b/crates/matrix-synapse/src/lib.rs @@ -4,678 +4,8 @@ // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial // Please see LICENSE files in the repository root for full details. -use std::{collections::HashSet, time::Duration}; - -use anyhow::{Context, bail}; -use error::SynapseResponseExt; -use http::{Method, StatusCode}; -use mas_http::RequestBuilderExt as _; -use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest}; -use serde::{Deserialize, Serialize}; -use tracing::debug; -use url::Url; - -static SYNAPSE_AUTH_PROVIDER: &str = "oauth-delegated"; - -/// Encountered when trying to register a user ID which has been taken. -/// — -const M_USER_IN_USE: &str = "M_USER_IN_USE"; -/// Encountered when trying to register a user ID which is not valid. -/// — -const M_INVALID_USERNAME: &str = "M_INVALID_USERNAME"; -/// Encountered when trying to register a user ID reserved by an appservice. -/// — -const M_EXCLUSIVE: &str = "M_EXCLUSIVE"; - mod error; +mod legacy; +mod modern; -#[derive(Clone)] -pub struct SynapseConnection { - homeserver: String, - endpoint: Url, - access_token: String, - http_client: reqwest::Client, -} - -impl SynapseConnection { - #[must_use] - pub fn new( - homeserver: String, - endpoint: Url, - access_token: String, - http_client: reqwest::Client, - ) -> Self { - Self { - homeserver, - endpoint, - access_token, - http_client, - } - } - - fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder { - self.http_client - .request( - method, - self.endpoint - .join(url) - .map(String::from) - .unwrap_or_default(), - ) - .bearer_auth(&self.access_token) - } - - fn post(&self, url: &str) -> reqwest::RequestBuilder { - self.builder(Method::POST, url) - } - - fn get(&self, url: &str) -> reqwest::RequestBuilder { - self.builder(Method::GET, url) - } - - fn put(&self, url: &str) -> reqwest::RequestBuilder { - self.builder(Method::PUT, url) - } - - fn delete(&self, url: &str) -> reqwest::RequestBuilder { - self.builder(Method::DELETE, url) - } -} - -#[derive(Serialize, Deserialize)] -struct ExternalID { - auth_provider: String, - external_id: String, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -enum ThreePIDMedium { - Email, - Msisdn, -} - -#[derive(Serialize, Deserialize)] -struct ThreePID { - medium: ThreePIDMedium, - address: String, -} - -#[derive(Default, Serialize, Deserialize)] -struct SynapseUser { - #[serde( - default, - rename = "displayname", - skip_serializing_if = "Option::is_none" - )] - display_name: Option, - - #[serde(default, skip_serializing_if = "Option::is_none")] - avatar_url: Option, - - #[serde(default, rename = "threepids", skip_serializing_if = "Option::is_none")] - three_pids: Option>, - - #[serde(default, skip_serializing_if = "Option::is_none")] - external_ids: Option>, - - #[serde(default, skip_serializing_if = "Option::is_none")] - deactivated: Option, -} - -#[derive(Deserialize)] -struct SynapseDeviceListResponse { - devices: Vec, -} - -#[derive(Serialize, Deserialize)] -struct SynapseDevice { - device_id: String, - - #[serde(default, skip_serializing_if = "Option::is_none")] - dehydrated: Option, -} - -#[derive(Serialize)] -struct SynapseUpdateDeviceRequest<'a> { - display_name: Option<&'a str>, -} - -#[derive(Serialize)] -struct SynapseDeleteDevicesRequest { - devices: Vec, -} - -#[derive(Serialize)] -struct SetDisplayNameRequest<'a> { - displayname: &'a str, -} - -#[derive(Serialize)] -struct SynapseDeactivateUserRequest { - erase: bool, -} - -#[derive(Serialize)] -struct SynapseAllowCrossSigningResetRequest {} - -/// Response body of -/// `/_synapse/admin/v1/username_available?username={localpart}` -#[derive(Deserialize)] -struct UsernameAvailableResponse { - available: bool, -} - -#[async_trait::async_trait] -impl HomeserverConnection for SynapseConnection { - fn homeserver(&self) -> &str { - &self.homeserver - } - - #[tracing::instrument( - name = "homeserver.query_user", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - ), - err(Debug), - )] - async fn query_user(&self, mxid: &str) -> Result { - let encoded_mxid = urlencoding::encode(mxid); - - let response = self - .get(&format!("_synapse/admin/v2/users/{encoded_mxid}")) - .send_traced() - .await - .context("Failed to query user from Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while querying user from Synapse")?; - - let body: SynapseUser = response - .json() - .await - .context("Failed to deserialize response while querying user from Synapse")?; - - Ok(MatrixUser { - displayname: body.display_name, - avatar_url: body.avatar_url, - deactivated: body.deactivated.unwrap_or(false), - }) - } - - #[tracing::instrument( - name = "homeserver.is_localpart_available", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.localpart = localpart, - ), - err(Debug), - )] - async fn is_localpart_available(&self, localpart: &str) -> Result { - // Synapse will give us a M_UNKNOWN error if the localpart is not ASCII, - // so we bail out early - if !localpart.is_ascii() { - return Ok(false); - } - - let localpart = urlencoding::encode(localpart); - - let response = self - .get(&format!( - "_synapse/admin/v1/username_available?username={localpart}" - )) - .send_traced() - .await - .context("Failed to query localpart availability from Synapse")?; - - match response.error_for_synapse_error().await { - Ok(resp) => { - let response: UsernameAvailableResponse = resp.json().await.context( - "Unexpected response while querying localpart availability from Synapse", - )?; - - Ok(response.available) - } - - Err(err) - if err.errcode() == Some(M_INVALID_USERNAME) - || err.errcode() == Some(M_USER_IN_USE) - || err.errcode() == Some(M_EXCLUSIVE) => - { - debug!( - error = &err as &dyn std::error::Error, - "Localpart is not available" - ); - Ok(false) - } - - Err(err) => Err(err).context("Failed to query localpart availability from Synapse"), - } - } - - #[tracing::instrument( - name = "homeserver.provision_user", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = request.mxid(), - user.id = request.sub(), - ), - err(Debug), - )] - async fn provision_user(&self, request: &ProvisionRequest) -> Result { - let mut body = SynapseUser { - external_ids: Some(vec![ExternalID { - auth_provider: SYNAPSE_AUTH_PROVIDER.to_owned(), - external_id: request.sub().to_owned(), - }]), - ..SynapseUser::default() - }; - - request - .on_displayname(|displayname| { - body.display_name = Some(displayname.unwrap_or_default().to_owned()); - }) - .on_avatar_url(|avatar_url| { - body.avatar_url = Some(avatar_url.unwrap_or_default().to_owned()); - }) - .on_emails(|emails| { - body.three_pids = Some( - emails - .unwrap_or_default() - .iter() - .map(|email| ThreePID { - medium: ThreePIDMedium::Email, - address: email.clone(), - }) - .collect(), - ); - }); - - let encoded_mxid = urlencoding::encode(request.mxid()); - let response = self - .put(&format!("_synapse/admin/v2/users/{encoded_mxid}")) - .json(&body) - .send_traced() - .await - .context("Failed to provision user in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while provisioning user in Synapse")?; - - match response.status() { - StatusCode::CREATED => Ok(true), - StatusCode::OK => Ok(false), - code => bail!("Unexpected HTTP code while provisioning user in Synapse: {code}"), - } - } - - #[tracing::instrument( - name = "homeserver.create_device", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - matrix.device_id = device_id, - ), - err(Debug), - )] - async fn create_device( - &self, - mxid: &str, - device_id: &str, - initial_display_name: Option<&str>, - ) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - - let response = self - .post(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices")) - .json(&SynapseDevice { - device_id: device_id.to_owned(), - dehydrated: None, - }) - .send_traced() - .await - .context("Failed to create device in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while creating device in Synapse")?; - - if response.status() != StatusCode::CREATED { - bail!( - "Unexpected HTTP code while creating device in Synapse: {}", - response.status() - ); - } - - // It's annoying, but the POST endpoint doesn't let us set the display name - // of the device, so we have to do it manually. - if let Some(display_name) = initial_display_name { - self.update_device_display_name(mxid, device_id, display_name) - .await?; - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.update_device_display_name", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - matrix.device_id = device_id, - ), - err(Debug), - )] - async fn update_device_display_name( - &self, - mxid: &str, - device_id: &str, - display_name: &str, - ) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - let device_id = urlencoding::encode(device_id); - let response = self - .put(&format!( - "_synapse/admin/v2/users/{encoded_mxid}/devices/{device_id}" - )) - .json(&SynapseUpdateDeviceRequest { - display_name: Some(display_name), - }) - .send_traced() - .await - .context("Failed to update device display name in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while updating device display name in Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while updating device display name in Synapse: {}", - response.status() - ); - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.delete_device", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - matrix.device_id = device_id, - ), - err(Debug), - )] - async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - let encoded_device_id = urlencoding::encode(device_id); - - let response = self - .delete(&format!( - "_synapse/admin/v2/users/{encoded_mxid}/devices/{encoded_device_id}" - )) - .send_traced() - .await - .context("Failed to delete device in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while deleting device in Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while deleting device in Synapse: {}", - response.status() - ); - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.sync_devices", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - ), - err(Debug), - )] - async fn sync_devices( - &self, - mxid: &str, - devices: HashSet, - ) -> Result<(), anyhow::Error> { - // Get the list of current devices - let encoded_mxid = urlencoding::encode(mxid); - - let response = self - .get(&format!("_synapse/admin/v2/users/{encoded_mxid}/devices")) - .send_traced() - .await - .context("Failed to query devices from Synapse")?; - - let response = response.error_for_synapse_error().await?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while querying devices from Synapse: {}", - response.status() - ); - } - - let body: SynapseDeviceListResponse = response - .json() - .await - .context("Failed to parse response while querying devices from Synapse")?; - - let existing_devices: HashSet = body - .devices - .into_iter() - .filter(|d| d.dehydrated != Some(true)) - .map(|d| d.device_id) - .collect(); - - // First, delete all the devices that are not needed anymore - let to_delete = existing_devices.difference(&devices).cloned().collect(); - - let response = self - .post(&format!( - "_synapse/admin/v2/users/{encoded_mxid}/delete_devices" - )) - .json(&SynapseDeleteDevicesRequest { devices: to_delete }) - .send_traced() - .await - .context("Failed to delete devices from Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while deleting devices from Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while deleting devices from Synapse: {}", - response.status() - ); - } - - // Then, create the devices that are missing. There is no batching API to do - // this, so we do this sequentially, which is fine as the API is idempotent. - for device_id in devices.difference(&existing_devices) { - self.create_device(mxid, device_id, None).await?; - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.delete_user", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - erase = erase, - ), - err(Debug), - )] - async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - - let response = self - .post(&format!("_synapse/admin/v1/deactivate/{encoded_mxid}")) - .json(&SynapseDeactivateUserRequest { erase }) - // Deactivation can take a while, so we set a longer timeout - .timeout(Duration::from_secs(60 * 5)) - .send_traced() - .await - .context("Failed to deactivate user in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while deactivating user in Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while deactivating user in Synapse: {}", - response.status() - ); - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.reactivate_user", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - ), - err(Debug), - )] - async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - let response = self - .put(&format!("_synapse/admin/v2/users/{encoded_mxid}")) - .json(&SynapseUser { - deactivated: Some(false), - ..SynapseUser::default() - }) - .send_traced() - .await - .context("Failed to reactivate user in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while reactivating user in Synapse")?; - - match response.status() { - StatusCode::CREATED | StatusCode::OK => Ok(()), - code => bail!("Unexpected HTTP code while reactivating user in Synapse: {code}",), - } - } - - #[tracing::instrument( - name = "homeserver.set_displayname", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - matrix.displayname = displayname, - ), - err(Debug), - )] - async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - let response = self - .put(&format!( - "_matrix/client/v3/profile/{encoded_mxid}/displayname" - )) - .json(&SetDisplayNameRequest { displayname }) - .send_traced() - .await - .context("Failed to set displayname in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while setting displayname in Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while setting displayname in Synapse: {}", - response.status() - ); - } - - Ok(()) - } - - #[tracing::instrument( - name = "homeserver.unset_displayname", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - ), - err(Display), - )] - async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> { - self.set_displayname(mxid, "").await - } - - #[tracing::instrument( - name = "homeserver.allow_cross_signing_reset", - skip_all, - fields( - matrix.homeserver = self.homeserver, - matrix.mxid = mxid, - ), - err(Debug), - )] - async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> { - let encoded_mxid = urlencoding::encode(mxid); - - let response = self - .post(&format!( - "_synapse/admin/v1/users/{encoded_mxid}/_allow_cross_signing_replacement_without_uia" - )) - .json(&SynapseAllowCrossSigningResetRequest {}) - .send_traced() - .await - .context("Failed to allow cross-signing reset in Synapse")?; - - let response = response - .error_for_synapse_error() - .await - .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?; - - if response.status() != StatusCode::OK { - bail!( - "Unexpected HTTP code while allowing cross-signing reset in Synapse: {}", - response.status(), - ); - } - - Ok(()) - } -} +pub use self::{legacy::SynapseConnection as LegacySynapseConnection, modern::SynapseConnection}; diff --git a/crates/matrix-synapse/src/modern.rs b/crates/matrix-synapse/src/modern.rs new file mode 100644 index 000000000..26c8e21a1 --- /dev/null +++ b/crates/matrix-synapse/src/modern.rs @@ -0,0 +1,562 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use std::collections::HashSet; + +use anyhow::Context as _; +use http::{Method, StatusCode}; +use mas_http::RequestBuilderExt; +use mas_matrix::{HomeserverConnection, MatrixUser, ProvisionRequest}; +use serde::{Deserialize, Serialize}; +use tracing::debug; +use url::Url; + +use crate::error::{M_EXCLUSIVE, M_INVALID_USERNAME, M_USER_IN_USE, SynapseResponseExt as _}; + +#[derive(Clone)] +pub struct SynapseConnection { + homeserver: String, + endpoint: Url, + access_token: String, + http_client: reqwest::Client, +} + +impl SynapseConnection { + #[must_use] + pub fn new( + homeserver: String, + endpoint: Url, + access_token: String, + http_client: reqwest::Client, + ) -> Self { + Self { + homeserver, + endpoint, + access_token, + http_client, + } + } + + fn builder(&self, method: Method, url: &str) -> reqwest::RequestBuilder { + self.http_client + .request( + method, + self.endpoint + .join(url) + .map(String::from) + .unwrap_or_default(), + ) + .bearer_auth(&self.access_token) + } + + fn post(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::POST, url) + } + + fn get(&self, url: &str) -> reqwest::RequestBuilder { + self.builder(Method::GET, url) + } +} + +#[async_trait::async_trait] +impl HomeserverConnection for SynapseConnection { + fn homeserver(&self) -> &str { + &self.homeserver + } + + #[tracing::instrument( + name = "homeserver.query_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn query_user(&self, localpart: &str) -> Result { + #[derive(Deserialize)] + #[allow(dead_code)] + struct Response { + user_id: String, + display_name: Option, + avatar_url: Option, + is_suspended: bool, + is_deactivated: bool, + } + + let encoded_localpart = urlencoding::encode(localpart); + let url = format!("_synapse/mas/query_user?localpart={encoded_localpart}"); + let response = self + .get(&url) + .send_traced() + .await + .context("Failed to query user from Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while querying user from Synapse")?; + + let body: Response = response + .json() + .await + .context("Failed to deserialize response while querying user from Synapse")?; + + Ok(MatrixUser { + displayname: body.display_name, + avatar_url: body.avatar_url, + deactivated: body.is_deactivated, + }) + } + + #[tracing::instrument( + name = "homeserver.provision_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = request.localpart(), + ), + err(Debug), + )] + async fn provision_user(&self, request: &ProvisionRequest) -> Result { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + set_displayname: Option, + #[serde(skip_serializing_if = "std::ops::Not::not")] + unset_displayname: bool, + #[serde(skip_serializing_if = "Option::is_none")] + set_avatar_url: Option, + #[serde(skip_serializing_if = "std::ops::Not::not")] + unset_avatar_url: bool, + #[serde(skip_serializing_if = "Option::is_none")] + set_emails: Option>, + #[serde(skip_serializing_if = "std::ops::Not::not")] + unset_emails: bool, + } + + let mut body = Request { + localpart: request.localpart(), + set_displayname: None, + unset_displayname: false, + set_avatar_url: None, + unset_avatar_url: false, + set_emails: None, + unset_emails: false, + }; + + request.on_displayname(|displayname| match displayname { + Some(name) => body.set_displayname = Some(name.to_owned()), + None => body.unset_displayname = true, + }); + + request.on_avatar_url(|avatar_url| match avatar_url { + Some(url) => body.set_avatar_url = Some(url.to_owned()), + None => body.unset_avatar_url = true, + }); + + request.on_emails(|emails| match emails { + Some(emails) => body.set_emails = Some(emails.to_owned()), + None => body.unset_emails = true, + }); + + let response = self + .post("_synapse/mas/provision_user") + .json(&body) + .send_traced() + .await + .context("Failed to provision user in Synapse")?; + + let response = response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while provisioning user in Synapse")?; + + match response.status() { + StatusCode::CREATED => Ok(true), + StatusCode::OK => Ok(false), + code => { + anyhow::bail!("Unexpected HTTP code while provisioning user in Synapse: {code}") + } + } + } + + #[tracing::instrument( + name = "homeserver.is_localpart_available", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn is_localpart_available(&self, localpart: &str) -> Result { + // Synapse will give us an error if the localpart is not ASCII, so we bail out + // early + if !localpart.is_ascii() { + return Ok(false); + } + + let encoded_localpart = urlencoding::encode(localpart); + let url = format!("_synapse/mas/is_localpart_available?localpart={encoded_localpart}"); + let response = self + .get(&url) + .send_traced() + .await + .context("Failed to check localpart availability from Synapse")?; + + match response.error_for_synapse_error().await { + Ok(_resp) => Ok(true), + Err(err) + if err.errcode() == Some(M_INVALID_USERNAME) + || err.errcode() == Some(M_USER_IN_USE) + || err.errcode() == Some(M_EXCLUSIVE) => + { + debug!( + error = &err as &dyn std::error::Error, + "Localpart is not available" + ); + Ok(false) + } + + Err(err) => Err(err).context("Failed to query localpart availability from Synapse"), + } + } + + #[tracing::instrument( + name = "homeserver.upsert_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn upsert_device( + &self, + localpart: &str, + device_id: &str, + initial_display_name: Option<&str>, + ) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + device_id: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + display_name: Option<&'a str>, + } + + let body = Request { + localpart, + device_id, + display_name: initial_display_name, + }; + + let response = self + .post("_synapse/mas/upsert_device") + .json(&body) + .send_traced() + .await + .context("Failed to create device in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while creating device in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.update_device_display_name", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn update_device_display_name( + &self, + localpart: &str, + device_id: &str, + display_name: &str, + ) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + device_id: &'a str, + display_name: &'a str, + } + + let body = Request { + localpart, + device_id, + display_name, + }; + + let response = self + .post("_synapse/mas/update_device_display_name") + .json(&body) + .send_traced() + .await + .context("Failed to update device display name in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while updating device display name in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.delete_device", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_id = device_id, + ), + err(Debug), + )] + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + device_id: &'a str, + } + + let body = Request { + localpart, + device_id, + }; + + let response = self + .post("_synapse/mas/delete_device") + .json(&body) + .send_traced() + .await + .context("Failed to delete device in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while deleting device in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.sync_devices", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.device_count = devices.len(), + ), + err(Debug), + )] + async fn sync_devices( + &self, + localpart: &str, + devices: HashSet, + ) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + devices: HashSet, + } + + let body = Request { localpart, devices }; + + let response = self + .post("_synapse/mas/sync_devices") + .json(&body) + .send_traced() + .await + .context("Failed to sync devices in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while syncing devices in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.delete_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + matrix.erase = erase, + ), + err(Debug), + )] + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + erase: bool, + } + + let body = Request { localpart, erase }; + + let response = self + .post("_synapse/mas/delete_user") + .json(&body) + .send_traced() + .await + .context("Failed to delete user in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while deleting user in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.reactivate_user", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + } + + let body = Request { localpart }; + + let response = self + .post("_synapse/mas/reactivate_user") + .json(&body) + .send_traced() + .await + .context("Failed to reactivate user in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while reactivating user in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.set_displayname", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + displayname: &'a str, + } + + let body = Request { + localpart, + displayname, + }; + + let response = self + .post("_synapse/mas/set_displayname") + .json(&body) + .send_traced() + .await + .context("Failed to set displayname in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while setting displayname in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.unset_displayname", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + } + + let body = Request { localpart }; + + let response = self + .post("_synapse/mas/unset_displayname") + .json(&body) + .send_traced() + .await + .context("Failed to unset displayname in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while unsetting displayname in Synapse")?; + + Ok(()) + } + + #[tracing::instrument( + name = "homeserver.allow_cross_signing_reset", + skip_all, + fields( + matrix.homeserver = self.homeserver, + matrix.localpart = localpart, + ), + err(Debug), + )] + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error> { + #[derive(Serialize)] + struct Request<'a> { + localpart: &'a str, + } + + let body = Request { localpart }; + + let response = self + .post("_synapse/mas/allow_cross_signing_reset") + .json(&body) + .send_traced() + .await + .context("Failed to allow cross-signing reset in Synapse")?; + + response + .error_for_synapse_error() + .await + .context("Unexpected HTTP response while allowing cross-signing reset in Synapse")?; + + Ok(()) + } +} diff --git a/crates/matrix/src/lib.rs b/crates/matrix/src/lib.rs index 9c0f81bd3..34c502f81 100644 --- a/crates/matrix/src/lib.rs +++ b/crates/matrix/src/lib.rs @@ -31,7 +31,7 @@ enum FieldAction { } pub struct ProvisionRequest { - mxid: String, + localpart: String, sub: String, displayname: FieldAction, avatar_url: FieldAction, @@ -43,12 +43,12 @@ impl ProvisionRequest { /// /// # Parameters /// - /// * `mxid` - The Matrix ID to provision. + /// * `localpart` - The localpart of the user to provision. /// * `sub` - The `sub` of the user, aka the internal ID. #[must_use] - pub fn new(mxid: impl Into, sub: impl Into) -> Self { + pub fn new(localpart: impl Into, sub: impl Into) -> Self { Self { - mxid: mxid.into(), + localpart: localpart.into(), sub: sub.into(), displayname: FieldAction::DoNothing, avatar_url: FieldAction::DoNothing, @@ -62,10 +62,10 @@ impl ProvisionRequest { &self.sub } - /// Get the Matrix ID to provision. + /// Get the localpart of the user to provision. #[must_use] - pub fn mxid(&self) -> &str { - &self.mxid + pub fn localpart(&self) -> &str { + &self.localpart } /// Ask to set the displayname of the user. @@ -211,13 +211,13 @@ pub trait HomeserverConnection: Send + Sync { /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to query. + /// * `localpart` - The localpart of the user to query. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the user does not /// exist. - async fn query_user(&self, mxid: &str) -> Result; + async fn query_user(&self, localpart: &str) -> Result; /// Provision a user on the homeserver. /// @@ -247,16 +247,16 @@ pub trait HomeserverConnection: Send + Sync { /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to create a device for. + /// * `localpart` - The localpart of the user to create a device for. /// * `device_id` - The device ID to create. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the device could /// not be created. - async fn create_device( + async fn upsert_device( &self, - mxid: &str, + localpart: &str, device_id: &str, initial_display_name: Option<&str>, ) -> Result<(), anyhow::Error>; @@ -265,7 +265,7 @@ pub trait HomeserverConnection: Send + Sync { /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to update a device for. + /// * `localpart` - The localpart of the user to update a device for. /// * `device_id` - The device ID to update. /// * `display_name` - The new display name to set /// @@ -275,7 +275,7 @@ pub trait HomeserverConnection: Send + Sync { /// not be updated. async fn update_device_display_name( &self, - mxid: &str, + localpart: &str, device_id: &str, display_name: &str, ) -> Result<(), anyhow::Error>; @@ -284,90 +284,98 @@ pub trait HomeserverConnection: Send + Sync { /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to delete a device for. + /// * `localpart` - The localpart of the user to delete a device for. /// * `device_id` - The device ID to delete. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the device could /// not be deleted. - async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error>; + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error>; /// Sync the list of devices of a user with the homeserver. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to sync the devices for. + /// * `localpart` - The localpart of the user to sync the devices for. /// * `devices` - The list of devices to sync. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the devices could /// not be synced. - async fn sync_devices(&self, mxid: &str, devices: HashSet) - -> Result<(), anyhow::Error>; + async fn sync_devices( + &self, + localpart: &str, + devices: HashSet, + ) -> Result<(), anyhow::Error>; /// Delete a user on the homeserver. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to delete. + /// * `localpart` - The localpart of the user to delete. /// * `erase` - Whether to ask the homeserver to erase the user's data. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the user could not /// be deleted. - async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error>; + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error>; /// Reactivate a user on the homeserver. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to reactivate. + /// * `localpart` - The localpart of the user to reactivate. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the user could not /// be reactivated. - async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error>; + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error>; /// Set the displayname of a user on the homeserver. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to set the displayname for. + /// * `localpart` - The localpart of the user to set the displayname for. /// * `displayname` - The displayname to set. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the displayname /// could not be set. - async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error>; + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error>; /// Unset the displayname of a user on the homeserver. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to unset the displayname for. + /// * `localpart` - The localpart of the user to unset the displayname for. /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the displayname /// could not be unset. - async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error>; + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error>; /// Temporarily allow a user to reset their cross-signing keys. /// /// # Parameters /// - /// * `mxid` - The Matrix ID of the user to allow cross-signing key reset + /// * `localpart` - The localpart of the user to allow cross-signing key + /// reset /// /// # Errors /// /// Returns an error if the homeserver is unreachable or the cross-signing /// reset could not be allowed. - async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error>; + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error>; } #[async_trait::async_trait] @@ -376,8 +384,8 @@ impl HomeserverConnection for &T (**self).homeserver() } - async fn query_user(&self, mxid: &str) -> Result { - (**self).query_user(mxid).await + async fn query_user(&self, localpart: &str) -> Result { + (**self).query_user(localpart).await } async fn provision_user(&self, request: &ProvisionRequest) -> Result { @@ -388,58 +396,62 @@ impl HomeserverConnection for &T (**self).is_localpart_available(localpart).await } - async fn create_device( + async fn upsert_device( &self, - mxid: &str, + localpart: &str, device_id: &str, initial_display_name: Option<&str>, ) -> Result<(), anyhow::Error> { (**self) - .create_device(mxid, device_id, initial_display_name) + .upsert_device(localpart, device_id, initial_display_name) .await } async fn update_device_display_name( &self, - mxid: &str, + localpart: &str, device_id: &str, display_name: &str, ) -> Result<(), anyhow::Error> { (**self) - .update_device_display_name(mxid, device_id, display_name) + .update_device_display_name(localpart, device_id, display_name) .await } - async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> { - (**self).delete_device(mxid, device_id).await + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error> { + (**self).delete_device(localpart, device_id).await } async fn sync_devices( &self, - mxid: &str, + localpart: &str, devices: HashSet, ) -> Result<(), anyhow::Error> { - (**self).sync_devices(mxid, devices).await + (**self).sync_devices(localpart, devices).await } - async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> { - (**self).delete_user(mxid, erase).await + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error> { + (**self).delete_user(localpart, erase).await } - async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).reactivate_user(mxid).await + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).reactivate_user(localpart).await } - async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> { - (**self).set_displayname(mxid, displayname).await + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error> { + (**self).set_displayname(localpart, displayname).await } - async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).unset_displayname(mxid).await + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).unset_displayname(localpart).await } - async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).allow_cross_signing_reset(mxid).await + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).allow_cross_signing_reset(localpart).await } } @@ -450,8 +462,8 @@ impl HomeserverConnection for Arc { (**self).homeserver() } - async fn query_user(&self, mxid: &str) -> Result { - (**self).query_user(mxid).await + async fn query_user(&self, localpart: &str) -> Result { + (**self).query_user(localpart).await } async fn provision_user(&self, request: &ProvisionRequest) -> Result { @@ -462,57 +474,61 @@ impl HomeserverConnection for Arc { (**self).is_localpart_available(localpart).await } - async fn create_device( + async fn upsert_device( &self, - mxid: &str, + localpart: &str, device_id: &str, initial_display_name: Option<&str>, ) -> Result<(), anyhow::Error> { (**self) - .create_device(mxid, device_id, initial_display_name) + .upsert_device(localpart, device_id, initial_display_name) .await } async fn update_device_display_name( &self, - mxid: &str, + localpart: &str, device_id: &str, display_name: &str, ) -> Result<(), anyhow::Error> { (**self) - .update_device_display_name(mxid, device_id, display_name) + .update_device_display_name(localpart, device_id, display_name) .await } - async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> { - (**self).delete_device(mxid, device_id).await + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error> { + (**self).delete_device(localpart, device_id).await } async fn sync_devices( &self, - mxid: &str, + localpart: &str, devices: HashSet, ) -> Result<(), anyhow::Error> { - (**self).sync_devices(mxid, devices).await + (**self).sync_devices(localpart, devices).await } - async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> { - (**self).delete_user(mxid, erase).await + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error> { + (**self).delete_user(localpart, erase).await } - async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).reactivate_user(mxid).await + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).reactivate_user(localpart).await } - async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> { - (**self).set_displayname(mxid, displayname).await + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error> { + (**self).set_displayname(localpart, displayname).await } - async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).unset_displayname(mxid).await + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).unset_displayname(localpart).await } - async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> { - (**self).allow_cross_signing_reset(mxid).await + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error> { + (**self).allow_cross_signing_reset(localpart).await } } diff --git a/crates/matrix/src/mock.rs b/crates/matrix/src/mock.rs index 7969fee3c..0c315ff97 100644 --- a/crates/matrix/src/mock.rs +++ b/crates/matrix/src/mock.rs @@ -54,9 +54,10 @@ impl crate::HomeserverConnection for HomeserverConnection { &self.homeserver } - async fn query_user(&self, mxid: &str) -> Result { + async fn query_user(&self, localpart: &str) -> Result { + let mxid = self.mxid(localpart); let users = self.users.read().await; - let user = users.get(mxid).context("User not found")?; + let user = users.get(&mxid).context("User not found")?; Ok(MatrixUser { displayname: user.displayname.clone(), avatar_url: user.avatar_url.clone(), @@ -66,8 +67,9 @@ impl crate::HomeserverConnection for HomeserverConnection { async fn provision_user(&self, request: &ProvisionRequest) -> Result { let mut users = self.users.write().await; - let inserted = !users.contains_key(request.mxid()); - let user = users.entry(request.mxid().to_owned()).or_insert(MockUser { + let mxid = self.mxid(request.localpart()); + let inserted = !users.contains_key(&mxid); + let user = users.entry(mxid).or_insert(MockUser { sub: request.sub().to_owned(), avatar_url: None, displayname: None, @@ -107,51 +109,56 @@ impl crate::HomeserverConnection for HomeserverConnection { Ok(!users.contains_key(&mxid)) } - async fn create_device( + async fn upsert_device( &self, - mxid: &str, + localpart: &str, device_id: &str, _initial_display_name: Option<&str>, ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.devices.insert(device_id.to_owned()); Ok(()) } async fn update_device_display_name( &self, - mxid: &str, + localpart: &str, device_id: &str, _display_name: &str, ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.devices.get(device_id).context("Device not found")?; Ok(()) } - async fn delete_device(&self, mxid: &str, device_id: &str) -> Result<(), anyhow::Error> { + async fn delete_device(&self, localpart: &str, device_id: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.devices.remove(device_id); Ok(()) } async fn sync_devices( &self, - mxid: &str, + localpart: &str, devices: HashSet, ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.devices = devices; Ok(()) } - async fn delete_user(&self, mxid: &str, erase: bool) -> Result<(), anyhow::Error> { + async fn delete_user(&self, localpart: &str, erase: bool) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.devices.clear(); user.emails = None; user.deactivated = true; @@ -163,31 +170,39 @@ impl crate::HomeserverConnection for HomeserverConnection { Ok(()) } - async fn reactivate_user(&self, mxid: &str) -> Result<(), anyhow::Error> { + async fn reactivate_user(&self, localpart: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.deactivated = false; Ok(()) } - async fn set_displayname(&self, mxid: &str, displayname: &str) -> Result<(), anyhow::Error> { + async fn set_displayname( + &self, + localpart: &str, + displayname: &str, + ) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.displayname = Some(displayname.to_owned()); Ok(()) } - async fn unset_displayname(&self, mxid: &str) -> Result<(), anyhow::Error> { + async fn unset_displayname(&self, localpart: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.displayname = None; Ok(()) } - async fn allow_cross_signing_reset(&self, mxid: &str) -> Result<(), anyhow::Error> { + async fn allow_cross_signing_reset(&self, localpart: &str) -> Result<(), anyhow::Error> { + let mxid = self.mxid(localpart); let mut users = self.users.write().await; - let user = users.get_mut(mxid).context("User not found")?; + let user = users.get_mut(&mxid).context("User not found")?; user.cross_signing_reset_allowed = true; Ok(()) } @@ -207,11 +222,11 @@ mod tests { assert_eq!(conn.homeserver(), "example.org"); assert_eq!(conn.mxid("test"), mxid); - assert!(conn.query_user(mxid).await.is_err()); - assert!(conn.create_device(mxid, device, None).await.is_err()); - assert!(conn.delete_device(mxid, device).await.is_err()); + assert!(conn.query_user("test").await.is_err()); + assert!(conn.upsert_device("test", device, None).await.is_err()); + assert!(conn.delete_device("test", device).await.is_err()); - let request = ProvisionRequest::new("@test:example.org", "test") + let request = ProvisionRequest::new("test", "test") .set_displayname("Test User".into()) .set_avatar_url("mxc://example.org/1234567890".into()) .set_emails(vec!["test@example.org".to_owned()]); @@ -219,33 +234,33 @@ mod tests { let inserted = conn.provision_user(&request).await.unwrap(); assert!(inserted); - let user = conn.query_user(mxid).await.unwrap(); + let user = conn.query_user("test").await.unwrap(); assert_eq!(user.displayname, Some("Test User".into())); assert_eq!(user.avatar_url, Some("mxc://example.org/1234567890".into())); // Set the displayname again - assert!(conn.set_displayname(mxid, "John").await.is_ok()); + assert!(conn.set_displayname("test", "John").await.is_ok()); - let user = conn.query_user(mxid).await.unwrap(); + let user = conn.query_user("test").await.unwrap(); assert_eq!(user.displayname, Some("John".into())); // Unset the displayname - assert!(conn.unset_displayname(mxid).await.is_ok()); + assert!(conn.unset_displayname("test").await.is_ok()); - let user = conn.query_user(mxid).await.unwrap(); + let user = conn.query_user("test").await.unwrap(); assert_eq!(user.displayname, None); // Deleting a non-existent device should not fail - assert!(conn.delete_device(mxid, device).await.is_ok()); + assert!(conn.delete_device("test", device).await.is_ok()); // Create the device - assert!(conn.create_device(mxid, device, None).await.is_ok()); + assert!(conn.upsert_device("test", device, None).await.is_ok()); // Create the same device again - assert!(conn.create_device(mxid, device, None).await.is_ok()); + assert!(conn.upsert_device("test", device, None).await.is_ok()); // XXX: there is no API to query devices yet in the trait // Delete the device - assert!(conn.delete_device(mxid, device).await.is_ok()); + assert!(conn.delete_device("test", device).await.is_ok()); // The user we just created should be not available assert!(!conn.is_localpart_available("test").await.unwrap()); diff --git a/crates/matrix/src/readonly.rs b/crates/matrix/src/readonly.rs index 10a0642c2..2efa935f9 100644 --- a/crates/matrix/src/readonly.rs +++ b/crates/matrix/src/readonly.rs @@ -28,8 +28,8 @@ impl HomeserverConnection for ReadOnlyHomeserverConnect self.inner.homeserver() } - async fn query_user(&self, mxid: &str) -> Result { - self.inner.query_user(mxid).await + async fn query_user(&self, localpart: &str) -> Result { + self.inner.query_user(localpart).await } async fn provision_user(&self, _request: &ProvisionRequest) -> Result { @@ -40,9 +40,9 @@ impl HomeserverConnection for ReadOnlyHomeserverConnect self.inner.is_localpart_available(localpart).await } - async fn create_device( + async fn upsert_device( &self, - _mxid: &str, + _localpart: &str, _device_id: &str, _initial_display_name: Option<&str>, ) -> Result<(), anyhow::Error> { @@ -51,42 +51,46 @@ impl HomeserverConnection for ReadOnlyHomeserverConnect async fn update_device_display_name( &self, - _mxid: &str, + _localpart: &str, _device_id: &str, _display_name: &str, ) -> Result<(), anyhow::Error> { anyhow::bail!("Device display name update is not supported in read-only mode"); } - async fn delete_device(&self, _mxid: &str, _device_id: &str) -> Result<(), anyhow::Error> { + async fn delete_device(&self, _localpart: &str, _device_id: &str) -> Result<(), anyhow::Error> { anyhow::bail!("Device deletion is not supported in read-only mode"); } async fn sync_devices( &self, - _mxid: &str, + _localpart: &str, _devices: HashSet, ) -> Result<(), anyhow::Error> { anyhow::bail!("Device synchronization is not supported in read-only mode"); } - async fn delete_user(&self, _mxid: &str, _erase: bool) -> Result<(), anyhow::Error> { + async fn delete_user(&self, _localpart: &str, _erase: bool) -> Result<(), anyhow::Error> { anyhow::bail!("User deletion is not supported in read-only mode"); } - async fn reactivate_user(&self, _mxid: &str) -> Result<(), anyhow::Error> { + async fn reactivate_user(&self, _localpart: &str) -> Result<(), anyhow::Error> { anyhow::bail!("User reactivation is not supported in read-only mode"); } - async fn set_displayname(&self, _mxid: &str, _displayname: &str) -> Result<(), anyhow::Error> { + async fn set_displayname( + &self, + _localpart: &str, + _displayname: &str, + ) -> Result<(), anyhow::Error> { anyhow::bail!("User displayname update is not supported in read-only mode"); } - async fn unset_displayname(&self, _mxid: &str) -> Result<(), anyhow::Error> { + async fn unset_displayname(&self, _localpart: &str) -> Result<(), anyhow::Error> { anyhow::bail!("User displayname update is not supported in read-only mode"); } - async fn allow_cross_signing_reset(&self, _mxid: &str) -> Result<(), anyhow::Error> { + async fn allow_cross_signing_reset(&self, _localpart: &str) -> Result<(), anyhow::Error> { anyhow::bail!("Allowing cross-signing reset is not supported in read-only mode"); } } diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index 92e15f448..4855a4bf1 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -51,7 +51,6 @@ impl RunnableJob for ProvisionUserJob { .context("User not found") .map_err(JobError::fail)?; - let mxid = matrix.mxid(&user.username); let emails = repo .user_email() .all(&user) @@ -60,7 +59,8 @@ impl RunnableJob for ProvisionUserJob { .into_iter() .map(|email| email.email) .collect(); - let mut request = ProvisionRequest::new(mxid.clone(), user.sub.clone()).set_emails(emails); + let mut request = + ProvisionRequest::new(user.username.clone(), user.sub.clone()).set_emails(emails); if let Some(display_name) = self.display_name_to_set() { request = request.set_displayname(display_name.to_owned()); @@ -71,6 +71,7 @@ impl RunnableJob for ProvisionUserJob { .await .map_err(JobError::retry)?; + let mxid = matrix.mxid(&user.username); if created { info!(%user.id, %mxid, "User created"); } else { @@ -241,9 +242,8 @@ impl RunnableJob for SyncDevicesJob { } } - let mxid = matrix.mxid(&user.username); matrix - .sync_devices(&mxid, devices) + .sync_devices(&user.username, devices) .await .map_err(JobError::retry)?; diff --git a/docs/config.schema.json b/docs/config.schema.json index abb811fca..0163e741e 100644 --- a/docs/config.schema.json +++ b/docs/config.schema.json @@ -1707,18 +1707,32 @@ "description": "The kind of homeserver it is.", "oneOf": [ { - "description": "Homeserver is Synapse", + "description": "Homeserver is Synapse, using the legacy API\n\nThis will switch to using the modern API in a few releases.", "type": "string", "enum": [ "synapse" ] }, { - "description": "Homeserver is Synapse, in read-only mode\n\nThis is meant for testing rolling out Matrix Authentication Service with no risk of writing data to the homeserver.", + "description": "Homeserver is Synapse, using the legacy API, in read-only mode\n\nThis is meant for testing rolling out Matrix Authentication Service with no risk of writing data to the homeserver.\n\nThis will switch to using the modern API in a few releases.", "type": "string", "enum": [ "synapse_read_only" ] + }, + { + "description": "Homeserver is Synapse, using the legacy API,", + "type": "string", + "enum": [ + "synapse_legacy" + ] + }, + { + "description": "Homeserver is Synapse, with the modern API available", + "type": "string", + "enum": [ + "synapse_modern" + ] } ] },