Skip to content

Commit d9ea15a

Browse files
authored
Merge pull request #73 from vohoanglong0107/refactor-global-loop
refactor: separate global event handling to a new loop
2 parents f6ed8c0 + c42348f commit d9ea15a

File tree

6 files changed

+119
-69
lines changed

6 files changed

+119
-69
lines changed

src/bin/bors.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use std::time::Duration;
55

66
use anyhow::Context;
77
use bors::{
8-
create_app, create_bors_process, BorsContext, BorsEvent, BorsGlobalEvent, CommandParser,
9-
GithubAppState, SeaORMClient, ServerState, WebhookSecret,
8+
create_app, create_bors_process, BorsContext, BorsGlobalEvent, CommandParser, GithubAppState,
9+
SeaORMClient, ServerState, WebhookSecret,
1010
};
1111
use clap::Parser;
1212
use sea_orm::Database;
@@ -79,19 +79,21 @@ fn try_main(opts: Opts) -> anyhow::Result<()> {
7979
))
8080
.context("Cannot load GitHub repository state")?;
8181
let ctx = BorsContext::new(CommandParser::new(opts.cmd_prefix), Arc::new(db));
82-
let (tx, bors_process) = create_bors_process(state, ctx);
82+
let (repository_tx, global_tx, bors_process) = create_bors_process(state, ctx);
8383

84-
let refresh_tx = tx.clone();
84+
let refresh_tx = global_tx.clone();
8585
let refresh_process = async move {
8686
loop {
8787
tokio::time::sleep(PERIODIC_REFRESH).await;
88-
refresh_tx
89-
.send(BorsEvent::Global(BorsGlobalEvent::Refresh))
90-
.await?;
88+
refresh_tx.send(BorsGlobalEvent::Refresh).await?;
9189
}
9290
};
9391

94-
let state = ServerState::new(tx, WebhookSecret::new(opts.webhook_secret));
92+
let state = ServerState::new(
93+
repository_tx,
94+
global_tx,
95+
WebhookSecret::new(opts.webhook_secret),
96+
);
9597
let server_process = webhook_server(state);
9698

