Skip to content

Commit f7cc63e

Browse files
authored
Merge pull request #71 from rust-lang/pa-s3
Store index in S3, write index less times, and implement graceful shutdown
2 parents 7654b8a + e2f5a81 commit f7cc63e

File tree

12 files changed

+1076
-83
lines changed

12 files changed

+1076
-83
lines changed

Cargo.lock

Lines changed: 699 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ tracing = "0.1.16"
3232
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
3333
atty = "0.2.14"
3434
clap = { version = "4.1.8", features = ["derive"] }
35-
tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt"] }
35+
tokio = { version = "1.26.0", features = ["rt-multi-thread", "rt", "signal", "macros"] }
36+
aws-sdk-s3 = "0.26.0"
37+
aws-config = "0.55.1"

Readme.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,10 @@ To initialize a new index file, perform the following steps:
4747
* You can (temporarily) check the result directory in to the repository to see diffs.
4848
* Example command: `rla-offline extract-dir --ci actions -i demo.idx -s data/failed -d data/err`
4949
* *Note: Eventually, the expected results for the test log files will be provided in the repository and used as regression tests.*
50+
51+
### Index file storage
52+
53+
The index file can be stored either in the local filesystem (by providing the
54+
absolute or relative path to the file) or in S3 (by providing a
55+
`s3://{bucket}/{key}` URL). The S3 region of the bucket is detected
56+
automatically at startup.

src/bin/offline/extract.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::offline;
22
use crate::rla;
33

4+
use rla::index::IndexStorage;
45
use std::fs;
56
use std::io::{self, Write};
67
use std::path::Path;
@@ -31,7 +32,7 @@ fn load_lines<'a>(ci: &dyn rla::ci::CiPlatform, log: &'a [u8]) -> Vec<Line<'a>>
3132

3233
pub fn dir(
3334
ci: &dyn rla::ci::CiPlatform,
34-
index_file: &Path,
35+
index_file: &IndexStorage,
3536
src_dir: &Path,
3637
dst_dir: &Path,
3738
) -> rla::Result<()> {
@@ -91,7 +92,11 @@ pub fn dir(
9192
Ok(())
9293
}
9394

94-
pub fn one(ci: &dyn rla::ci::CiPlatform, index_file: &Path, log_file: &Path) -> rla::Result<()> {
95+
pub fn one(
96+
ci: &dyn rla::ci::CiPlatform,
97+
index_file: &IndexStorage,
98+
log_file: &Path,
99+
) -> rla::Result<()> {
95100
let config = rla::extract::Config::default();
96101
let index = rla::Index::load(index_file)?;
97102

src/bin/offline/learn.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use crate::offline;
22
use crate::rla;
33

4-
use std::path::{Path, PathBuf};
4+
use rla::index::IndexStorage;
5+
use std::path::PathBuf;
56
use std::time::Duration;
67
use std::time::Instant;
78
use walkdir::{self, WalkDir};
89

910
pub fn learn(
1011
ci: &dyn rla::ci::CiPlatform,
11-
index_file: &Path,
12+
index_file: &IndexStorage,
1213
inputs: &[PathBuf],
1314
multiplier: u32,
1415
) -> rla::Result<()> {

src/bin/rla-offline.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ extern crate rust_log_analyzer as rla;
1212
extern crate walkdir;
1313

1414
use clap::Parser;
15+
use rla::index::IndexStorage;
1516
use std::path::PathBuf;
1617

1718
mod offline;
@@ -53,7 +54,7 @@ enum Cli {
5354
long = "index-file",
5455
help = "The index file to read / write. An existing index file is updated."
5556
)]
56-
index_file: PathBuf,
57+
index_file: IndexStorage,
5758
#[arg(
5859
short = 'm',
5960
long = "multiplier",
@@ -79,7 +80,7 @@ enum Cli {
7980
long = "index-file",
8081
help = "The index file to read / write."
8182
)]
82-
index_file: PathBuf,
83+
index_file: IndexStorage,
8384
#[arg(
8485
short = 's',
8586
long = "source",
@@ -106,7 +107,7 @@ enum Cli {
106107
long = "index-file",
107108
help = "The index file to read / write."
108109
)]
109-
index_file: PathBuf,
110+
index_file: IndexStorage,
110111
#[arg(help = "The log file to analyze.")]
111112
log: PathBuf,
112113
},

src/bin/rla-server.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ extern crate regex;
1414
extern crate rust_log_analyzer as rla;
1515
extern crate serde_json;
1616

