Skip to content

Commit 57af270

Browse files
authored
Allow running jobs from the job queue in tests (#4775)
2 parents 1c36430 + e955ecc commit 57af270

File tree

15 files changed

+271
-85
lines changed

15 files changed

+271
-85
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/commands/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ impl Options {
173173
test_mailer_in_background(&mailer, Duration::from_secs(30));
174174

175175
info!("Starting task worker");
176-
mas_tasks::init(
176+
mas_tasks::init_and_run(
177177
PgRepositoryFactory::new(pool.clone()),
178+
SystemClock::default(),
178179
&mailer,
179180
homeserver_connection.clone(),
180181
url_builder.clone(),

crates/cli/src/commands/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use clap::Parser;
1010
use figment::Figment;
1111
use mas_config::{AppConfig, ConfigurationSection};
1212
use mas_router::UrlBuilder;
13+
use mas_storage::SystemClock;
1314
use mas_storage_pg::PgRepositoryFactory;
1415
use tracing::{info, info_span};
1516

@@ -63,8 +64,9 @@ impl Options {
6364
drop(config);
6465

6566
info!("Starting task scheduler");
66-
mas_tasks::init(
67+
mas_tasks::init_and_run(
6768
PgRepositoryFactory::new(pool.clone()),
69+
SystemClock::default(),
6870
&mailer,
6971
conn,
7072
url_builder,

crates/handlers/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ mas-axum-utils.workspace = true
7272
mas-config.workspace = true
7373
mas-context.workspace = true
7474
mas-data-model.workspace = true
75+
mas-email.workspace = true
7576
mas-http.workspace = true
7677
mas-i18n.workspace = true
7778
mas-iana.workspace = true
@@ -83,6 +84,7 @@ mas-policy.workspace = true
8384
mas-router.workspace = true
8485
mas-storage.workspace = true
8586
mas-storage-pg.workspace = true
87+
mas-tasks.workspace = true
8688
mas-templates.workspace = true
8789
oauth2-types.workspace = true
8890
zxcvbn.workspace = true

crates/handlers/src/admin/v1/users/deactivate.rs

Lines changed: 64 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,9 @@ pub async fn handler(
105105
mod tests {
106106
use chrono::Duration;
107107
use hyper::{Request, StatusCode};
108+
use insta::assert_json_snapshot;
108109
use mas_storage::{Clock, RepositoryAccess, user::UserRepository};
109-
use sqlx::{PgPool, types::Json};
110+
use sqlx::PgPool;
110111

111112
use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup};
112113

@@ -137,15 +138,37 @@ mod tests {
137138
serde_json::json!(state.clock.now())
138139
);
139140

140-
// It should have scheduled a deactivation job for the user
141-
// XXX: we don't have a good way to look for the deactivation job
142-
let job: Json<serde_json::Value> = sqlx::query_scalar(
143-
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
144-
)
145-
.fetch_one(&pool)
146-
.await
147-
.expect("Deactivation job to be scheduled");
148-
assert_eq!(job["user_id"], serde_json::json!(user.id));
141+
// Make sure to run the jobs in the queue
142+
state.run_jobs_in_queue().await;
143+
144+
let request = Request::get(format!("/api/admin/v1/users/{}", user.id))
145+
.bearer(&token)
146+
.empty();
147+
let response = state.request(request).await;
148+
response.assert_status(StatusCode::OK);
149+
let body: serde_json::Value = response.json();
150+
151+
assert_json_snapshot!(body, @r#"
152+
{
153+
"data": {
154+
"type": "user",
155+
"id": "01FSHN9AG0MZAA6S4AF7CTV32E",
156+
"attributes": {
157+
"username": "alice",
158+
"created_at": "2022-01-16T14:40:00Z",
159+
"locked_at": "2022-01-16T14:40:00Z",
160+
"deactivated_at": "2022-01-16T14:40:00Z",
161+
"admin": false
162+
},
163+
"links": {
164+
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
165+
}
166+
},
167+
"links": {
168+
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
169+
}
170+
}
171+
"#);
149172
}
150173

151174
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]
@@ -179,15 +202,37 @@ mod tests {
179202
serde_json::json!(state.clock.now())
180203
);
181204

182-
// It should have scheduled a deactivation job for the user
183-
// XXX: we don't have a good way to look for the deactivation job
184-
let job: Json<serde_json::Value> = sqlx::query_scalar(
185-
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
186-
)
187-
.fetch_one(&pool)
188-
.await
189-
.expect("Deactivation job to be scheduled");
190-
assert_eq!(job["user_id"], serde_json::json!(user.id));
205+
// Make sure to run the jobs in the queue
206+
state.run_jobs_in_queue().await;
207+
208+
let request = Request::get(format!("/api/admin/v1/users/{}", user.id))
209+
.bearer(&token)
210+
.empty();
211+
let response = state.request(request).await;
212+
response.assert_status(StatusCode::OK);
213+
let body: serde_json::Value = response.json();
214+
215+
assert_json_snapshot!(body, @r#"
216+
{
217+
"data": {
218+
"type": "user",
219+
"id": "01FSHN9AG0MZAA6S4AF7CTV32E",
220+
"attributes": {
221+
"username": "alice",
222+
"created_at": "2022-01-16T14:40:00Z",
223+
"locked_at": "2022-01-16T14:40:00Z",
224+
"deactivated_at": "2022-01-16T14:41:00Z",
225+
"admin": false
226+
},
227+
"links": {
228+
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
229+
}
230+
},
231+
"links": {
232+
"self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E"
233+
}
234+
}
235+
"#);
191236
}
192237

193238
#[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")]

crates/handlers/src/test_utils.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use mas_axum_utils::{
2929
};
3030
use mas_config::RateLimitingConfig;
3131
use mas_data_model::SiteConfig;
32+
use mas_email::{MailTransport, Mailer};
3233
use mas_i18n::Translator;
3334
use mas_keystore::{Encrypter, JsonWebKey, JsonWebKeySet, Keystore, PrivateKey};
3435
use mas_matrix::{HomeserverConnection, MockHomeserverConnection};
@@ -39,6 +40,7 @@ use mas_storage::{
3940
clock::MockClock,
4041
};
4142
use mas_storage_pg::PgRepositoryFactory;
43+
use mas_tasks::QueueWorker;
4244
use mas_templates::{SiteConfigExt, Templates};
4345
use oauth2_types::{registration::ClientRegistrationResponse, requests::AccessTokenResponse};
4446
use rand::SeedableRng;
@@ -113,6 +115,7 @@ pub(crate) struct TestState {
113115
pub rng: Arc<Mutex<ChaChaRng>>,
114116
pub http_client: reqwest::Client,
115117
pub task_tracker: TaskTracker,
118+
queue_worker: Arc<tokio::sync::Mutex<QueueWorker>>,
116119

117120
#[allow(dead_code)] // It is used, as it will cancel the CancellationToken when dropped
118121
cancellation_drop_guard: Arc<DropGuard>,
@@ -235,6 +238,27 @@ impl TestState {
235238
shutdown_token.child_token(),
236239
);
237240

241+
let mailer = Mailer::new(
242+
templates.clone(),
243+
MailTransport::blackhole(),
244+
"hello@example.com".parse().unwrap(),
245+
"hello@example.com".parse().unwrap(),
246+
);
247+
248+
let queue_worker = mas_tasks::init(
249+
PgRepositoryFactory::new(pool.clone()),
250+
Arc::clone(&clock),
251+
&mailer,
252+
homeserver_connection.clone(),
253+
url_builder.clone(),
254+
&site_config,
255+
shutdown_token.child_token(),
256+
)
257+
.await
258+
.unwrap();
259+
260+
let queue_worker = Arc::new(tokio::sync::Mutex::new(queue_worker));
261+
238262
Ok(Self {
239263
repository_factory: PgRepositoryFactory::new(pool),
240264
templates,
@@ -254,10 +278,19 @@ impl TestState {
254278
rng,
255279
http_client,
256280
task_tracker,
281+
queue_worker,
257282
cancellation_drop_guard: Arc::new(shutdown_token.drop_guard()),
258283
})
259284
}
260285

286+
/// Run all the available jobs in the queue.
287+
///
288+
/// Panics if it fails to run the jobs (but not on job failures!)
289+
pub async fn run_jobs_in_queue(&self) {
290+
let mut queue = self.queue_worker.lock().await;
291+
queue.process_all_jobs_in_tests().await.unwrap();
292+
}
293+
261294
/// Reset the test utils to a fresh state, with the same configuration.
262295
pub async fn reset(self) -> Self {
263296
let site_config = self.site_config.clone();

crates/storage/src/clock.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicI64};
1515
use chrono::{DateTime, TimeZone, Utc};
1616

1717
/// Represents a clock which can give the current date and time
18-
pub trait Clock: Sync {
18+
pub trait Clock: Send + Sync {
1919
/// Get the current date and time
2020
fn now(&self) -> DateTime<Utc>;
2121
}

crates/tasks/src/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl RunnableJob for CleanupExpiredTokensJob {
2424

2525
let count = repo
2626
.oauth2_access_token()
27-
.cleanup_revoked(&clock)
27+
.cleanup_revoked(clock)
2828
.await
2929
.map_err(JobError::retry)?;
3030
repo.save().await.map_err(JobError::retry)?;

crates/tasks/src/email.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl RunnableJob for SendEmailAuthenticationCodeJob {
100100
.user_email()
101101
.add_authentication_code(
102102
&mut rng,
103-
&clock,
103+
clock,
104104
Duration::minutes(5), // TODO: make this configurable
105105
&user_email_authentication,
106106
code,

crates/tasks/src/lib.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ use mas_data_model::SiteConfig;
1010
use mas_email::Mailer;
1111
use mas_matrix::HomeserverConnection;
1212
use mas_router::UrlBuilder;
13-
use mas_storage::{BoxClock, BoxRepository, RepositoryError, RepositoryFactory, SystemClock};
13+
use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory};
1414
use mas_storage_pg::PgRepositoryFactory;
1515
use new_queue::QueueRunnerError;
1616
use opentelemetry::metrics::Meter;
1717
use rand::SeedableRng;
1818
use sqlx::{Pool, Postgres};
1919
use tokio_util::{sync::CancellationToken, task::TaskTracker};
2020

21+
pub use crate::new_queue::QueueWorker;
22+
2123
mod database;
2224
mod email;
2325
mod matrix;
@@ -39,7 +41,7 @@ static METER: LazyLock<Meter> = LazyLock::new(|| {
3941
struct State {
4042
repository_factory: PgRepositoryFactory,
4143
mailer: Mailer,
42-
clock: SystemClock,
44+
clock: Arc<dyn Clock>,
4345
homeserver: Arc<dyn HomeserverConnection>,
4446
url_builder: UrlBuilder,
4547
site_config: SiteConfig,
@@ -48,7 +50,7 @@ struct State {
4850
impl State {
4951
pub fn new(
5052
repository_factory: PgRepositoryFactory,
51-
clock: SystemClock,
53+
clock: impl Clock + 'static,
5254
mailer: Mailer,
5355
homeserver: impl HomeserverConnection + 'static,
5456
url_builder: UrlBuilder,
@@ -57,7 +59,7 @@ impl State {
5759
Self {
5860
repository_factory,
5961
mailer,
60-
clock,
62+
clock: Arc::new(clock),
6163
homeserver: Arc::new(homeserver),
6264
url_builder,
6365
site_config,
@@ -68,8 +70,8 @@ impl State {
6870
self.repository_factory.pool()
6971
}
7072

71-
pub fn clock(&self) -> BoxClock {
72-
Box::new(self.clock.clone())
73+
pub fn clock(&self) -> &dyn Clock {
74+
&self.clock
7375
}
7476

7577
pub fn mailer(&self) -> &Mailer {
@@ -99,29 +101,31 @@ impl State {
99101
}
100102
}
101103

102-
/// Initialise the workers.
104+
/// Initialise the worker, without running it.
105+
///
106+
/// This is mostly useful for tests.
103107
///
104108
/// # Errors
105109
///
106110
/// This function can fail if the database connection fails.
107111
pub async fn init(
108112
repository_factory: PgRepositoryFactory,
113+
clock: impl Clock + 'static,
109114
mailer: &Mailer,
110115
homeserver: impl HomeserverConnection + 'static,
111116
url_builder: UrlBuilder,
112117
site_config: &SiteConfig,
113118
cancellation_token: CancellationToken,
114-
task_tracker: &TaskTracker,
115-
) -> Result<(), QueueRunnerError> {
119+
) -> Result<QueueWorker, QueueRunnerError> {
116120
let state = State::new(
117121
repository_factory,
118-
SystemClock::default(),
122+
clock,
119123
mailer.clone(),
120124
homeserver,
121125
url_builder,
122126
site_config.clone(),
123127
);
124-
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
128+
let mut worker = QueueWorker::new(state, cancellation_token).await?;
125129

126130
worker
127131
.register_handler::<mas_storage::queue::CleanupExpiredTokensJob>()
@@ -157,6 +161,36 @@ pub async fn init(
157161
mas_storage::queue::PruneStalePolicyDataJob,
158162
);
159163

164+
Ok(worker)
165+
}
166+
167+
/// Initialise the worker and run it.
168+
///
169+
/// # Errors
170+
///
171+
/// This function can fail if the database connection fails.
172+
#[expect(clippy::too_many_arguments, reason = "this is fine")]
173+
pub async fn init_and_run(
174+
repository_factory: PgRepositoryFactory,
175+
clock: impl Clock + 'static,
176+
mailer: &Mailer,
177+
homeserver: impl HomeserverConnection + 'static,
178+
url_builder: UrlBuilder,
179+
site_config: &SiteConfig,
180+
cancellation_token: CancellationToken,
181+
task_tracker: &TaskTracker,
182+
) -> Result<(), QueueRunnerError> {
183+
let worker = init(
184+
repository_factory,
185+
clock,
186+
mailer,
187+
homeserver,
188+
url_builder,
189+
site_config,
190+
cancellation_token,
191+
)
192+
.await?;
193+
160194
task_tracker.spawn(worker.run());
161195

162196
Ok(())

0 commit comments

Comments
 (0)