Skip to content

Commit cb398ea

Browse files
committed
Fix refresh tests
It turns out that they were broken because of a completely different reason than I thought. `TestSyncMarker` wasn't thread local, therefore it was being accessed by multiple tests in parallel.
1 parent 69984e4 commit cb398ea

File tree

4 files changed

+163
-140
lines changed

4 files changed

+163
-140
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ base64 = "0.22.1"
6161
tracing-test = "0.2.4"
6262
regex = "1.10.4"
6363
parking_lot = "0.12.3"
64-
once_cell = "1.19.0"
64+
thread_local = "1.1.8"
6565

6666
[profile.release]
6767
debug = 1

src/bors/handlers/refresh.rs

Lines changed: 128 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -105,132 +105,131 @@ fn elapsed_time(date: DateTime<Utc>) -> Duration {
105105
(time - date).to_std().unwrap_or(Duration::ZERO)
106106
}
107107

108-
// TODO: we need a way to get MOCK_TIME working with a multi-threaded tokio runtime
109-
// #[cfg(test)]
110-
// mod tests {
111-
// use crate::bors::handlers::refresh::MOCK_TIME;
112-
// use crate::bors::handlers::WAIT_FOR_WORKFLOW_STARTED;
113-
// use crate::database::DbClient;
114-
// use crate::tests::mocks::{default_repo_name, run_test, BorsBuilder, WorkflowEvent, World};
115-
// use chrono::Utc;
116-
// use std::future::Future;
117-
// use std::time::Duration;
118-
// use tokio::runtime::RuntimeFlavor;
119-
//
120-
// #[sqlx::test]
121-
// async fn refresh_no_builds(pool: sqlx::PgPool) {
122-
// run_test(pool, |tester| async move {
123-
// tester.refresh().await;
124-
// Ok(tester)
125-
// })
126-
// .await;
127-
// }
128-
//
129-
// #[sqlx::test]
130-
// async fn refresh_do_nothing_before_timeout(pool: sqlx::PgPool) {
131-
// let world = World::default();
132-
// world.default_repo().lock().set_config(
133-
// r#"
134-
// timeout = 3600
135-
// "#,
136-
// );
137-
// BorsBuilder::new(pool)
138-
// .world(world)
139-
// .run_test(|mut tester| async move {
140-
// tester.post_comment("@bors try").await?;
141-
// tester.expect_comments(1).await;
142-
// with_mocked_time(Duration::from_secs(10), async {
143-
// tester.refresh().await;
144-
// })
145-
// .await;
146-
// Ok(tester)
147-
// })
148-
// .await;
149-
// }
150-
//
151-
// #[sqlx::test]
152-
// async fn refresh_cancel_build_after_timeout(pool: sqlx::PgPool) {
153-
// let world = World::default();
154-
// world.default_repo().lock().set_config(
155-
// r#"
156-
// timeout = 3600
157-
// "#,
158-
// );
159-
// BorsBuilder::new(pool)
160-
// .world(world)
161-
// .run_test(|mut tester| async move {
162-
// tester.post_comment("@bors try").await?;
163-
// tester.expect_comments(1).await;
164-
// with_mocked_time(Duration::from_secs(4000), async {
165-
// assert_eq!(
166-
// tester
167-
// .db()
168-
// .get_running_builds(&default_repo_name())
169-
// .await
170-
// .unwrap()
171-
// .len(),
172-
// 1
173-
// );
174-
// tester.refresh().await;
175-
// })
176-
// .await;
177-
// insta::assert_snapshot!(tester.get_comment().await?, @":boom: Test timed out");
178-
// assert_eq!(
179-
// tester
180-
// .db()
181-
// .get_running_builds(&default_repo_name())
182-
// .await
183-
// .unwrap()
184-
// .len(),
185-
// 0
186-
// );
187-
// Ok(tester)
188-
// })
189-
// .await;
190-
// }
191-
//
192-
// #[sqlx::test]
193-
// async fn refresh_cancel_workflow_after_timeout(pool: sqlx::PgPool) {
194-
// let world = World::default();
195-
// world.default_repo().lock().set_config(
196-
// r#"
197-
// timeout = 3600
198-
// "#,
199-
// );
200-
// let world = BorsBuilder::new(pool)
201-
// .world(world)
202-
// .run_test(|mut tester| async move {
203-
// tester.post_comment("@bors try").await?;
204-
// tester.expect_comments(1).await;
205-
// tester
206-
// .workflow_event(WorkflowEvent::started(tester.try_branch()))
207-
// .await?;
208-
// WAIT_FOR_WORKFLOW_STARTED.sync().await;
209-
//
210-
// with_mocked_time(Duration::from_secs(4000), async {
211-
// tester.refresh().await;
212-
// })
213-
// .await;
214-
// tester.expect_comments(1).await;
215-
// Ok(tester)
216-
// })
217-
// .await;
218-
// world.check_cancelled_workflows(default_repo_name(), &[1]);
219-
// }
220-
//
221-
// async fn with_mocked_time<Fut: Future<Output = ()>>(in_future: Duration, future: Fut) {
222-
// // It is important to use this function only with a single threaded runtime,
223-
// // otherwise the `MOCK_TIME` variable might get mixed up between different threads.
224-
// assert_eq!(
225-
// tokio::runtime::Handle::current().runtime_flavor(),
226-
// RuntimeFlavor::CurrentThread
227-
// );
228-
// MOCK_TIME.with(|time| {
229-
// *time.borrow_mut() = Some(Utc::now() + chrono::Duration::from_std(in_future).unwrap());
230-
// });
231-
// future.await;
232-
// MOCK_TIME.with(|time| {
233-
// *time.borrow_mut() = None;
234-
// });
235-
// }
236-
// }
108+
#[cfg(test)]
109+
mod tests {
110+
use crate::bors::handlers::refresh::MOCK_TIME;
111+
use crate::bors::handlers::WAIT_FOR_WORKFLOW_STARTED;
112+
use crate::bors::WAIT_FOR_REFRESH;
113+
use crate::tests::mocks::{default_repo_name, run_test, BorsBuilder, WorkflowEvent, World};
114+
use chrono::Utc;
115+
use std::future::Future;
116+
use std::time::Duration;
117+
use tokio::runtime::RuntimeFlavor;
118+
119+
#[sqlx::test]
120+
async fn refresh_no_builds(pool: sqlx::PgPool) {
121+
run_test(pool, |tester| async move {
122+
tester.refresh().await;
123+
Ok(tester)
124+
})
125+
.await;
126+
}
127+
128+
#[sqlx::test]
129+
async fn refresh_do_nothing_before_timeout(pool: sqlx::PgPool) {
130+
let world = World::default();
131+
world.default_repo().lock().set_config(
132+
r#"
133+
timeout = 3600
134+
"#,
135+
);
136+
BorsBuilder::new(pool)
137+
.world(world)
138+
.run_test(|mut tester| async move {
139+
tester.post_comment("@bors try").await?;
140+
tester.expect_comments(1).await;
141+
with_mocked_time(Duration::from_secs(10), async {
142+
tester.refresh().await;
143+
})
144+
.await;
145+
Ok(tester)
146+
})
147+
.await;
148+
}
149+
150+
#[sqlx::test]
151+
async fn refresh_cancel_build_after_timeout(pool: sqlx::PgPool) {
152+
let world = World::default();
153+
world.default_repo().lock().set_config(
154+
r#"
155+
timeout = 3600
156+
"#,
157+
);
158+
BorsBuilder::new(pool)
159+
.world(world)
160+
.run_test(|mut tester| async move {
161+
tester.post_comment("@bors try").await?;
162+
tester.expect_comments(1).await;
163+
with_mocked_time(Duration::from_secs(4000), async {
164+
assert_eq!(
165+
tester
166+
.db()
167+
.get_running_builds(&default_repo_name())
168+
.await
169+
.unwrap()
170+
.len(),
171+
1
172+
);
173+
tester.refresh().await;
174+
})
175+
.await;
176+
insta::assert_snapshot!(tester.get_comment().await?, @":boom: Test timed out");
177+
assert_eq!(
178+
tester
179+
.db()
180+
.get_running_builds(&default_repo_name())
181+
.await
182+
.unwrap()
183+
.len(),
184+
0
185+
);
186+
Ok(tester)
187+
})
188+
.await;
189+
}
190+
191+
#[sqlx::test]
192+
async fn refresh_cancel_workflow_after_timeout(pool: sqlx::PgPool) {
193+
let world = World::default();
194+
world.default_repo().lock().set_config(
195+
r#"
196+
timeout = 3600
197+
"#,
198+
);
199+
let world = BorsBuilder::new(pool)
200+
.world(world)
201+
.run_test(|mut tester| async move {
202+
tester.post_comment("@bors try").await?;
203+
tester.expect_comments(1).await;
204+
tester
205+
.workflow_event(WorkflowEvent::started(tester.try_branch()))
206+
.await?;
207+
WAIT_FOR_WORKFLOW_STARTED.sync().await;
208+
209+
with_mocked_time(Duration::from_secs(4000), async {
210+
tester.refresh().await;
211+
})
212+
.await;
213+
tester.expect_comments(1).await;
214+
Ok(tester)
215+
})
216+
.await;
217+
world.check_cancelled_workflows(default_repo_name(), &[1]);
218+
}
219+
220+
async fn with_mocked_time<Fut: Future<Output = ()>>(in_future: Duration, future: Fut) {
221+
// It is important to use this function only with a single threaded runtime,
222+
// otherwise the `MOCK_TIME` variable might get mixed up between different threads.
223+
assert_eq!(
224+
tokio::runtime::Handle::current().runtime_flavor(),
225+
RuntimeFlavor::CurrentThread
226+
);
227+
MOCK_TIME.with(|time| {
228+
*time.borrow_mut() = Some(Utc::now() + chrono::Duration::from_std(in_future).unwrap());
229+
});
230+
future.await;
231+
MOCK_TIME.with(|time| {
232+
*time.borrow_mut() = None;
233+
});
234+
}
235+
}