17+
use crate::server::QueueItem;
1718
use clap::Parser;
19+
use crossbeam::channel::Sender;
20+
use rla::index::IndexStorage;
1821
use std::process;
1922
use std::sync::Arc;
2023
use std::thread;
@@ -47,7 +50,7 @@ struct Cli {
4750
long = "index-file",
4851
help = "The index file to read / write. An existing index file is updated."
4952
)]
50-
index_file: std::path::PathBuf,
53+
index_file: IndexStorage,
5154
#[arg(
5255
long = "debug-post",
5356
help = "Post all comments to the given issue instead of the actual PR. Format: \"user/repo#id\""
@@ -90,7 +93,10 @@ fn main() {
9093

9194
let (queue_send, queue_recv) = crossbeam::channel::unbounded();
9295

93-
let service = Arc::new(server::RlaService::new(args.webhook_verify, queue_send)?);
96+
let service = Arc::new(server::RlaService::new(
97+
args.webhook_verify,
98+
queue_send.clone(),
99+
)?);
94100

95101
let mut worker = server::Worker::new(
96102
args.index_file,
@@ -102,7 +108,7 @@ fn main() {
102108
args.query_builds_from_primary_repo,
103109
)?;
104110

105-
thread::spawn(move || {
111+
let worker_thread = thread::spawn(move || {
106112
if let Err(e) = worker.main() {
107113
error!("Worker failed, exiting: {}", e);
108114
process::exit(1);
@@ -125,9 +131,34 @@ fn main() {
125131
}))
126132
}
127133
}))
134+
.with_graceful_shutdown(graceful_shutdown(queue_send))
128135
.await
129136
})?;
130137

138+
worker_thread.join().expect("worker thread failed");
139+
131140
Ok(())
132141
});
133142
}
143+
144+
async fn graceful_shutdown(sender: Sender<QueueItem>) {
145+
let ctrl_c = tokio::signal::ctrl_c();
146+
147+
// ECS uses SIGTERM to signal graceful shutdown must begin.
148+
#[cfg(unix)]
149+
let mut sigterm =
150+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
151+
#[cfg(unix)]
152+
let sigterm = sigterm.recv();
153+
154+
#[cfg(not(unix))]
155+
let sigterm = std::future::pending(); // Never resolves
156+
157+
tokio::select! {
158+
_ = ctrl_c => {}
159+
_ = sigterm => {}
160+
};
161+
162+
info!("graceful shutdown signal received");
163+
let _ = sender.send(QueueItem::GracefulShutdown);
164+
}

src/bin/server/mod.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,29 @@ pub use self::worker::Worker;
66
mod service;
77
mod worker;
88

9-
pub struct QueueItem {
10-
pub kind: QueueItemKind,
11-
pub delivery_id: String,
9+
pub enum QueueItem {
10+
GitHubStatus {
11+
payload: rla::github::CommitStatusEvent,
12+
delivery_id: String,
13+
},
14+
GitHubCheckRun {
15+
payload: rla::github::CheckRunEvent,
16+
delivery_id: String,
17+
},
18+
GitHubPullRequest {
19+
payload: rla::github::PullRequestEvent,
20+
delivery_id: String,
21+
},
22+
GracefulShutdown,
1223
}
1324

14-
pub enum QueueItemKind {
15-
GitHubStatus(rla::github::CommitStatusEvent),
16-
GitHubCheckRun(rla::github::CheckRunEvent),
17-
GitHubPullRequest(rla::github::PullRequestEvent),
25+
impl QueueItem {
26+
fn delivery_id(&self) -> Option<&str> {
27+
match self {
28+
QueueItem::GitHubStatus { delivery_id, .. } => Some(&delivery_id),
29+
QueueItem::GitHubCheckRun { delivery_id, .. } => Some(&delivery_id),
30+
QueueItem::GitHubPullRequest { delivery_id, .. } => Some(&delivery_id),
31+
QueueItem::GracefulShutdown => None,
32+
}
33+
}
1834
}

src/bin/server/service.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{QueueItem, QueueItemKind};
1+
use super::QueueItem;
22

33
use crate::rla;
44
use anyhow::bail;
@@ -76,7 +76,7 @@ impl RlaService {
7676
return reply(StatusCode::BAD_REQUEST, "Missing delivery ID.\n");
7777
};
7878

79-
let item_kind = match event {
79+
let item = match event {
8080
"status" => {
8181
let payload = match serde_json::from_slice(body) {
8282
Ok(p) => p,
@@ -85,7 +85,10 @@ impl RlaService {
8585
return reply(StatusCode::BAD_REQUEST, "Failed to decode payload.\n");
8686
}
8787
};
88-
QueueItemKind::GitHubStatus(payload)
88+
QueueItem::GitHubStatus {
89+
payload,
90+
delivery_id,
91+
}
8992
}
9093
"check_run" => {
9194
let payload = match serde_json::from_slice(body) {
@@ -96,10 +99,16 @@ impl RlaService {
9699
}
97100
};
98101

99-
QueueItemKind::GitHubCheckRun(payload)
102+
QueueItem::GitHubCheckRun {
103+
payload,
104+
delivery_id,
105+
}
100106
}
101107
"pull_request" => match serde_json::from_slice(body) {
102-
Ok(payload) => QueueItemKind::GitHubPullRequest(payload),
108+
Ok(payload) => QueueItem::GitHubPullRequest {
109+
payload,
110+
delivery_id,
111+
},
103112
Err(err) => {
104113
error!("Failed to decode 'pull_request' webhook payload: {}", err);
105114
return reply(StatusCode::BAD_REQUEST, "Failed to decode payload\n");
@@ -115,10 +124,6 @@ impl RlaService {
115124
}
116125
};
117126

118-
let item = QueueItem {
119-
kind: item_kind,
120-
delivery_id,
121-
};
122127
match self.queue.send(item) {
123128
Ok(()) => reply(StatusCode::OK, "Event processed.\n"),
124129
Err(e) => {

0 commit comments

Comments
 (0)