9799
let fut = async move {

src/bors/handlers/mod.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use tracing::Instrument;
44

55
use crate::bors::command::{BorsCommand, CommandParseError};
6-
use crate::bors::event::{BorsRepositoryEvent, PullRequestComment};
6+
use crate::bors::event::{BorsGlobalEvent, BorsRepositoryEvent, PullRequestComment};
77
use crate::bors::handlers::help::command_help;
88
use crate::bors::handlers::ping::command_ping;
99
use crate::bors::handlers::refresh::refresh_repository;
@@ -16,32 +16,14 @@ use crate::database::DbClient;
1616
use crate::github::GithubRepoName;
1717
use crate::utils::logging::LogError;
1818

19-
use super::event::{BorsEvent, BorsGlobalEvent};
20-
2119
mod help;
2220
mod labels;
2321
mod ping;
2422
mod refresh;
2523
mod trybuild;
2624
mod workflow;
2725

28-
/// This function executes a single BORS event, it is the main execution function of the bot.
29-
pub async fn handle_bors_event<Client: RepositoryClient>(
30-
event: BorsEvent,
31-
state: Arc<dyn BorsState<Client>>,
32-
ctx: Arc<BorsContext>,
33-
) -> anyhow::Result<()> {
34-
match event {
35-
BorsEvent::Repository(event) => {
36-
handle_bors_repository_event(event, state, ctx).await?;
37-
}
38-
BorsEvent::Global(event) => {
39-
handle_bors_global_event(event, state, ctx).await?;
40-
}
41-
}
42-
Ok(())
43-
}
44-
26+
/// This function executes a single BORS repository event
4527
pub async fn handle_bors_repository_event<Client: RepositoryClient>(
4628
event: BorsRepositoryEvent,
4729
state: Arc<dyn BorsState<Client>>,
@@ -128,6 +110,7 @@ pub async fn handle_bors_repository_event<Client: RepositoryClient>(
128110
Ok(())
129111
}
130112

113+
/// This function executes a single BORS global event
131114
pub async fn handle_bors_global_event<Client: RepositoryClient>(
132115
event: BorsGlobalEvent,
133116
state: Arc<dyn BorsState<Client>>,

src/bors/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::permissions::UserPermissions;
1717
pub use command::CommandParser;
1818
pub use comment::Comment;
1919
pub use context::BorsContext;
20-
pub use handlers::handle_bors_event;
20+
pub use handlers::{handle_bors_global_event, handle_bors_repository_event};
2121

2222
/// Provides functionality for working with a remote repository.
2323
#[async_trait]

src/github/server.rs

Lines changed: 81 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::bors::event::BorsEvent;
2-
use crate::bors::{handle_bors_event, BorsContext, BorsState};
2+
use crate::bors::{handle_bors_global_event, handle_bors_repository_event, BorsContext};
33
use crate::github::api::GithubAppState;
44
use crate::github::webhook::GitHubWebhook;
55
use crate::github::webhook::WebhookSecret;
66
use crate::utils::logging::LogError;
7+
use crate::{BorsGlobalEvent, BorsRepositoryEvent};
78
use axum::extract::State;
89
use axum::http::StatusCode;
910
use axum::response::IntoResponse;
@@ -17,14 +18,20 @@ use tracing::Instrument;
1718

1819
/// Shared server state for all axum handlers.
1920
pub struct ServerState {
20-
webhook_sender: WebhookSender,
21+
repository_event_queue: mpsc::Sender<BorsRepositoryEvent>,
22+
global_event_queue: mpsc::Sender<BorsGlobalEvent>,
2123
webhook_secret: WebhookSecret,
2224
}
2325

2426
impl ServerState {
25-
pub fn new(webhook_sender: WebhookSender, webhook_secret: WebhookSecret) -> Self {
27+
pub fn new(
28+
repository_event_queue: mpsc::Sender<BorsRepositoryEvent>,
29+
global_event_queue: mpsc::Sender<BorsGlobalEvent>,
30+
webhook_secret: WebhookSecret,
31+
) -> Self {
2632
Self {
27-
webhook_sender,
33+
repository_event_queue,
34+
global_event_queue,
2835
webhook_secret,
2936
}
3037
}
@@ -48,41 +55,88 @@ pub async fn github_webhook_handler(
4855
State(state): State<ServerStateRef>,
4956
GitHubWebhook(event): GitHubWebhook,
5057
) -> impl IntoResponse {
51-
match state.webhook_sender.send(event).await {
52-
Ok(_) => (StatusCode::OK, ""),
53-
Err(err) => {
54-
tracing::error!("Could not send webhook event: {err:?}");
55-
(StatusCode::INTERNAL_SERVER_ERROR, "")
56-
}
58+
match event {
59+
BorsEvent::Global(e) => match state.global_event_queue.send(e).await {
60+
Ok(_) => (StatusCode::OK, ""),
61+
Err(err) => {
62+
tracing::error!("Could not send webhook global event: {err:?}");
63+
(StatusCode::INTERNAL_SERVER_ERROR, "")
64+
}
65+
},
66+
BorsEvent::Repository(e) => match state.repository_event_queue.send(e).await {
67+
Ok(_) => (StatusCode::OK, ""),
68+
Err(err) => {
69+
tracing::error!("Could not send webhook repository event: {err:?}");
70+
(StatusCode::INTERNAL_SERVER_ERROR, "")
71+
}
72+
},
5773
}
5874
}
5975

60-
type WebhookSender = mpsc::Sender<BorsEvent>;
61-
6276
/// Creates a future with a Bors process that continuously receives webhook events and reacts to
6377
/// them.
6478
pub fn create_bors_process(
6579
state: GithubAppState,
6680
ctx: BorsContext,
67-
) -> (WebhookSender, impl Future<Output = ()>) {
68-
let (tx, mut rx) = mpsc::channel::<BorsEvent>(1024);
81+
) -> (
82+
mpsc::Sender<BorsRepositoryEvent>,
83+
mpsc::Sender<BorsGlobalEvent>,
84+
impl Future<Output = ()>,
85+
) {
86+
let (repository_tx, repository_rx) = mpsc::channel::<BorsRepositoryEvent>(1024);
87+
let (global_tx, global_rx) = mpsc::channel::<BorsGlobalEvent>(1024);
6988

7089
let service = async move {
71-
let state: Arc<dyn BorsState<_>> = Arc::new(state);
90+
let state = Arc::new(state);
7291
let ctx = Arc::new(ctx);
73-
while let Some(event) = rx.recv().await {
74-
let state = state.clone();
75-
let ctx = ctx.clone();
76-
77-
let span = tracing::info_span!("Event");
78-
tracing::debug!("Received event: {event:#?}");
79-
if let Err(error) = handle_bors_event(event, state, ctx)
80-
.instrument(span.clone())
81-
.await
82-
{
83-
span.log_error(error);
92+
tokio::select! {
93+
_ = consume_repository_events(state.clone(), ctx.clone(), repository_rx) => {
94+
tracing::warn!("Repository event handling process has ended");
95+
}
96+
_ = consume_global_events(state.clone(), ctx.clone(), global_rx) => {
97+
tracing::warn!("Global event handling process has ended");
8498
}
8599
}
86100
};
87-
(tx, service)
101+
(repository_tx, global_tx, service)
102+
}
103+
104+
async fn consume_repository_events(
105+
state: Arc<GithubAppState>,
106+
ctx: Arc<BorsContext>,
107+
mut repository_rx: mpsc::Receiver<BorsRepositoryEvent>,
108+
) {
109+
while let Some(event) = repository_rx.recv().await {
110+
let state = state.clone();
111+
let ctx = ctx.clone();
112+
113+
let span = tracing::info_span!("RepositoryEvent");
114+
tracing::debug!("Received repository event: {event:#?}");
115+
if let Err(error) = handle_bors_repository_event(event, state, ctx)
116+
.instrument(span.clone())
117+
.await
118+
{
119+
span.log_error(error);
120+
}
121+
}
122+
}
123+
124+
async fn consume_global_events(
125+
state: Arc<GithubAppState>,
126+
ctx: Arc<BorsContext>,
127+
mut global_rx: mpsc::Receiver<BorsGlobalEvent>,
128+
) {
129+
while let Some(event) = global_rx.recv().await {
130+
let state = state.clone();
131+
let ctx = ctx.clone();
132+
133+
let span = tracing::info_span!("GlobalEvent");
134+
tracing::debug!("Received global event: {event:#?}");
135+
if let Err(error) = handle_bors_global_event(event, state, ctx)
136+
.instrument(span.clone())
137+
.await
138+
{
139+
span.log_error(error);
140+
}
141+
}
88142
}

src/github/webhook.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -674,8 +674,13 @@ mod tests {
674674
HeaderValue::from_str(&signature).unwrap(),
675675
);
676676

677-
let (tx, _) = mpsc::channel(1024);
678-
let server_ref = ServerStateRef::new(ServerState::new(tx, WebhookSecret::new(secret)));
677+
let (repository_tx, _) = mpsc::channel(1024);
678+
let (global_tx, _) = mpsc::channel(1024);
679+
let server_ref = ServerStateRef::new(ServerState::new(
680+
repository_tx,
681+
global_tx,
682+
WebhookSecret::new(secret),
683+
));
679684
GitHubWebhook::from_request(request, &server_ref).await
680685
}
681686
}

src/tests/state.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use crate::bors::event::{
1616
WorkflowStarted,
1717
};
1818
use crate::bors::{
19-
handle_bors_event, BorsContext, CheckSuite, CommandParser, Comment, RepositoryState,
19+
handle_bors_global_event, handle_bors_repository_event, BorsContext, CheckSuite, CommandParser,
20+
Comment, RepositoryState,
2021
};
2122
use crate::bors::{BorsState, RepositoryClient};
2223
use crate::config::RepositoryConfig;
@@ -67,16 +68,21 @@ impl TestBorsState {
6768

6869
/// Execute an event.
6970
pub async fn event(&self, event: BorsEvent) {
70-
handle_bors_event(
71-
event,
72-
Arc::new(self.clone()),
73-
Arc::new(BorsContext::new(
74-
CommandParser::new("@bors".to_string()),
75-
Arc::clone(&self.db) as Arc<dyn DbClient>,
76-
)),
77-
)
78-
.await
79-
.unwrap();
71+
let state = Arc::new(self.clone());
72+
let ctx = Arc::new(BorsContext::new(
73+
CommandParser::new("@bors".to_string()),
74+
Arc::clone(&self.db) as Arc<dyn DbClient>,
75+
));
76+
match event {
77+
BorsEvent::Repository(event) => {
78+
handle_bors_repository_event(event, state, ctx)
79+
.await
80+
.unwrap();
81+
}
82+
BorsEvent::Global(event) => {
83+
handle_bors_global_event(event, state, ctx).await.unwrap();
84+
}
85+
}
8086
}
8187

8288
pub async fn comment<T: Into<PullRequestComment>>(&self, comment: T) {

0 commit comments

Comments
 (0)