src/tests/util.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,39 @@
1-
use once_cell::sync::Lazy;
1+
use std::sync::atomic::{AtomicUsize, Ordering};
2+
use thread_local::ThreadLocal;
23
use tokio::sync;
34

45
pub struct TestSyncMarker {
5-
state: Lazy<TestSyncMarkerInner>,
6+
inner: ThreadLocal<TestSyncMarkerInner>,
67
}
78

89
impl TestSyncMarker {
910
pub const fn new() -> Self {
1011
Self {
11-
state: Lazy::new(TestSyncMarkerInner::new),
12+
inner: ThreadLocal::new(),
1213
}
1314
}
1415

15-
/// Mark that code has encountered this location.
1616
pub fn mark(&self) {
17-
// If we cannot send, don't block the program.
18-
let _ = self.state.tx.try_send(());
17+
self.get().mark();
1918
}
20-
21-
/// Wait until code has encountered this location.
2219
pub async fn sync(&self) {
23-
self.state.rx.lock().await.recv().await.unwrap();
20+
self.get().sync().await;
21+
}
22+
pub fn hits(&self) -> usize {
23+
self.get().hits()
24+
}
25+
26+
fn get(&self) -> &TestSyncMarkerInner {
27+
self.inner
28+
.get_or_try(|| Ok::<TestSyncMarkerInner, ()>(TestSyncMarkerInner::new()))
29+
.unwrap()
2430
}
2531
}
2632

2733
struct TestSyncMarkerInner {
2834
rx: sync::Mutex<sync::mpsc::Receiver<()>>,
2935
tx: sync::mpsc::Sender<()>,
36+
hits: AtomicUsize,
3037
}
3138

3239
impl TestSyncMarkerInner {
@@ -35,6 +42,23 @@ impl TestSyncMarkerInner {
3542
Self {
3643
tx,
3744
rx: sync::Mutex::new(rx),
45+
hits: AtomicUsize::new(0),
3846
}
3947
}
48+
49+
/// Mark that code has encountered this location.
50+
pub fn mark(&self) {
51+
// If we cannot send, don't block the program.
52+
let _ = self.tx.try_send(());
53+
self.hits.fetch_add(1, Ordering::SeqCst);
54+
}
55+
56+
/// Wait until code has encountered this location.
57+
pub async fn sync(&self) {
58+
self.rx.lock().await.recv().await.unwrap();
59+
}
60+
61+
pub fn hits(&self) -> usize {
62+
self.hits.load(Ordering::SeqCst)
63+
}
4064
}

0 commit comments

Comments
 (0)