Skip to content

Commit 9d08dbd

Browse files
committed
Refactor merge queue architecture
Move `consume_merge_queue` logic into merge_queue module
1 parent 9f47421 commit 9d08dbd

File tree

2 files changed

+46
-25
lines changed

2 files changed

+46
-25
lines changed

src/bors/merge_queue.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use anyhow::anyhow;
22
use octocrab::params::checks::{CheckRunConclusion, CheckRunStatus};
3+
use std::future::Future;
34
use std::sync::Arc;
5+
use tokio::sync::mpsc;
6+
use tracing::Instrument;
47

58
use crate::{
69
BorsContext,
@@ -35,7 +38,7 @@ enum MergeResult {
3538
Conflict,
3639
}
3740

38-
pub async fn handle_merge_queue(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
41+
pub async fn merge_queue_tick(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
3942
let repos: Vec<Arc<RepositoryState>> =
4043
ctx.repositories.read().unwrap().values().cloned().collect();
4144

@@ -169,7 +172,6 @@ pub async fn handle_merge_queue(ctx: Arc<BorsContext>) -> anyhow::Result<()> {
169172
async fn start_auto_build(
170173
repo: &Arc<RepositoryState>,
171174
ctx: &Arc<BorsContext>,
172-
173175
pr: PullRequestModel,
174176
) -> anyhow::Result<bool> {
175177
let client = &repo.client;
@@ -300,3 +302,34 @@ async fn attempt_merge(
300302
Err(error) => Err(error.into()),
301303
}
302304
}
305+
306+
pub fn start_merge_queue(
307+
ctx: Arc<BorsContext>,
308+
) -> (
309+
mpsc::Sender<MergeQueueEvent>,
310+
impl Future<Output = anyhow::Result<()>>,
311+
) {
312+
let (tx, mut rx) = mpsc::channel::<MergeQueueEvent>(10);
313+
314+
let fut = async move {
315+
while rx.recv().await.is_some() {
316+
let span = tracing::info_span!("MergeQueue");
317+
tracing::debug!("Processing merge queue");
318+
if let Err(error) = merge_queue_tick(ctx.clone()).instrument(span.clone()).await {
319+
// In tests, we want to panic on all errors.
320+
#[cfg(test)]
321+
{
322+
panic!("Handler failed: {error:?}");
323+
}
324+
#[cfg(not(test))]
325+
{
326+
use crate::utils::logging::LogError;
327+
span.log_error(error);
328+
}
329+
}
330+
}
331+
Ok(())
332+
};
333+
334+
(tx, fut)
335+
}

src/github/server.rs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::bors::event::BorsEvent;
2-
use crate::bors::merge_queue::{MergeQueueEvent, handle_merge_queue};
2+
use crate::bors::merge_queue::{MergeQueueEvent, start_merge_queue};
33
use crate::bors::mergeable_queue::{
44
MergeableQueueReceiver, MergeableQueueSender, create_mergeable_queue,
55
handle_mergeable_queue_item,
@@ -209,22 +209,22 @@ pub fn create_bors_process(
209209
) -> BorsProcess {
210210
let (repository_tx, repository_rx) = mpsc::channel::<BorsRepositoryEvent>(1024);
211211
let (global_tx, global_rx) = mpsc::channel::<BorsGlobalEvent>(1024);
212-
let (merge_queue_tx, merge_queue_rx) = mpsc::channel::<MergeQueueEvent>(128);
213212
let (mergeable_queue_tx, mergeable_queue_rx) = create_mergeable_queue();
214213

215214
let mq_tx = mergeable_queue_tx.clone();
215+
let ctx = Arc::new(ctx);
216+
217+
let (merge_queue_tx, merge_queue_fut) = start_merge_queue(ctx.clone());
216218
let merge_queue_tx_clone = merge_queue_tx.clone();
217219

218220
let service = async move {
219-
let ctx = Arc::new(ctx);
220-
221221
// In tests, we shutdown these futures by dropping the channel sender,
222222
// In that case, we need to wait until both of these futures resolve,
223223
// to make sure that they are able to handle all the events in the queue
224224
// before finishing.
225225
#[cfg(test)]
226226
{
227-
tokio::join!(
227+
let _ = tokio::join!(
228228
consume_repository_events(ctx.clone(), repository_rx, mq_tx.clone()),
229229
consume_global_events(
230230
ctx.clone(),
@@ -235,7 +235,7 @@ pub fn create_bors_process(
235235
team_api
236236
),
237237
consume_mergeable_queue(ctx.clone(), mergeable_queue_rx),
238-
consume_merge_queue(ctx, merge_queue_rx)
238+
merge_queue_fut
239239
);
240240
}
241241
// In real execution, the bot runs forever. If there is something that finishes
@@ -253,8 +253,11 @@ pub fn create_bors_process(
253253
_ = consume_mergeable_queue(ctx.clone(), mergeable_queue_rx) => {
254254
tracing::error!("Mergeable queue handling process has ended")
255255
}
256-
_ = consume_merge_queue(ctx.clone(), merge_queue_rx) => {
257-
tracing::error!("Merge queue handling process has ended")
256+
result = merge_queue_fut => {
257+
match result {
258+
Ok(()) => tracing::error!("Merge queue handling process has ended"),
259+
Err(e) => tracing::error!("Merge queue handling process has ended with error: {:?}", e)
260+
}
258261
}
259262
}
260263
}
@@ -338,21 +341,6 @@ async fn consume_mergeable_queue(
338341
}
339342
}
340343

341-
async fn consume_merge_queue(
342-
ctx: Arc<BorsContext>,
343-
mut merge_queue_rx: mpsc::Receiver<MergeQueueEvent>,
344-
) {
345-
while merge_queue_rx.recv().await.is_some() {
346-
let ctx = ctx.clone();
347-
348-
let span = tracing::info_span!("MergeQueue");
349-
tracing::debug!("Processing merge queue");
350-
if let Err(error) = handle_merge_queue(ctx).instrument(span.clone()).await {
351-
handle_root_error(span, error);
352-
}
353-
}
354-
}
355-
356344
#[allow(unused_variables)]
357345
fn handle_root_error(span: Span, error: Error) {
358346
// In tests, we want to panic on all errors.

0 commit comments

Comments
 (0)