Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ edition = "2021"
anyhow = "1.0.87"
async-process = "2.2.4"
async-std = "1.13.0"
chrono = { version = "0.4", features = ["serde"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }
clap = { version = "4.5.17", features = ["derive", "string"] }
clap-verbosity-flag = "2.2.1"
colored = "2.1.0"
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ pub enum GitLabExecutorPullPolicy {
#[serde(rename = "if-not-present")]
/// Only pull an image if the image file is not present
IfNotPresent,
#[serde(rename = "try-update")]
/// Only pull an image when the remote image is newer than the local SIF file. Currently this only works with Docker Hub as the remote.
TryUpdate,
#[serde(rename = "never")]
/// Never pull an image
Never,
Expand Down
87 changes: 85 additions & 2 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use anyhow::{anyhow, Context};
use log::{debug, info};
use std::{ffi::OsStr, fs, path::PathBuf, process::Stdio};
use std::{ffi::OsStr, fs, path::PathBuf, process::Stdio, process::Command};
use reqwest::blocking;
use serde_json::Value;
use chrono::{DateTime, Utc};

use serde_json::{json, to_string_pretty};

Expand Down Expand Up @@ -87,10 +90,11 @@ fn build_image_filename(image_name: &str) -> PathBuf {
fn build_image_pull_url(image_name: &str) -> String {
let parts = image_name.split_once(":");
// if the image name contains a valid URL (based on its protocol name), we use it directly
// we also accept local:// here so that a local SIF file is not mistaken for a non-existent Docker Hub image.
match parts {
Some((protocol, _)) => match protocol {
"library" | "shub" | "docker" | "docker-archive" | "docker-daemon" | "oci"
| "oci-archive" | "http" | "https" | "oras" => return image_name.to_owned(),
| "oci-archive" | "http" | "https" | "oras" | "local" => return image_name.to_owned(),
_ => (),
},
_ => (),
Expand All @@ -99,6 +103,70 @@ fn build_image_pull_url(image_name: &str) -> String {
format!("docker://{}", image_name)
}

// This uses docker hub rest api to grab the time from last push.
fn get_docker_tag_timestamp(pull_url: &str) -> anyhow::Result<DateTime<Utc>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we were to add this, the function should preferably be async and the reqwest dependency should not use blocking

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the function calling this just async, not multithreaded?
I thought it was multithreaded, so blocking in that thread seemed fine to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functions are async; how they are actually executed (multi-threaded or single-threaded) is up to the runtime system. The only ask I have is that they should be non-blocking, in case we want to do other things at the same time in the future or make the requests cancellable.

if !pull_url.contains("docker://") {
return Err(anyhow!("only accept docker but it is {}", pull_url));
}
// change the pull_url to the rest API url
let resturl = pull_url
.replace("docker://", "<tmp>")
.replace(":", "/tags/")
.replace("<tmp>", "https://hub.docker.com/v2/repositories/");

debug!("Asking {} as rest api from {}", resturl, pull_url);

// Use RestAPI
let response = blocking::get(&resturl)?;
let body: Value = response.json()?;

// Extract and parse the "tag_last_pushed" field
if let Some(value) = body["tag_last_pushed"].as_str() {
let dt_docker: DateTime<Utc> = value.parse()?;
debug!("Parsed docker push datetime {} from {}", dt_docker, value);
return Ok(dt_docker);
}

return Err(anyhow!("json[\"tag_last_pushed\"] is not available"));
}

// This uses inspect to grab the time from build.
fn get_sif_build_timestamp(command: &PathBuf, sifpath: &PathBuf) -> anyhow::Result<DateTime<Utc>> {
let output = Command::new(command)
.arg("inspect")
.arg("-j")
.arg(sifpath)
.output()
.expect("Failed to execute command");

if output.status.success() {
let stdout = String::from_utf8_lossy(&output.stdout);
let json: Value = serde_json::from_str(&stdout)?;
if let Some(value) = json["data"]["attributes"]["labels"]["org.label-schema.build-date"].as_str() {
// because CEST is not in chrono or chrono_tz, so use date to change it
let utc = Command::new("date")
.arg("-u")
.arg("-d")
.arg(value.replace("_", " "))
.arg("+%Y-%m-%dT%H:%M:%SZ")
.output()
.expect("Failed to execute command");
if utc.status.success() {
let stdout = String::from_utf8_lossy(&utc.stdout).to_string();
let dt_sif: DateTime<Utc> = stdout.parse()?;
debug!("Parsed SIF build datetime {} from {}", dt_sif, stdout);
return Ok(dt_sif);
} else {
let stderr = String::from_utf8_lossy(&utc.stderr).to_string();
return Err(anyhow!(stderr));
}
}
return Err(anyhow!("json[\"data\"][\"attributes\"][\"labels\"][\"org.label-schema.build-date\"] is not available"));
}

return Err(anyhow!("{} inspect failed", command.display()));
}

async fn prepare_step(context: &JobContext) -> anyhow::Result<()> {
debug!(
"Executing prepare step for job {} with runner {}",
Expand Down Expand Up @@ -138,6 +206,20 @@ async fn prepare_step(context: &JobContext) -> anyhow::Result<()> {

let image_exists =
std::fs::exists(&filepath).context("Failed checking for existence of image file")?;
let mut dt_docker: Option<DateTime<Utc>> = None;
if pull_url.contains("docker://") {
dt_docker = Some(get_docker_tag_timestamp(&pull_url)?);
}
let mut dt_sif: Option<DateTime<Utc>> = None;
if image_exists {
dt_sif = Some(get_sif_build_timestamp(&config.apptainer_executable, &filepath)?);
}
let mut remote_is_newer = false;
if let (Some(dt_docker_val), Some(dt_sif_val)) = (dt_docker, dt_sif) {
remote_is_newer = dt_docker_val > dt_sif_val;
info!("docker push time: {} and SIF build time: {}, so do we pull? {}", dt_docker_val, dt_sif_val, remote_is_newer);
}

let pull_needed = match config.pull_policy {
GitLabExecutorPullPolicy::Always => true,
GitLabExecutorPullPolicy::Never => {
Expand All @@ -149,6 +231,7 @@ async fn prepare_step(context: &JobContext) -> anyhow::Result<()> {
false
}
GitLabExecutorPullPolicy::IfNotPresent => !image_exists,
GitLabExecutorPullPolicy::TryUpdate => !image_exists || remote_is_newer,
};
info!("Using image {}", image);
if !pull_needed {
Expand Down