From efd3f338277685a00da5789115535b2bc066b137 Mon Sep 17 00:00:00 2001
From: Ed Smith
Date: Fri, 13 Jan 2023 23:49:28 +0000
Subject: [PATCH] Allow LAVA jobs to pass artifacts back to the runner

We'd like to potentially add artifacts created by LAVA jobs to the
archive stored by GitLab. To achieve this, we run a cut-down web server
on the lava-gitlab-runner that is able to respond to POST requests at

  http://<host>:<port>/artifacts/<key>/<path>

The LAVA job is able to know the upload URL because we introduce a new
namespace for templated variables, `runner`, in the lava-gitlab-runner,
and add `ARTIFACT_UPLOAD_URL` to it.

It's still relatively complicated to get variables into LAVA tests; the
pattern I used in my test repository
https://gitlab.collabora.com/eds/callback-tests is to create a
parameter called `CALLBACK_URL` in the test itself, and then in the job
we can use a stanza like:

```yaml
test: foo
parameters:
  CALLBACK_URL: {{ runner.ARTIFACT_UPLOAD_URL }}
```

to make it available to the test. Bear in mind, if you use this, that
LAVA does not automatically export parameters as environment variables,
so you will need to export it inside your `steps:` in your test if you
want to use it in scripts (a sketch follows at the end of this
message).

The `key` part of the upload URL is a long random string. It's
generated uniquely per GitLab job, not per LAVA job, although this
detail is not important unless you are performing multi-node tests.
Because of the dynamic nature of the key, and the runner's port and IP,
artifact upload is only possible for `submit` jobs. For `monitor` jobs,
there's simply no way to communicate the necessary URL to them.

Backing the web server, there is a shared `UploadServer` that stores
the uploaded artifacts and bridges between the web server thread and
the job thread. It stores a `JobArtifacts` for each active job, which
the `AvailableArtifactStore` can query when we come to upload files.

I've elected to put the uploaded artifacts at

  <job-id>_artifacts/<path>

in the archive, to match the other uploads, which also lead with the
job ID. Note, however, that it will require some significant reworking
to support distinct directories for multi-node jobs. That's because we
do not know how many nodes there are in the job until after we submit,
at which point it's too late to create new keys for the other jobs. We
could speculatively create a surplus, for example, but we couldn't then
tie them to job IDs anyway.

For the generated upload URL, note that you can use the environment
variables `LAVA_GITLAB_RUNNER_ROUTABLE_HOST` and
`LAVA_GITLAB_RUNNER_ROUTABLE_PORT` to specify an arbitrary external
address, for example for an appropriate reverse proxy. Also, the upload
URL will always be `https`, even though the service itself does not
have TLS support.
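For concreteness, here is a sketch of a test definition consuming
`CALLBACK_URL` as described above. It is illustrative only: the test
name, file name, and `curl` invocation are mine, not taken from the
callback-tests repository.

```yaml
metadata:
  format: Lava-Test Test Definition 1.0
  name: callback-demo
  description: "Sketch: POST a file back to the runner"
run:
  steps:
    # The parameter is visible to the steps but is not exported,
    # so export it before any helper script relies on it.
    - export CALLBACK_URL
    # The upload URL already ends in a slash; append the path the
    # artifact should have under <job-id>_artifacts/ in the archive.
    - curl -X POST --data-binary @results.tar.gz "${CALLBACK_URL}results.tar.gz"
```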
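On the GitLab side nothing new is needed beyond declaring the
artifacts. A hypothetical `.gitlab-ci.yml` job for this runner might
look like the following; the job name, tag, and glob pattern are
assumptions for illustration, not part of this patch:

```yaml
lava-test:
  tags:
    - my-lava-runner  # whatever tag the runner is registered with
  script:
    - submit testjob.yaml
  artifacts:
    paths:
      # Uploaded files appear as <job-id>_artifacts/<path>.
      - "*_artifacts/*"
```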
Signed-off-by: Ed Smith
---
 Cargo.toml    |   3 +
 src/main.rs   | 169 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/upload.rs | 112 +++++++++++++++++++++++++++++++++
 3 files changed, 279 insertions(+), 5 deletions(-)
 create mode 100644 src/upload.rs

diff --git a/Cargo.toml b/Cargo.toml
index 9fb2604..2d12352 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,12 +9,14 @@ homepage = "https://gitlab.collabora.com/lava/lava-gitlab-runner"
 repository = "https://gitlab.collabora.com/lava/lava-gitlab-runner.git"

 [dependencies]
+axum = "0.6"
 bytes = "1.2.0"
 chrono = { version = "0.4", features = ["serde"] }
 colored = "2"
 gitlab-runner = "0.0.8"
 lava-api = "0.1.1"
 lazy_static = "1.4"
+local-ip-address = "0.5"
 structopt = "0.3.23"
 url = "2.2.2"
 tokio = "1.12.0"
@@ -27,6 +29,7 @@ serde = { version = "^1.0.97", features = ["derive"] }
 serde_json = "1.0.68"
 serde_yaml = "0.9"
 rand = "0.8.4"
+rand_chacha = "0.3"
 tempfile = "3.3.0"
 tokio-util = { version = "0.7", features = [ "io" ] }
 tracing-subscriber = { version = "0.3.9", features = [ "env-filter"] }
diff --git a/src/main.rs b/src/main.rs
index 1edd88e..2f419c5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,15 @@
 use std::borrow::Cow;
 use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, HashSet};
+use std::env;
 use std::io::Read;
+use std::net::{IpAddr, Ipv4Addr};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;

+use axum::extract::Path;
+use axum::routing::post;
+use axum::Router;
 use bytes::{Buf, Bytes};
 use colored::{Color, Colorize};
 use futures::stream::{Stream, TryStreamExt};
@@ -25,13 +30,16 @@ use structopt::StructOpt;
 use tokio::time::sleep;
 use tokio_util::sync::CancellationToken;
 use tracing::Level;
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
 use tracing_subscriber::filter;
 use tracing_subscriber::prelude::*;
 use url::Url;

 mod throttled;
+mod upload;
+
 use throttled::{ThrottledLava, Throttler};
+use upload::{JobArtifacts, UploadServer};

 const MASK_PATTERN: &str = "[MASKED]";

@@ -107,6 +115,7 @@ struct MonitorJobs {
 #[derive(Clone, Debug, Serialize)]
 struct TransformVariables<'a> {
     pub job: BTreeMap<&'a str, &'a str>,
+    pub runner: BTreeMap<&'a str, &'a str>,
 }

 #[derive(Debug)]
@@ -243,11 +252,18 @@ enum JobCancelBehaviour {
 struct AvailableArtifactStore {
     lava: Arc<ThrottledLava>,
     masker: Arc<Masker>,
+    artifact_caches: Mutex<BTreeMap<String, Arc<Mutex<JobArtifacts>>>>,
+    job_map: Mutex<BTreeMap<i64, Arc<Mutex<JobArtifacts>>>>,
 }

 impl AvailableArtifactStore {
     pub fn new(lava: Arc<ThrottledLava>, masker: Arc<Masker>) -> Self {
-        Self { lava, masker }
+        Self {
+            lava,
+            masker,
+            artifact_caches: Default::default(),
+            job_map: Default::default(),
+        }
     }

     pub fn get_log(
@@ -282,12 +298,53 @@ impl AvailableArtifactStore {
             .flatten_stream(),
         )
     }
+
+    pub fn create_upload_url(&self) -> String {
+        let artifacts = UPLOAD_SERVER.lock().unwrap().add_new_job();
+        let url = artifacts.lock().unwrap().get_upload_url().to_string();
+        self.artifact_caches
+            .lock()
+            .unwrap()
+            .insert(url.clone(), artifacts);
+        url
+    }
+
+    pub fn add_job_for_upload_url(&self, id: i64, upload_url: &str) {
+        let artifacts = self
+            .artifact_caches
+            .lock()
+            .unwrap()
+            .get(upload_url)
+            .unwrap()
+            .clone();
+        self.job_map.lock().unwrap().insert(id, artifacts);
+    }
+
+    pub fn get_uploaded_artifact(&self, id: i64, path: &str) -> Option<Bytes> {
+        self.job_map
+            .lock()
+            .unwrap()
+            .get(&id)
+            .and_then(|cache| cache.lock().unwrap().get_artifact_data(path))
+    }
+
+    pub fn get_uploaded_artifact_paths(&self, id: i64) -> Option<Vec<String>> {
+        self.job_map.lock().unwrap().get(&id).map(|cache| {
+            cache
+                .lock()
+                .unwrap()
+                .get_artifact_paths()
+                .map(str::to_string)
+                .collect()
+        })
+    }
 }

 #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
 enum LavaUploadableFileType {
     Log { id: i64 },
     Junit { id: i64 },
+    Artifact { id: i64, path: String },
 }

 #[derive(Clone)]
@@ -336,15 +393,25 @@ impl LavaUploadableFile {
             store,
         }
     }
+
+    pub fn artifact(id: i64, path: String, store: Arc<AvailableArtifactStore>) -> Self {
+        Self {
+            which: LavaUploadableFileType::Artifact { id, path },
+            store,
+        }
+    }
 }

 impl UploadableFile for LavaUploadableFile {
     type Data<'a> = Box<dyn AsyncRead + Send + Unpin + 'a>;

     fn get_path(&self) -> Cow<'_, str> {
-        match self.which {
+        match &self.which {
             LavaUploadableFileType::Log { id } => format!("{}_log.yaml", id).into(),
             LavaUploadableFileType::Junit { id } => format!("{}_junit.xml", id).into(),
+            LavaUploadableFileType::Artifact { id, path } => {
+                format!("{}_artifacts/{}", id, path).into()
+            }
         }
     }

@@ -357,6 +424,9 @@ impl UploadableFile for LavaUploadableFile {
             LavaUploadableFileType::Junit { id } => {
                 Box::new(self.store.get_junit(*id).into_async_read())
             }
+            LavaUploadableFileType::Artifact { id, path } => Box::new(futures::io::Cursor::new(
+                self.store.get_uploaded_artifact(*id, path).unwrap(),
+            )),
         }
     }
 }
@@ -724,7 +794,7 @@ impl Run {
         }
     }

-    fn transform(&self, definition: String) -> Result<String, ()> {
+    fn transform(&self, definition: String, upload_url: &str) -> Result<String, ()> {
         let mut handlebars = Handlebars::new();
         handlebars.set_strict_mode(true);
         handlebars
@@ -739,6 +809,7 @@ impl Run {
                 .variables()
                 .map(|var| (var.key(), var.value()))
                 .collect(),
+            runner: BTreeMap::from([("ARTIFACT_UPLOAD_URL", upload_url)]),
         };
         handlebars.render("definition", &mappings).map_err(|e| {
             outputln!("Failed to substitute in template: {}", e);
@@ -754,8 +825,9 @@ impl Run {
             "submit" => {
                 if let Some(filename) = p.next() {
                     let data = self.find_file(filename).await?;
+                    let upload_url = self.store.create_upload_url();
                     let definition = match String::from_utf8(data) {
-                        Ok(data) => self.transform(data)?,
+                        Ok(data) => self.transform(data, &upload_url)?,
                         Err(_) => {
                             outputln!("Job definition is not utf-8");
                             return Err(());
@@ -763,6 +835,9 @@ impl Run {
                     };
                     let ids = self.submit_definition(&definition).await?;
                     self.ids.extend(&ids);
+                    for id in &self.ids {
+                        self.store.add_job_for_upload_url(*id, &upload_url);
+                    }
                     self.follow_job(ids[0], cancel_token, JobCancelBehaviour::CancelLava)
                         .await
                 } else {
@@ -834,6 +909,14 @@ impl CancellableJobHandler<LavaUploadableFile> for Run {
         for id in &self.ids {
             available_files.push(LavaUploadableFile::log(*id, self.store.clone()));
             available_files.push(LavaUploadableFile::junit(*id, self.store.clone()));
+            for path in self
+                .store
+                .get_uploaded_artifact_paths(*id)
+                .into_iter()
+                .flatten()
+            {
+                available_files.push(LavaUploadableFile::artifact(*id, path, self.store.clone()));
+            }
         }
         Ok(Box::new(available_files.into_iter()))
     }
@@ -844,6 +927,7 @@ type LavaMap = Arc<Mutex<BTreeMap<String, Arc<ThrottledLava>>>>;
 lazy_static! {
     static ref LAVA_MAP: LavaMap = Arc::new(Mutex::new(BTreeMap::new()));
     static ref MAX_CONCURRENT_REQUESTS: Arc<Mutex<usize>> = Arc::new(Mutex::new(20));
+    static ref UPLOAD_SERVER: Arc<Mutex<UploadServer>> = Default::default();
 }

 async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFile>, ()> {
@@ -921,6 +1005,10 @@ async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFile>, ()> {
     ))
 }

+async fn upload_artifact(Path((job, path)): Path<(String, String)>, body: Bytes) {
+    UPLOAD_SERVER.lock().unwrap().upload_file(&job, &path, body);
+}
+
 #[tokio::main]
 async fn main() {
     let opts = Opts::from_args();
@@ -949,6 +1037,77 @@ async fn main() {
         );
     }

+    tokio::spawn(async {
+        let local_port = match env::var("LAVA_GITLAB_RUNNER_LOCAL_PORT") {
+            Ok(val) => val
+                .parse()
+                .expect("failed to parse LAVA_GITLAB_RUNNER_LOCAL_PORT as a port number"),
+            Err(_) => {
+                warn!("No LAVA_GITLAB_RUNNER_LOCAL_PORT set, will listen on an ephemeral port.");
+                0u16
+            }
+        };
+
+        let listener = std::net::TcpListener::bind(std::net::SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
+            local_port,
+        ))
+        .expect("failed to bind listener");
+
+        let routable_host = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_HOST") {
+            Ok(val) => val,
+            Err(_) => {
+                let host = local_ip_address::local_ip()
+                    .expect("failed to determine local ip")
+                    .to_string();
+
+                warn!(
+                    "No LAVA_GITLAB_RUNNER_ROUTABLE_HOST set, using best guess of local IP {}.",
+                    host
+                );
+                host
+            }
+        };
+
+        let routable_port = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_PORT") {
+            Ok(val) => val
+                .parse()
+                .expect("failed to parse LAVA_GITLAB_RUNNER_ROUTABLE_PORT as a port number"),
+            Err(_) => {
+                let port = listener
+                    .local_addr()
+                    .expect("failed to get local address")
+                    .port();
+
+                info!(
+                    "No LAVA_GITLAB_RUNNER_ROUTABLE_PORT set, using local port {}.",
+                    port
+                );
+                port
+            }
+        };
+
+        let routable_addr = format!("{}:{}", routable_host, routable_port);
+
+        info!(
+            "Artifact upload listening on {} (reporting routable {})",
+            listener.local_addr().expect("failed to get local address"),
+            routable_addr
+        );
+
+        UPLOAD_SERVER
+            .lock()
+            .unwrap()
+            .set_base_address(routable_addr);
+        let app = Router::new().route("/artifacts/:job/*path", post(upload_artifact));
+
+        axum::Server::from_tcp(listener)
+            .expect("failed to create axum server from TCP listener")
+            .serve(app.into_make_service())
+            .await
+            .unwrap();
+    });
+
     runner
         .run(new_job, 64)
         .await
diff --git a/src/upload.rs b/src/upload.rs
new file mode 100644
index 0000000..6f38920
--- /dev/null
+++ b/src/upload.rs
@@ -0,0 +1,112 @@
+use std::collections::BTreeMap;
+use std::sync::{Arc, Mutex, Weak};
+
+use bytes::Bytes;
+use rand::distributions::Alphanumeric;
+use rand::{Rng, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use tracing::warn;
+
+pub struct UploadServer {
+    rng: ChaCha20Rng,
+    job_cache: BTreeMap<String, Weak<Mutex<JobArtifacts>>>,
+    base_addr: Option<String>,
+}
+
+impl UploadServer {
+    pub fn new() -> Self {
+        Self {
+            rng: ChaCha20Rng::from_entropy(),
+            job_cache: Default::default(),
+            base_addr: None,
+        }
+    }
+
+    pub fn add_new_job(&mut self) -> Arc<Mutex<JobArtifacts>> {
+        // Wipe any dead jobs as the new one starts. It's not hugely
+        // important when this happens, so long as it happens
+        // periodically.
+        self.cleanup();
+
+        let prefix = self.generate_unique_id();
+        let base_addr = self
+            .base_addr
+            .as_ref()
+            .expect("failed to set base_address on UploadServer before adding jobs.");
+        let url = format!("https://{}/artifacts/{}/", base_addr, prefix);
+        let ja: Arc<Mutex<JobArtifacts>> = Arc::new(Mutex::new(JobArtifacts::new(url)));
+        self.job_cache.insert(prefix, Arc::downgrade(&ja));
+        ja
+    }
+
+    pub fn set_base_address(&mut self, base_addr: String) {
+        self.base_addr = Some(base_addr);
+    }
+
+    pub fn upload_file(&mut self, key: &str, path: &str, data: Bytes) {
+        if let Some(ja) = self.job_cache.get(key).and_then(Weak::upgrade) {
+            ja.lock().unwrap().upload_artifact(path, data)
+        } else {
+            warn!(
+                "Ignoring attempt to upload {} for non-existent or expired job",
+                path
+            );
+        }
+    }
+
+    fn generate_unique_id(&mut self) -> String {
+        (&mut self.rng)
+            .sample_iter(&Alphanumeric)
+            .take(64)
+            .map(char::from)
+            .collect()
+    }
+
+    pub fn cleanup(&mut self) {
+        let mut cleaned = Vec::new();
+        for (k, v) in self.job_cache.iter() {
+            if v.strong_count() == 0 {
+                cleaned.push(k.clone());
+            }
+        }
+        for k in cleaned {
+            self.job_cache.remove(&k);
+        }
+    }
+}
+
+impl Default for UploadServer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct JobArtifacts {
+    artifacts: BTreeMap<String, Bytes>,
+    url: String,
+}
+
+impl JobArtifacts {
+    fn new(url: String) -> Self {
+        JobArtifacts {
+            artifacts: Default::default(),
+            url,
+        }
+    }
+
+    pub fn get_upload_url(&self) -> &str {
+        &self.url
+    }
+
+    pub fn get_artifact_paths(&self) -> impl Iterator<Item = &str> {
+        self.artifacts.iter().map(|x| x.0.as_ref())
+    }
+
+    pub fn get_artifact_data(&self, path: &str) -> Option<Bytes> {
+        self.artifacts.get(path).map(Clone::clone)
+    }
+
+    pub fn upload_artifact(&mut self, path: &str, data: Bytes) {
+        self.artifacts.insert(path.to_string(), data);
+    }
+}