Skip to content

Commit 626c86c

Browse files
committed
Trigger merge queue on events
- PR approval - Tree state change - Auto build completion
1 parent 3504fda commit 626c86c

File tree

6 files changed

+101
-32
lines changed

6 files changed

+101
-32
lines changed

src/bors/handlers/mod.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::bors::handlers::trybuild::{TRY_BRANCH_NAME, command_try_build, comman
1919
use crate::bors::handlers::workflow::{
2020
handle_check_suite_completed, handle_workflow_completed, handle_workflow_started,
2121
};
22-
use crate::bors::merge_queue::MergeQueueSender;
22+
use crate::bors::merge_queue::{AUTO_BRANCH_NAME, MergeQueueSender};
2323
use crate::bors::{BorsContext, Comment, RepositoryState};
2424
use crate::database::{DelegatedPermission, PullRequestModel};
2525
use crate::github::{GithubUser, PullRequest, PullRequestNumber};
@@ -53,6 +53,7 @@ pub async fn handle_bors_repository_event(
5353
event: BorsRepositoryEvent,
5454
ctx: Arc<BorsContext>,
5555
mergeable_queue_tx: MergeableQueueSender,
56+
merge_queue_tx: MergeQueueSender,
5657
) -> anyhow::Result<()> {
5758
let db = Arc::clone(&ctx.db);
5859
let Some(repo) = ctx
@@ -82,9 +83,10 @@ pub async fn handle_bors_repository_event(
8283
author = comment.author.username
8384
);
8485
let pr_number = comment.pr_number;
85-
if let Err(error) = handle_comment(Arc::clone(&repo), db, ctx, comment)
86-
.instrument(span.clone())
87-
.await
86+
if let Err(error) =
87+
handle_comment(Arc::clone(&repo), db, ctx, comment, merge_queue_tx.clone())
88+
.instrument(span.clone())
89+
.await
8890
{
8991
repo.client
9092
.post_comment(
@@ -117,7 +119,7 @@ pub async fn handle_bors_repository_event(
117119
repo = payload.repository.to_string(),
118120
id = payload.run_id.into_inner()
119121
);
120-
handle_workflow_completed(repo, db, payload)
122+
handle_workflow_completed(repo, db, payload, &merge_queue_tx)
121123
.instrument(span.clone())
122124
.await?;
123125
}
@@ -126,7 +128,7 @@ pub async fn handle_bors_repository_event(
126128
"Check suite completed",
127129
repo = payload.repository.to_string(),
128130
);
129-
handle_check_suite_completed(repo, db, payload)
131+
handle_check_suite_completed(repo, db, payload, &merge_queue_tx)
130132
.instrument(span.clone())
131133
.await?;
132134
}
@@ -353,6 +355,7 @@ async fn handle_comment(
353355
database: Arc<PgDbClient>,
354356
ctx: Arc<BorsContext>,
355357
comment: PullRequestComment,
358+
merge_queue_tx: MergeQueueSender,
356359
) -> anyhow::Result<()> {
357360
use std::fmt::Write;
358361

@@ -404,13 +407,14 @@ async fn handle_comment(
404407
&approver,
405408
priority,
406409
rollup,
410+
&merge_queue_tx,
407411
)
408412
.instrument(span)
409413
.await
410414
}
411415
BorsCommand::OpenTree => {
412416
let span = tracing::info_span!("TreeOpen");
413-
command_open_tree(repo, database, &pr, &comment.author)
417+
command_open_tree(repo, database, &pr, &comment.author, &merge_queue_tx)
414418
.instrument(span)
415419
.await
416420
}
@@ -423,6 +427,7 @@ async fn handle_comment(
423427
&comment.author,
424428
priority,
425429
&comment.html_url,
430+
&merge_queue_tx,
426431
)
427432
.instrument(span)
428433
.await
@@ -561,7 +566,7 @@ async fn reload_repos(
561566

562567
/// Is this branch interesting for the bot?
563568
fn is_bors_observed_branch(branch: &str) -> bool {
564-
branch == TRY_BRANCH_NAME
569+
branch == TRY_BRANCH_NAME || branch == AUTO_BRANCH_NAME
565570
}
566571

567572
/// Deny permission for a request.

src/bors/handlers/review.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::bors::comment::approve_non_open_pr_comment;
88
use crate::bors::handlers::has_permission;
99
use crate::bors::handlers::labels::handle_label_trigger;
1010
use crate::bors::handlers::{PullRequestData, deny_request};
11+
use crate::bors::merge_queue::MergeQueueSender;
1112
use crate::bors::{Comment, PullRequestStatus};
1213
use crate::database::ApprovalInfo;
1314
use crate::database::DelegatedPermission;
@@ -18,6 +19,7 @@ use crate::permissions::PermissionType;
1819

1920
/// Approve a pull request.
2021
/// A pull request can only be approved by a user of sufficient authority.
22+
#[allow(clippy::too_many_arguments)]
2123
pub(super) async fn command_approve(
2224
repo_state: Arc<RepositoryState>,
2325
db: Arc<PgDbClient>,
@@ -26,6 +28,7 @@ pub(super) async fn command_approve(
2628
approver: &Approver,
2729
priority: Option<u32>,
2830
rollup: Option<RollupMode>,
31+
merge_queue_tx: &MergeQueueSender,
2932
) -> anyhow::Result<()> {
3033
tracing::info!("Approving PR {}", pr.number());
3134
if !has_permission(&repo_state, author, pr, PermissionType::Review).await? {
@@ -52,6 +55,8 @@ pub(super) async fn command_approve(
5255

5356
db.approve(&pr.db, approval_info, priority, rollup).await?;
5457
handle_label_trigger(&repo_state, pr.number(), LabelTrigger::Approved).await?;
58+
59+
merge_queue_tx.send(()).await?;
5560
notify_of_approval(&repo_state, pr, approver.as_str()).await
5661
}
5762

@@ -156,6 +161,7 @@ pub(super) async fn command_close_tree(
156161
author: &GithubUser,
157162
priority: u32,
158163
comment_url: &str,
164+
merge_queue_tx: &MergeQueueSender,
159165
) -> anyhow::Result<()> {
160166
if !sufficient_approve_permission(repo_state.clone(), author) {
161167
deny_request(&repo_state, pr.number(), author, PermissionType::Review).await?;
@@ -169,6 +175,8 @@ pub(super) async fn command_close_tree(
169175
},
170176
)
171177
.await?;
178+
179+
merge_queue_tx.send(()).await?;
172180
notify_of_tree_closed(&repo_state, pr.number(), priority).await
173181
}
174182

@@ -177,6 +185,7 @@ pub(super) async fn command_open_tree(
177185
db: Arc<PgDbClient>,
178186
pr: &PullRequestData,
179187
author: &GithubUser,
188+
merge_queue_tx: &MergeQueueSender,
180189
) -> anyhow::Result<()> {
181190
if !sufficient_delegate_permission(repo_state.clone(), author) {
182191
deny_request(&repo_state, pr.number(), author, PermissionType::Review).await?;
@@ -185,6 +194,8 @@ pub(super) async fn command_open_tree(
185194

186195
db.upsert_repository(repo_state.repository(), TreeState::Open)
187196
.await?;
197+
198+
merge_queue_tx.send(()).await?;
188199
notify_of_tree_open(&repo_state, pr.number()).await
189200
}
190201

src/bors/handlers/workflow.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use crate::bors::comment::{try_build_succeeded_comment, workflow_failed_comment}
1111
use crate::bors::event::{CheckSuiteCompleted, WorkflowCompleted, WorkflowStarted};
1212
use crate::bors::handlers::is_bors_observed_branch;
1313
use crate::bors::handlers::labels::handle_label_trigger;
14+
use crate::bors::merge_queue::AUTO_BRANCH_NAME;
15+
use crate::bors::merge_queue::MergeQueueSender;
1416
use crate::database::{BuildStatus, WorkflowStatus};
1517
use crate::github::LabelTrigger;
1618

@@ -66,6 +68,7 @@ pub(super) async fn handle_workflow_completed(
6668
repo: Arc<RepositoryState>,
6769
db: Arc<PgDbClient>,
6870
mut payload: WorkflowCompleted,
71+
merge_queue_tx: &MergeQueueSender,
6972
) -> anyhow::Result<()> {
7073
if !is_bors_observed_branch(&payload.branch) {
7174
return Ok(());
@@ -98,13 +101,14 @@ pub(super) async fn handle_workflow_completed(
98101
branch: payload.branch,
99102
commit_sha: payload.commit_sha,
100103
};
101-
try_complete_build(repo.as_ref(), db.as_ref(), event).await
104+
try_complete_build(repo.as_ref(), db.as_ref(), event, merge_queue_tx).await
102105
}
103106

104107
pub(super) async fn handle_check_suite_completed(
105108
repo: Arc<RepositoryState>,
106109
db: Arc<PgDbClient>,
107110
payload: CheckSuiteCompleted,
111+
merge_queue_tx: &MergeQueueSender,
108112
) -> anyhow::Result<()> {
109113
if !is_bors_observed_branch(&payload.branch) {
110114
return Ok(());
@@ -115,14 +119,15 @@ pub(super) async fn handle_check_suite_completed(
115119
payload.branch,
116120
payload.commit_sha
117121
);
118-
try_complete_build(repo.as_ref(), db.as_ref(), payload).await
122+
try_complete_build(repo.as_ref(), db.as_ref(), payload, merge_queue_tx).await
119123
}
120124

121125
/// Try to complete a pending build.
122126
async fn try_complete_build(
123127
repo: &RepositoryState,
124128
db: &PgDbClient,
125129
payload: CheckSuiteCompleted,
130+
merge_queue_tx: &MergeQueueSender,
126131
) -> anyhow::Result<()> {
127132
if !is_bors_observed_branch(&payload.branch) {
128133
return Ok(());
@@ -217,6 +222,11 @@ async fn try_complete_build(
217222
};
218223
repo.client.post_comment(pr.number, message).await?;
219224

225+
// Trigger merge queue when an auto build completes
226+
if payload.branch == AUTO_BRANCH_NAME {
227+
merge_queue_tx.send(()).await?;
228+
}
229+
220230
Ok(())
221231
}
222232

src/bors/merge_queue.rs

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,33 @@ use crate::BorsContext;
77
use crate::bors::RepositoryState;
88
use crate::utils::sort_queue::sort_queue_prs;
99

10-
type MergeQueueEvent = ();
11-
pub type MergeQueueSender = mpsc::Sender<MergeQueueEvent>;
10+
#[derive(Debug)]
11+
enum MergeQueueEvent {
12+
Trigger,
13+
Shutdown,
14+
}
15+
16+
#[derive(Clone)]
17+
pub struct MergeQueueSender {
18+
inner: mpsc::Sender<MergeQueueEvent>,
19+
}
20+
21+
impl MergeQueueSender {
22+
pub async fn send(&self, _value: ()) -> Result<(), mpsc::error::SendError<()>> {
23+
self.inner
24+
.send(MergeQueueEvent::Trigger)
25+
.await
26+
.map_err(|_| mpsc::error::SendError(()))
27+
}
28+
29+
pub fn shutdown(&self) {
30+
let _ = self.inner.try_send(MergeQueueEvent::Shutdown);
31+
}
32+
}
33+
34+
/// Branch where CI checks run for auto builds.
35+
/// This branch should run CI checks.
36+
pub(super) const AUTO_BRANCH_NAME: &str = "automation/bors/auto";
1237

1338
pub async fn merge_queue_tick(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
1439
let repos: Vec<Arc<RepositoryState>> =
@@ -46,25 +71,35 @@ pub async fn merge_queue_tick(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
4671

4772
pub fn start_merge_queue(ctx: Arc<BorsContext>) -> (MergeQueueSender, impl Future<Output = ()>) {
4873
let (tx, mut rx) = mpsc::channel::<MergeQueueEvent>(10);
74+
let sender = MergeQueueSender { inner: tx };
4975

5076
let fut = async move {
51-
while rx.recv().await.is_some() {
52-
let span = tracing::info_span!("MergeQueue");
53-
tracing::debug!("Processing merge queue");
54-
if let Err(error) = merge_queue_tick(ctx.clone()).instrument(span.clone()).await {
55-
// In tests, we want to panic on all errors.
56-
#[cfg(test)]
57-
{
58-
panic!("Merge queue handler failed: {error:?}");
77+
while let Some(event) = rx.recv().await {
78+
match event {
79+
MergeQueueEvent::Trigger => {
80+
let span = tracing::info_span!("MergeQueue");
81+
tracing::debug!("Processing merge queue");
82+
if let Err(error) = merge_queue_tick(ctx.clone()).instrument(span.clone()).await
83+
{
84+
// In tests, we want to panic on all errors.
85+
#[cfg(test)]
86+
{
87+
panic!("Merge queue handler failed: {error:?}");
88+
}
89+
#[cfg(not(test))]
90+
{
91+
use crate::utils::logging::LogError;
92+
span.log_error(error);
93+
}
94+
}
5995
}
60-
#[cfg(not(test))]
61-
{
62-
use crate::utils::logging::LogError;
63-
span.log_error(error);
96+
MergeQueueEvent::Shutdown => {
97+
tracing::debug!("Merge queue received shutdown signal");
98+
break;
6499
}
65100
}
66101
}
67102
};
68103

69-
(tx, fut)
104+
(sender, fut)
70105
}

src/github/server.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ pub fn create_bors_process(
216216

217217
let (merge_queue_tx, merge_queue_fut) = start_merge_queue(ctx.clone());
218218
let merge_queue_tx_clone = merge_queue_tx.clone();
219+
let merge_queue_tx_for_return = merge_queue_tx.clone();
219220

220221
let service = async move {
221222
// In tests, we shutdown these futures by dropping the channel sender,
@@ -225,7 +226,12 @@ pub fn create_bors_process(
225226
#[cfg(test)]
226227
{
227228
let _ = tokio::join!(
228-
consume_repository_events(ctx.clone(), repository_rx, mq_tx.clone()),
229+
consume_repository_events(
230+
ctx.clone(),
231+
repository_rx,
232+
mq_tx.clone(),
233+
merge_queue_tx_clone.clone()
234+
),
229235
consume_global_events(
230236
ctx.clone(),
231237
global_rx,
@@ -244,7 +250,7 @@ pub fn create_bors_process(
244250
#[cfg(not(test))]
245251
{
246252
tokio::select! {
247-
_ = consume_repository_events(ctx.clone(), repository_rx, mq_tx.clone()) => {
253+
_ = consume_repository_events(ctx.clone(), repository_rx, mq_tx.clone(), merge_queue_tx_clone.clone()) => {
248254
tracing::error!("Repository event handling process has ended");
249255
}
250256
_ = consume_global_events(ctx.clone(), global_rx, mq_tx, merge_queue_tx, gh_client, team_api) => {
@@ -263,8 +269,8 @@ pub fn create_bors_process(
263269
BorsProcess {
264270
repository_tx,
265271
global_tx,
266-
merge_queue_tx: merge_queue_tx_clone,
267272
mergeable_queue_tx,
273+
merge_queue_tx: merge_queue_tx_for_return,
268274
bors_process: Box::pin(service),
269275
}
270276
}
@@ -273,16 +279,18 @@ async fn consume_repository_events(
273279
ctx: Arc<BorsContext>,
274280
mut repository_rx: mpsc::Receiver<BorsRepositoryEvent>,
275281
mergeable_queue_tx: MergeableQueueSender,
282+
merge_queue_tx: MergeQueueSender,
276283
) {
277284
while let Some(event) = repository_rx.recv().await {
278285
let ctx = ctx.clone();
279286
let mergeable_queue_tx = mergeable_queue_tx.clone();
280287

281288
let span = tracing::info_span!("RepositoryEvent");
282289
tracing::debug!("Received repository event: {event:?}");
283-
if let Err(error) = handle_bors_repository_event(event, ctx, mergeable_queue_tx)
284-
.instrument(span.clone())
285-
.await
290+
if let Err(error) =
291+
handle_bors_repository_event(event, ctx, mergeable_queue_tx, merge_queue_tx.clone())
292+
.instrument(span.clone())
293+
.await
286294
{
287295
handle_root_error(span, error);
288296
}

src/tests/mocks/bors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,7 @@ impl BorsTester {
839839
// Make sure that the event channel senders are closed
840840
drop(self.app);
841841
drop(self.global_tx);
842-
drop(self.merge_queue_tx);
842+
self.merge_queue_tx.shutdown();
843843
self.mergeable_queue_tx.shutdown();
844844
// Wait until all events are handled in the bors service
845845
bors.await.unwrap();

0 commit comments

Comments
 (0)