Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub struct Args {
#[arg(long, default_value = "/tmp")]
recorder_path: String,

/// Sets the path for message schemas.
#[arg(long, default_value = "src/external/zBlueberry/msgs")]
schema_path: String,

/// Zenoh configuration key-value pairs. Can be used multiple times.
/// Format: --zkey key=value
#[arg(long, value_name = "KEY=VALUE", num_args = 1..)]
Expand Down Expand Up @@ -64,29 +68,34 @@ pub fn is_verbose() -> bool {
args().verbose
}

/// Returns the recorder path
pub fn recorder_path() -> std::path::PathBuf {
let recorder_path = args().recorder_path.clone();
pub fn path_dir_from_arg(arg: &str) -> std::path::PathBuf {
let path = std::path::PathBuf::from(arg);

let pathbuf = std::fs::canonicalize(&recorder_path)
let pathbuf = std::fs::canonicalize(&path)
.inspect_err(|_| {
log::warn!(
"Failed canonicalizing path: {recorder_path:?}, using the non-canonized instead."
)
log::warn!("Failed canonicalizing path: {path:?}, using the non-canonized instead.")
})
.unwrap_or_else(|_| std::path::PathBuf::from(&recorder_path));
.unwrap_or_else(|_| std::path::PathBuf::from(&path));

if !pathbuf.exists() {
log::error!("Recorder path does not exist: {pathbuf:?}");
log::error!("Path does not exist: {pathbuf:?}");
std::process::exit(1);
} else if !pathbuf.is_dir() {
log::error!("Recorder path is not a directory: {pathbuf:?}");
log::error!("Path is not a directory: {pathbuf:?}");
std::process::exit(1);
}

pathbuf
}

pub fn recorder_path() -> std::path::PathBuf {
path_dir_from_arg(&args().recorder_path)
}

pub fn schema_path() -> std::path::PathBuf {
path_dir_from_arg(&args().schema_path)
}

/// Returns the zenoh configuration key-value pairs as a HashMap
pub fn zkey_config() -> HashMap<String, String> {
let mut config = HashMap::new();
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|error| panic!("Failed to insert {key}: {error}"));
}

let mut service = Service::new(config, cli::recorder_path()).await;
let mut service = Service::new(config, cli::recorder_path(), cli::schema_path()).await;
service.run().await;

Ok(())
Expand Down
21 changes: 11 additions & 10 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,20 @@ pub struct Service {
subscriber: Subscriber<FifoChannelHandler<Sample>>,
mcap: Mcap,
recorder_path: std::path::PathBuf,
schema_path: std::path::PathBuf,
}

fn load_cdr_schema(schema: &str) -> Result<String> {
fn load_cdr_schema(schema: &str, schema_path: &std::path::Path) -> Result<String> {
let mut schema_splitted = schema.split(".");
let schema_package = schema_splitted.next().ok_or(anyhow::anyhow!(
"Failed to get schema package from {schema}"
))?;
let schema_name = schema_splitted
.next()
.ok_or(anyhow::anyhow!("Failed to get schema name from {schema}"))?;
let current_dir = std::env::current_dir()
.map_err(|e| anyhow::anyhow!("Failed to get current directory: {e}"))?;
let current_dir_string = current_dir.display().to_string();
let schema_path = format!(
"{current_dir_string}/src/external/zBlueberry/msgs/{schema_package}/{schema_name}.msg"
);
let schema_path = schema_path.join(format!("{schema_package}/{schema_name}.msg"));
std::fs::read_to_string(&schema_path)
.map_err(|e| anyhow::anyhow!("Failed to read schema: {e}, ({schema_path})"))
.map_err(|e| anyhow::anyhow!("Failed to read schema: {e}, ({schema_path:?})"))
}

fn create_schema(value: &Value) -> Value {
Expand Down Expand Up @@ -127,7 +123,11 @@ fn generate_filename() -> String {
}

impl Service {
pub async fn new(config: Config, recorder_path: std::path::PathBuf) -> Self {
pub async fn new(
config: Config,
recorder_path: std::path::PathBuf,
schema_path: std::path::PathBuf,
) -> Self {
let session = zenoh::open(config)
.await
.expect("Failed to open zenoh session");
Expand All @@ -144,6 +144,7 @@ impl Service {
channel: HashMap::new(),
},
recorder_path,
schema_path,
}
}

Expand Down Expand Up @@ -194,7 +195,7 @@ impl Service {
let (encoding, schema_description, msg_encoding) =
match (encoding_string_0, encoding_string_1) {
("application/cdr", Some(schema)) => {
let schema = match load_cdr_schema(schema) {
let schema = match load_cdr_schema(schema, &self.schema_path) {
Ok(schema) => schema,
Err(e) => {
log::error!("{topic}: Failed to load schema: {e}");
Expand Down
Loading