Skip to content

Commit 388bfc2

Browse files
committed
Test helper to run all tests in the job queue
1 parent 45f15e1 commit 388bfc2

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

crates/handlers/src/test_utils.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,14 @@ impl TestState {
283283
})
284284
}
285285

286+
/// Run all the available jobs in the queue.
287+
///
288+
/// Panics if it fails to run the jobs (but not on job failures!)
289+
pub async fn run_jobs_in_queue(&self) {
290+
let mut queue = self.queue_worker.lock().unwrap();
291+
queue.process_all_jobs_in_tests().await.unwrap();
292+
}
293+
286294
/// Reset the test utils to a fresh state, with the same configuration.
287295
pub async fn reset(self) -> Self {
288296
let site_config = self.site_config.clone();

crates/tasks/src/new_queue.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,73 @@ impl QueueWorker {
713713

714714
Ok(())
715715
}
716+
717+
/// Process all the pending jobs in the queue.
718+
/// This should only be called in tests!
719+
///
720+
/// # Errors
721+
///
722+
/// This function can fail if the database connection fails.
723+
pub async fn process_all_jobs_in_tests(&mut self) -> Result<(), QueueRunnerError> {
724+
// I swear, I'm the leader!
725+
self.am_i_leader = true;
726+
727+
// First, perform the leader duties. This will make sure that we schedule
728+
// recurring jobs.
729+
self.perform_leader_duties().await?;
730+
731+
let clock = self.state.clock();
732+
let mut rng = self.state.rng();
733+
734+
// Grab the connection from the PgListener
735+
let txn = self
736+
.listener
737+
.begin()
738+
.await
739+
.map_err(QueueRunnerError::StartTransaction)?;
740+
let mut repo = PgRepository::from_conn(txn);
741+
742+
// Spawn all the jobs in the database
743+
let queues = self.tracker.queues();
744+
let jobs = repo
745+
.queue_job()
746+
// I really hope that we don't spawn more than 10k jobs in tests
747+
.reserve(clock, &self.registration, &queues, 10_000)
748+
.await?;
749+
750+
for Job {
751+
id,
752+
queue_name,
753+
payload,
754+
metadata,
755+
attempt,
756+
} in jobs
757+
{
758+
let cancellation_token = self.cancellation_token.child_token();
759+
let start = Instant::now();
760+
let context = JobContext {
761+
id,
762+
metadata,
763+
queue_name,
764+
attempt,
765+
start,
766+
cancellation_token,
767+
};
768+
769+
self.tracker.spawn_job(self.state.clone(), context, payload);
770+
}
771+
772+
self.tracker
773+
.process_jobs(&mut rng, clock, &mut repo, true)
774+
.await?;
775+
776+
repo.into_inner()
777+
.commit()
778+
.await
779+
.map_err(QueueRunnerError::CommitTransaction)?;
780+
781+
Ok(())
782+
}
716783
}
717784

718785
/// Tracks running jobs

0 commit comments

Comments
 (0)