From 3bf9d6c609216063c0b7173bb78de550a256bc28 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Tue, 27 May 2025 18:48:09 +0800
Subject: [PATCH] feat(tracing): align resource attributes with OpenTelemetry
 semantic conventions

Signed-off-by: Gaius
---
 Cargo.lock                                         |  15 ++-
 dragonfly-client-backend/src/hdfs.rs               |   4 +-
 dragonfly-client-backend/src/http.rs               |   3 -
 dragonfly-client-backend/src/lib.rs                |   7 +-
 .../src/object_storage.rs                          |   8 --
 dragonfly-client-init/src/bin/main.rs              |   1 +
 dragonfly-client-storage/src/content.rs            |  13 +-
 dragonfly-client-storage/src/lib.rs                |  11 +-
 dragonfly-client-storage/src/metadata.rs           |   3 +-
 .../src/storage_engine/rocksdb.rs                  |  12 +-
 dragonfly-client-util/src/http/basic_auth.rs       |   2 -
 dragonfly-client-util/src/http/mod.rs              |   7 --
 dragonfly-client-util/src/id_generator/mod.rs      |   7 --
 dragonfly-client/Cargo.toml                        |   1 +
 dragonfly-client/src/announcer/mod.rs              |   4 -
 dragonfly-client/src/bin/dfcache/export.rs         |   1 +
 dragonfly-client/src/bin/dfcache/import.rs         |   1 +
 dragonfly-client/src/bin/dfcache/stat.rs           |   1 +
 dragonfly-client/src/bin/dfdaemon/main.rs          |   1 +
 dragonfly-client/src/bin/dfget/main.rs             |   1 +
 dragonfly-client/src/dynconfig/mod.rs              |   2 -
 dragonfly-client/src/gc/mod.rs                     |   2 -
 .../src/grpc/dfdaemon_download.rs                  |  89 +++++++++----
 dragonfly-client/src/grpc/dfdaemon_upload.rs       | 118 ++++++++++++++----
 dragonfly-client/src/grpc/health.rs                |  15 ++-
 dragonfly-client/src/grpc/interceptor.rs           |  46 ++++++-
 dragonfly-client/src/grpc/manager.rs               |  10 +-
 dragonfly-client/src/grpc/scheduler.rs             |  29 ++---
 dragonfly-client/src/health/mod.rs                 |   3 -
 dragonfly-client/src/metrics/mod.rs                |   5 -
 dragonfly-client/src/proxy/header.rs               |  10 +-
 dragonfly-client/src/proxy/mod.rs                  |  12 +-
 .../src/resource/persistent_cache_task.rs          |   2 -
 dragonfly-client/src/resource/piece.rs             |   3 -
 .../src/resource/piece_collector.rs                |   2 -
 .../src/resource/piece_downloader.rs               |   3 -
 dragonfly-client/src/resource/task.rs              |   3 +-
 dragonfly-client/src/stats/mod.rs                  |   3 -
 dragonfly-client/src/tracing/mod.rs                |  40 ++++--
 39 files changed, 280 insertions(+), 220 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 980ef645..53de562f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -971,6 +971,7 @@ dependencies = [
  "openssl",
  "opentelemetry",
  "opentelemetry-otlp",
+ "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
  "path-absolutize",
  "percent-encoding",
@@ -2855,6 +2856,12 @@ dependencies = [
  "tonic",
 ]
 
+[[package]]
+name = "opentelemetry-semantic-conventions"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "84b29a9f89f1a954936d5aa92f19b2feec3c8f3971d3e96206640db7f9706ae3"
+
 [[package]]
 name = "opentelemetry_sdk"
 version = "0.29.0"
@@ -5023,15 +5030,15 @@ dependencies = [
 
 [[package]]
 name = "tower-layer"
-version = "0.3.2"
+version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
+checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
 
 [[package]]
 name = "tower-service"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
+checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
 
 [[package]]
 name = "tracing"
diff --git a/dragonfly-client-backend/src/hdfs.rs b/dragonfly-client-backend/src/hdfs.rs
index 83518a07..f96ecc52 100644
--- a/dragonfly-client-backend/src/hdfs.rs
+++ b/dragonfly-client-backend/src/hdfs.rs
@@ -31,6
+31,7 @@ pub const HDFS_SCHEME: &str = "hdfs"; const DEFAULT_NAMENODE_PORT: u16 = 9870; /// Hdfs is a struct that implements the Backend trait. +#[derive(Default)] pub struct Hdfs { /// scheme is the scheme of the HDFS. scheme: String, @@ -39,7 +40,6 @@ pub struct Hdfs { /// Hdfs implements the Backend trait. impl Hdfs { /// new returns a new HDFS backend. - #[instrument(skip_all)] pub fn new() -> Self { Self { scheme: HDFS_SCHEME.to_string(), @@ -47,7 +47,6 @@ impl Hdfs { } /// operator initializes the operator with the parsed URL and HDFS config. - #[instrument(skip_all)] pub fn operator( &self, url: Url, @@ -84,7 +83,6 @@ impl Hdfs { #[tonic::async_trait] impl super::Backend for Hdfs { /// scheme returns the scheme of the HDFS backend. - #[instrument(skip_all)] fn scheme(&self) -> String { self.scheme.clone() } diff --git a/dragonfly-client-backend/src/http.rs b/dragonfly-client-backend/src/http.rs index 61c02b2c..c4a4a12d 100644 --- a/dragonfly-client-backend/src/http.rs +++ b/dragonfly-client-backend/src/http.rs @@ -43,7 +43,6 @@ pub struct HTTP { /// HTTP implements the http interface. impl HTTP { /// new returns a new HTTP. - #[instrument(skip_all)] pub fn new(scheme: &str) -> Result { // Default TLS client config with no validation. let client_config_builder = rustls::ClientConfig::builder() @@ -75,7 +74,6 @@ impl HTTP { } /// client returns a new reqwest client. - #[instrument(skip_all)] fn client( &self, client_cert: Option>>, @@ -117,7 +115,6 @@ impl HTTP { #[tonic::async_trait] impl super::Backend for HTTP { /// scheme returns the scheme of the HTTP backend. - #[instrument(skip_all)] fn scheme(&self) -> String { self.scheme.clone() } diff --git a/dragonfly-client-backend/src/lib.rs b/dragonfly-client-backend/src/lib.rs index dcf408a8..3393232d 100644 --- a/dragonfly-client-backend/src/lib.rs +++ b/dragonfly-client-backend/src/lib.rs @@ -27,7 +27,7 @@ use std::str::FromStr; use std::{collections::HashMap, pin::Pin, time::Duration}; use std::{fmt::Debug, fs}; use tokio::io::{AsyncRead, AsyncReadExt}; -use tracing::{error, info, instrument, warn}; +use tracing::{error, info, warn}; use url::Url; pub mod hdfs; @@ -227,7 +227,6 @@ pub struct BackendFactory { /// https://github.com/dragonflyoss/client/tree/main/dragonfly-client-backend/examples/plugin/. impl BackendFactory { /// new returns a new BackendFactory. - #[instrument(skip_all)] pub fn new(plugin_dir: Option<&Path>) -> Result { let mut backend_factory = Self::default(); backend_factory.load_builtin_backends()?; @@ -243,13 +242,11 @@ impl BackendFactory { } /// supported_download_directory returns whether the scheme supports directory download. - #[instrument(skip_all)] pub fn supported_download_directory(scheme: &str) -> bool { object_storage::Scheme::from_str(scheme).is_ok() || scheme == hdfs::HDFS_SCHEME } /// build returns the backend by the scheme of the url. - #[instrument(skip_all)] pub fn build(&self, url: &str) -> Result<&(dyn Backend + Send + Sync)> { let url = Url::parse(url).or_err(ErrorType::ParseError)?; let scheme = url.scheme(); @@ -260,7 +257,6 @@ impl BackendFactory { } /// load_builtin_backends loads the builtin backends. - #[instrument(skip_all)] fn load_builtin_backends(&mut self) -> Result<()> { self.backends.insert( "http".to_string(), @@ -330,7 +326,6 @@ impl BackendFactory { } /// load_plugin_backends loads the plugin backends. 
- #[instrument(skip_all)] fn load_plugin_backends(&mut self, plugin_dir: &Path) -> Result<()> { let backend_plugin_dir = plugin_dir.join(NAME); if !backend_plugin_dir.exists() { diff --git a/dragonfly-client-backend/src/object_storage.rs b/dragonfly-client-backend/src/object_storage.rs index ae2a5221..147ca1cc 100644 --- a/dragonfly-client-backend/src/object_storage.rs +++ b/dragonfly-client-backend/src/object_storage.rs @@ -177,7 +177,6 @@ pub struct ObjectStorage { /// ObjectStorage implements the ObjectStorage trait. impl ObjectStorage { /// Returns ObjectStorage that implements the Backend trait. - #[instrument(skip_all)] pub fn new(scheme: Scheme) -> ClientResult { // Initialize the reqwest client. let client = reqwest::Client::builder() @@ -196,7 +195,6 @@ impl ObjectStorage { } /// operator initializes the operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -223,7 +221,6 @@ impl ObjectStorage { } /// s3_operator initializes the S3 operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn s3_operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -276,7 +273,6 @@ impl ObjectStorage { } /// gcs_operator initializes the GCS operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn gcs_operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -311,7 +307,6 @@ impl ObjectStorage { } /// abs_operator initializes the ABS operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn abs_operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -354,7 +349,6 @@ impl ObjectStorage { } /// oss_operator initializes the OSS operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn oss_operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -398,7 +392,6 @@ impl ObjectStorage { } /// obs_operator initializes the OBS operator with the parsed URL and object storage. - #[instrument(skip_all)] pub fn obs_operator( &self, parsed_url: &super::object_storage::ParsedURL, @@ -487,7 +480,6 @@ impl ObjectStorage { #[tonic::async_trait] impl crate::Backend for ObjectStorage { /// scheme returns the scheme of the object storage. - #[instrument(skip_all)] fn scheme(&self) -> String { self.scheme.to_string() } diff --git a/dragonfly-client-init/src/bin/main.rs b/dragonfly-client-init/src/bin/main.rs index a613fe3d..a47d6cb3 100644 --- a/dragonfly-client-init/src/bin/main.rs +++ b/dragonfly-client-init/src/bin/main.rs @@ -91,6 +91,7 @@ async fn main() -> Result<(), anyhow::Error> { args.log_max_files, None, None, + false, args.log_to_stdout, ); diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 92822a4e..7afa88d0 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -67,7 +67,6 @@ pub struct WritePersistentCacheTaskResponse { /// Content implements the content storage. impl Content { /// new returns a new content. - #[instrument(skip_all)] pub async fn new(config: Arc, dir: &Path) -> Result { let dir = dir.join(DEFAULT_CONTENT_DIR); @@ -85,21 +84,18 @@ impl Content { } /// available_space returns the available space of the disk. - #[instrument(skip_all)] pub fn available_space(&self) -> Result { let stat = fs2::statvfs(&self.dir)?; Ok(stat.available_space()) } /// total_space returns the total space of the disk. 
- #[instrument(skip_all)] pub fn total_space(&self) -> Result { let stat = fs2::statvfs(&self.dir)?; Ok(stat.total_space()) } /// has_enough_space checks if the storage has enough space to store the content. - #[instrument(skip_all)] pub fn has_enough_space(&self, content_length: u64) -> Result { let available_space = self.available_space()?; if available_space < content_length { @@ -115,7 +111,6 @@ impl Content { } /// is_same_dev_inode checks if the source and target are the same device and inode. - #[instrument(skip_all)] async fn is_same_dev_inode, Q: AsRef>( &self, source: P, @@ -141,7 +136,6 @@ impl Content { } /// is_same_dev_inode_as_task checks if the task and target are the same device and inode. - #[instrument(skip_all)] pub async fn is_same_dev_inode_as_task(&self, task_id: &str, to: &Path) -> Result { let task_path = self.get_task_path(task_id); self.is_same_dev_inode(&task_path, to).await @@ -152,6 +146,7 @@ impl Content { /// Behavior of `create_task`: /// 1. If the task already exists, return the task path. /// 2. If the task does not exist, create the task directory and file. + #[instrument(skip_all)] pub async fn create_task(&self, task_id: &str, length: u64) -> Result { let task_path = self.get_task_path(task_id); if task_path.exists() { @@ -244,7 +239,6 @@ impl Content { } /// delete_task deletes the task content. - #[instrument(skip_all)] pub async fn delete_task(&self, task_id: &str) -> Result<()> { info!("delete task content: {}", task_id); let task_path = self.get_task_path(task_id); @@ -378,7 +372,6 @@ impl Content { } /// get_task_path returns the task path by task id. - #[instrument(skip_all)] fn get_task_path(&self, task_id: &str) -> PathBuf { // The task needs split by the first 3 characters of task id(sha256) to // avoid too many files in one directory. @@ -388,7 +381,6 @@ impl Content { /// is_same_dev_inode_as_persistent_cache_task checks if the persistent cache task and target /// are the same device and inode. - #[instrument(skip_all)] pub async fn is_same_dev_inode_as_persistent_cache_task( &self, task_id: &str, @@ -403,6 +395,7 @@ impl Content { /// Behavior of `create_persistent_cache_task`: /// 1. If the persistent cache task already exists, return the persistent cache task path. /// 2. If the persistent cache task does not exist, create the persistent cache task directory and file. + #[instrument(skip_all)] pub async fn create_persistent_cache_task( &self, task_id: &str, @@ -593,7 +586,6 @@ impl Content { } /// delete_task deletes the persistent cache task content. - #[instrument(skip_all)] pub async fn delete_persistent_cache_task(&self, task_id: &str) -> Result<()> { info!("delete persistent cache task content: {}", task_id); let persistent_cache_task_path = self.get_persistent_cache_task_path(task_id); @@ -606,7 +598,6 @@ impl Content { } /// get_persistent_cache_task_path returns the persistent cache task path by task id. - #[instrument(skip_all)] fn get_persistent_cache_task_path(&self, task_id: &str) -> PathBuf { // The persistent cache task needs split by the first 3 characters of task id(sha256) to // avoid too many files in one directory. diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 3ab7fb39..2e627c78 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -56,7 +56,6 @@ pub struct Storage { /// Storage implements the storage. impl Storage { /// new returns a new storage. 
- #[instrument(skip_all)] pub async fn new(config: Arc, dir: &Path, log_dir: PathBuf) -> Result { let metadata = metadata::Metadata::new(config.clone(), dir, &log_dir)?; let content = content::Content::new(config.clone(), dir).await?; @@ -71,19 +70,16 @@ impl Storage { } /// total_space returns the total space of the disk. - #[instrument(skip_all)] pub fn total_space(&self) -> Result { self.content.total_space() } /// available_space returns the available space of the disk. - #[instrument(skip_all)] pub fn available_space(&self) -> Result { self.content.available_space() } /// has_enough_space checks if the storage has enough space to store the content. - #[instrument(skip_all)] pub fn has_enough_space(&self, content_length: u64) -> Result { self.content.has_enough_space(content_length) } @@ -102,14 +98,12 @@ impl Storage { /// is_same_dev_inode_as_task checks if the task content is on the same device inode as the /// destination. - #[instrument(skip_all)] pub async fn is_same_dev_inode_as_task(&self, id: &str, to: &Path) -> Result { self.content.is_same_dev_inode_as_task(id, to).await } /// prepare_download_task_started prepares the metadata of the task when the task downloads /// started. - #[instrument(skip_all)] pub async fn prepare_download_task_started(&self, id: &str) -> Result { self.metadata.download_task_started(id, None, None, None) } @@ -227,7 +221,6 @@ impl Storage { /// is_same_dev_inode_as_persistent_cache_task checks if the persistent cache task content is on the same device inode as the /// destination. - #[instrument(skip_all)] pub async fn is_same_dev_inode_as_persistent_cache_task( &self, id: &str, @@ -633,7 +626,6 @@ impl Storage { } /// get_piece returns the piece metadata. - #[instrument(skip_all)] pub fn get_piece(&self, piece_id: &str) -> Result> { self.metadata.get_piece(piece_id) } @@ -645,13 +637,13 @@ impl Storage { } /// get_pieces returns the piece metadatas. + #[instrument(skip_all)] pub fn get_pieces(&self, task_id: &str) -> Result> { self.metadata.get_pieces(task_id) } /// piece_id returns the piece id. #[inline] - #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) } @@ -789,7 +781,6 @@ impl Storage { /// persistent_cache_piece_id returns the persistent cache piece id. #[inline] - #[instrument(skip_all)] pub fn persistent_cache_piece_id(&self, task_id: &str, number: u32) -> String { self.metadata.piece_id(task_id, number) } diff --git a/dragonfly-client-storage/src/metadata.rs b/dragonfly-client-storage/src/metadata.rs index 9e4b6d44..1708d3f5 100644 --- a/dragonfly-client-storage/src/metadata.rs +++ b/dragonfly-client-storage/src/metadata.rs @@ -840,7 +840,6 @@ impl Metadata { } /// get_piece gets the piece metadata. - #[instrument(skip_all)] pub fn get_piece(&self, piece_id: &str) -> Result> { self.db.get(piece_id.as_bytes()) } @@ -852,6 +851,7 @@ impl Metadata { } /// get_pieces gets the piece metadatas. + #[instrument(skip_all)] pub fn get_pieces(&self, task_id: &str) -> Result> { let pieces = self .db @@ -906,7 +906,6 @@ impl Metadata { /// piece_id returns the piece id. 
#[inline] - #[instrument(skip_all)] pub fn piece_id(&self, task_id: &str, number: u32) -> String { format!("{}-{}", task_id, number) } diff --git a/dragonfly-client-storage/src/storage_engine/rocksdb.rs b/dragonfly-client-storage/src/storage_engine/rocksdb.rs index 911a7320..b18143cd 100644 --- a/dragonfly-client-storage/src/storage_engine/rocksdb.rs +++ b/dragonfly-client-storage/src/storage_engine/rocksdb.rs @@ -24,7 +24,7 @@ use std::{ ops::Deref, path::{Path, PathBuf}, }; -use tracing::{info, instrument, warn}; +use tracing::{info, warn}; /// RocksdbStorageEngine is a storage engine based on rocksdb. pub struct RocksdbStorageEngine { @@ -67,7 +67,6 @@ impl RocksdbStorageEngine { const DEFAULT_LOG_MAX_FILES: usize = 10; /// open opens a rocksdb storage engine with the given directory and column families. - #[instrument(skip_all)] pub fn open(dir: &Path, log_dir: &PathBuf, cf_names: &[&str], keep: bool) -> Result { info!("initializing metadata directory: {:?} {:?}", dir, cf_names); // Initialize rocksdb options. @@ -135,7 +134,6 @@ impl RocksdbStorageEngine { /// RocksdbStorageEngine implements the storage engine operations. impl Operations for RocksdbStorageEngine { /// get gets the object by key. - #[instrument(skip_all)] fn get(&self, key: &[u8]) -> Result> { let cf = cf_handle::(self)?; let value = self.get_cf(cf, key).or_err(ErrorType::StorageError)?; @@ -146,7 +144,6 @@ impl Operations for RocksdbStorageEngine { } /// is_exist checks if the object exists by key. - #[instrument(skip_all)] fn is_exist(&self, key: &[u8]) -> Result { let cf = cf_handle::(self)?; Ok(self @@ -156,7 +153,6 @@ impl Operations for RocksdbStorageEngine { } /// put puts the object by key. - #[instrument(skip_all)] fn put(&self, key: &[u8], value: &O) -> Result<()> { let cf = cf_handle::(self)?; self.put_cf(cf, key, value.serialized()?) @@ -165,7 +161,6 @@ impl Operations for RocksdbStorageEngine { } /// delete deletes the object by key. - #[instrument(skip_all)] fn delete(&self, key: &[u8]) -> Result<()> { let cf = cf_handle::(self)?; let mut options = WriteOptions::default(); @@ -177,7 +172,6 @@ impl Operations for RocksdbStorageEngine { } /// iter iterates all objects. - #[instrument(skip_all)] fn iter(&self) -> Result, O)>>> { let cf = cf_handle::(self)?; let iter = self.iterator_cf(cf, rocksdb::IteratorMode::Start); @@ -188,7 +182,6 @@ impl Operations for RocksdbStorageEngine { } /// iter_raw iterates all objects without serialization. - #[instrument(skip_all)] fn iter_raw( &self, ) -> Result, Box<[u8]>)>>> { @@ -202,7 +195,6 @@ impl Operations for RocksdbStorageEngine { } /// prefix_iter iterates all objects with prefix. - #[instrument(skip_all)] fn prefix_iter( &self, prefix: &[u8], @@ -216,7 +208,6 @@ impl Operations for RocksdbStorageEngine { } /// prefix_iter_raw iterates all objects with prefix without serialization. - #[instrument(skip_all)] fn prefix_iter_raw( &self, prefix: &[u8], @@ -229,7 +220,6 @@ impl Operations for RocksdbStorageEngine { } /// batch_delete deletes objects by keys. 
- #[instrument(skip_all)] fn batch_delete(&self, keys: Vec<&[u8]>) -> Result<()> { let cf = cf_handle::(self)?; let mut batch = rocksdb::WriteBatch::default(); diff --git a/dragonfly-client-util/src/http/basic_auth.rs b/dragonfly-client-util/src/http/basic_auth.rs index 19da337f..54a2b775 100644 --- a/dragonfly-client-util/src/http/basic_auth.rs +++ b/dragonfly-client-util/src/http/basic_auth.rs @@ -20,7 +20,6 @@ use dragonfly_client_core::{ Error, Result, }; use http::header::{self, HeaderMap}; -use tracing::instrument; /// Credentials is the credentials for the basic auth. pub struct Credentials { @@ -34,7 +33,6 @@ pub struct Credentials { /// Credentials is the basic auth. impl Credentials { /// new returns a new Credentials. - #[instrument(skip_all)] pub fn new(username: &str, password: &str) -> Credentials { Self { username: username.to_string(), diff --git a/dragonfly-client-util/src/http/mod.rs b/dragonfly-client-util/src/http/mod.rs index 20af82c9..3ffffee8 100644 --- a/dragonfly-client-util/src/http/mod.rs +++ b/dragonfly-client-util/src/http/mod.rs @@ -21,12 +21,10 @@ use dragonfly_client_core::{ }; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use std::collections::HashMap; -use tracing::instrument; pub mod basic_auth; /// headermap_to_hashmap converts a headermap to a hashmap. -#[instrument(skip_all)] pub fn headermap_to_hashmap(header: &HeaderMap) -> HashMap { let mut hashmap: HashMap = HashMap::with_capacity(header.len()); for (k, v) in header { @@ -39,7 +37,6 @@ pub fn headermap_to_hashmap(header: &HeaderMap) -> HashMap) -> Result> { let mut headermap = HeaderMap::with_capacity(header.len()); for (k, v) in header { @@ -52,7 +49,6 @@ pub fn hashmap_to_headermap(header: &HashMap) -> Result) -> Result> { let mut header = HashMap::with_capacity(raw_header.len()); for h in raw_header { @@ -65,13 +61,11 @@ pub fn header_vec_to_hashmap(raw_header: Vec) -> Result) -> Result { hashmap_to_headermap(&header_vec_to_hashmap(raw_header)?) } /// get_range gets the range from http header. -#[instrument(skip_all)] pub fn get_range(header: &HeaderMap, content_length: u64) -> Result> { match header.get(reqwest::header::RANGE) { Some(range) => { @@ -85,7 +79,6 @@ pub fn get_range(header: &HeaderMap, content_length: u64) -> Result Result { let parsed_ranges = http_range_header::parse_range_header(range_header_value).or_err(ErrorType::ParseError)?; diff --git a/dragonfly-client-util/src/id_generator/mod.rs b/dragonfly-client-util/src/id_generator/mod.rs index b616a7e3..0acd81a1 100644 --- a/dragonfly-client-util/src/id_generator/mod.rs +++ b/dragonfly-client-util/src/id_generator/mod.rs @@ -22,7 +22,6 @@ use dragonfly_client_core::{ use sha2::{Digest, Sha256}; use std::io::{self, Read}; use std::path::PathBuf; -use tracing::instrument; use url::Url; use uuid::Uuid; @@ -76,7 +75,6 @@ pub struct IDGenerator { /// IDGenerator implements the IDGenerator. impl IDGenerator { /// new creates a new IDGenerator. - #[instrument(skip_all)] pub fn new(ip: String, hostname: String, is_seed_peer: bool) -> Self { IDGenerator { ip, @@ -87,7 +85,6 @@ impl IDGenerator { /// host_id generates the host id. #[inline] - #[instrument(skip_all)] pub fn host_id(&self) -> String { if self.is_seed_peer { return format!("{}-{}-{}", self.ip, self.hostname, "seed"); @@ -98,7 +95,6 @@ impl IDGenerator { /// task_id generates the task id. 
#[inline] - #[instrument(skip_all)] pub fn task_id(&self, parameter: TaskIDParameter) -> Result { match parameter { TaskIDParameter::Content(content) => { @@ -162,7 +158,6 @@ impl IDGenerator { /// persistent_cache_task_id generates the persistent cache task id. #[inline] - #[instrument(skip_all)] pub fn persistent_cache_task_id( &self, parameter: PersistentCacheTaskIDParameter, @@ -218,7 +213,6 @@ impl IDGenerator { /// peer_id generates the peer id. #[inline] - #[instrument(skip_all)] pub fn peer_id(&self) -> String { if self.is_seed_peer { return format!( @@ -234,7 +228,6 @@ impl IDGenerator { } /// task_type generates the task type by the task id. - #[instrument(skip_all)] pub fn task_type(&self, id: &str) -> TaskType { if id.ends_with(PERSISTENT_CACHE_TASK_SUFFIX) { return TaskType::PersistentCache; diff --git a/dragonfly-client/Cargo.toml b/dragonfly-client/Cargo.toml index af4b5d8a..c180d707 100644 --- a/dragonfly-client/Cargo.toml +++ b/dragonfly-client/Cargo.toml @@ -69,6 +69,7 @@ tracing-opentelemetry = "0.30.0" opentelemetry = { version = "0.29.1", default-features = false, features = ["trace"] } opentelemetry-otlp = { version = "0.29.0", default-features = false, features = ["trace", "grpc-tonic"] } opentelemetry_sdk = { version = "0.29.0", default-features = false, features = ["trace"] } +opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] } rolling-file = "0.2.0" pprof = { version = "0.14", features = ["flamegraph", "protobuf-codec"] } prometheus = { version = "0.13", features = ["process"] } diff --git a/dragonfly-client/src/announcer/mod.rs b/dragonfly-client/src/announcer/mod.rs index b95a783d..6ee004e7 100644 --- a/dragonfly-client/src/announcer/mod.rs +++ b/dragonfly-client/src/announcer/mod.rs @@ -50,7 +50,6 @@ pub struct ManagerAnnouncer { /// ManagerAnnouncer implements the manager announcer of the dfdaemon. impl ManagerAnnouncer { /// new creates a new manager announcer. - #[instrument(skip_all)] pub fn new( config: Arc, manager_client: Arc, @@ -66,7 +65,6 @@ impl ManagerAnnouncer { } /// run announces the dfdaemon information to the manager. - #[instrument(skip_all)] pub async fn run(&self) -> Result<()> { // Clone the shutdown channel. let mut shutdown = self.shutdown.clone(); @@ -132,7 +130,6 @@ pub struct SchedulerAnnouncer { /// SchedulerAnnouncer implements the scheduler announcer of the dfdaemon. impl SchedulerAnnouncer { /// new creates a new scheduler announcer. - #[instrument(skip_all)] pub async fn new( config: Arc, host_id: String, @@ -157,7 +154,6 @@ impl SchedulerAnnouncer { } /// run announces the dfdaemon information to the scheduler. - #[instrument(skip_all)] pub async fn run(&self) { // Clone the shutdown channel. 
let mut shutdown = self.shutdown.clone(); diff --git a/dragonfly-client/src/bin/dfcache/export.rs b/dragonfly-client/src/bin/dfcache/export.rs index e69e3dfd..f939b583 100644 --- a/dragonfly-client/src/bin/dfcache/export.rs +++ b/dragonfly-client/src/bin/dfcache/export.rs @@ -134,6 +134,7 @@ impl ExportCommand { self.log_max_files, None, None, + false, self.console, ); diff --git a/dragonfly-client/src/bin/dfcache/import.rs b/dragonfly-client/src/bin/dfcache/import.rs index d8af6536..e6d5872e 100644 --- a/dragonfly-client/src/bin/dfcache/import.rs +++ b/dragonfly-client/src/bin/dfcache/import.rs @@ -140,6 +140,7 @@ impl ImportCommand { self.log_max_files, None, None, + false, self.console, ); diff --git a/dragonfly-client/src/bin/dfcache/stat.rs b/dragonfly-client/src/bin/dfcache/stat.rs index f52f9065..87a3d081 100644 --- a/dragonfly-client/src/bin/dfcache/stat.rs +++ b/dragonfly-client/src/bin/dfcache/stat.rs @@ -86,6 +86,7 @@ impl StatCommand { self.log_max_files, None, None, + false, self.console, ); diff --git a/dragonfly-client/src/bin/dfdaemon/main.rs b/dragonfly-client/src/bin/dfdaemon/main.rs index 56933f1c..17598bd5 100644 --- a/dragonfly-client/src/bin/dfdaemon/main.rs +++ b/dragonfly-client/src/bin/dfdaemon/main.rs @@ -147,6 +147,7 @@ async fn main() -> Result<(), anyhow::Error> { args.log_max_files, config.tracing.addr.to_owned(), Some(config.host.clone()), + config.seed_peer.enable, args.console, ); diff --git a/dragonfly-client/src/bin/dfget/main.rs b/dragonfly-client/src/bin/dfget/main.rs index c1a6413e..1753b7be 100644 --- a/dragonfly-client/src/bin/dfget/main.rs +++ b/dragonfly-client/src/bin/dfget/main.rs @@ -305,6 +305,7 @@ async fn main() -> anyhow::Result<()> { args.log_max_files, None, None, + false, args.console, ); diff --git a/dragonfly-client/src/dynconfig/mod.rs b/dragonfly-client/src/dynconfig/mod.rs index 536b9287..db8e1941 100644 --- a/dragonfly-client/src/dynconfig/mod.rs +++ b/dragonfly-client/src/dynconfig/mod.rs @@ -65,7 +65,6 @@ pub struct Dynconfig { /// Dynconfig is the implementation of Dynconfig. impl Dynconfig { /// new creates a new Dynconfig. - #[instrument(skip_all)] pub async fn new( config: Arc, manager_client: Arc, @@ -88,7 +87,6 @@ impl Dynconfig { } /// run starts the dynconfig server. - #[instrument(skip_all)] pub async fn run(&self) { // Clone the shutdown channel. let mut shutdown = self.shutdown.clone(); diff --git a/dragonfly-client/src/gc/mod.rs b/dragonfly-client/src/gc/mod.rs index 7650efc6..ff3d43d3 100644 --- a/dragonfly-client/src/gc/mod.rs +++ b/dragonfly-client/src/gc/mod.rs @@ -53,7 +53,6 @@ pub struct GC { impl GC { /// new creates a new GC. - #[instrument(skip_all)] pub fn new( config: Arc, host_id: String, @@ -73,7 +72,6 @@ impl GC { } /// run runs the garbage collector. - #[instrument(skip_all)] pub async fn run(&self) { // Clone the shutdown channel. 
let mut shutdown = self.shutdown.clone(); diff --git a/dragonfly-client/src/grpc/dfdaemon_download.rs b/dragonfly-client/src/grpc/dfdaemon_download.rs index be340146..8fa04b63 100644 --- a/dragonfly-client/src/grpc/dfdaemon_download.rs +++ b/dragonfly-client/src/grpc/dfdaemon_download.rs @@ -46,6 +46,7 @@ use dragonfly_client_util::{ id_generator::{PersistentCacheTaskIDParameter, TaskIDParameter}, }; use hyper_util::rt::TokioIo; +use opentelemetry::Context; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -63,8 +64,9 @@ use tonic::{ }; use tower::{service_fn, ServiceBuilder}; use tracing::{error, info, instrument, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; -use super::interceptor::TracingInterceptor; +use super::interceptor::{ExtractTracingInterceptor, InjectTracingInterceptor}; /// DfdaemonDownloadServer is the grpc unix server of the download. pub struct DfdaemonDownloadServer { @@ -74,8 +76,11 @@ pub struct DfdaemonDownloadServer { /// socket_path is the path of the unix domain socket. socket_path: PathBuf, - /// service is the grpc service of the dfdaemon. - service: DfdaemonDownloadGRPCServer, + /// task is the task manager. + task: Arc, + + /// persistent_cache_task is the persistent cache task manager. + persistent_cache_task: Arc, /// shutdown is used to shutdown the grpc server. shutdown: shutdown::Shutdown, @@ -87,7 +92,6 @@ pub struct DfdaemonDownloadServer { /// DfdaemonDownloadServer implements the grpc server of the download. impl DfdaemonDownloadServer { /// new creates a new DfdaemonServer. - #[instrument(skip_all)] pub fn new( config: Arc, socket_path: PathBuf, @@ -96,27 +100,28 @@ impl DfdaemonDownloadServer { shutdown: shutdown::Shutdown, shutdown_complete_tx: mpsc::UnboundedSender<()>, ) -> Self { - // Initialize the grpc service. - let service = DfdaemonDownloadGRPCServer::new(DfdaemonDownloadServerHandler { - socket_path: socket_path.clone(), - task, - persistent_cache_task, - }) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); - Self { config, socket_path, - service, + task, + persistent_cache_task, shutdown, _shutdown_complete: shutdown_complete_tx, } } /// run starts the download server with unix domain socket. - #[instrument(skip_all)] pub async fn run(&mut self, grpc_server_started_barrier: Arc) -> ClientResult<()> { + // Initialize the grpc service. + let service = DfdaemonDownloadGRPCServer::with_interceptor( + DfdaemonDownloadServerHandler { + socket_path: self.socket_path.clone(), + task: self.task.clone(), + persistent_cache_task: self.persistent_cache_task.clone(), + }, + ExtractTracingInterceptor, + ); + // Register the reflection service. let reflection = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(dragonfly_api::FILE_DESCRIPTOR_SET) @@ -148,7 +153,7 @@ impl DfdaemonDownloadServer { // TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here. // Only use the LoadShed layer and the ConcurrencyLimit layer. 
- let layer = ServiceBuilder::new() + let rate_limit_layer = ServiceBuilder::new() .concurrency_limit(self.config.download.server.request_rate_limit as usize) .load_shed() .into_inner(); @@ -159,10 +164,10 @@ impl DfdaemonDownloadServer { .tcp_keepalive(Some(super::TCP_KEEPALIVE)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) - .layer(layer) + .layer(rate_limit_layer) .add_service(reflection.clone()) .add_service(health_service) - .add_service(self.service.clone()) + .add_service(service) .serve_with_incoming_shutdown(uds_stream, async move { // When the grpc server is started, notify the barrier. If the shutdown signal is received // before barrier is waited successfully, the server will shutdown immediately. @@ -221,6 +226,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Record the start time. let start_time = Instant::now(); @@ -618,6 +628,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -657,6 +672,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -691,7 +711,12 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { /// delete_host calls the scheduler to delete the host. #[instrument(skip_all, fields(host_id))] - async fn delete_host(&self, _: Request<()>) -> Result, Status> { + async fn delete_host(&self, request: Request<()>) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Generate the host id. let host_id = self.task.id_generator.host_id(); @@ -727,6 +752,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Record the start time. let start_time = Instant::now(); @@ -947,6 +977,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Record the start time. let start_time = Instant::now(); @@ -1039,6 +1074,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. 
+ if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1076,13 +1116,12 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler { #[derive(Clone)] pub struct DfdaemonDownloadClient { /// client is the grpc client of the dfdaemon. - pub client: DfdaemonDownloadGRPCClient>, + pub client: DfdaemonDownloadGRPCClient>, } /// DfdaemonDownloadClient implements the grpc client of the dfdaemon download. impl DfdaemonDownloadClient { /// new_unix creates a new DfdaemonDownloadClient with unix domain socket. - #[instrument(skip_all)] pub async fn new_unix(socket_path: PathBuf) -> ClientResult { // Ignore the uri because it is not used. let channel = Endpoint::try_from("http://[::]:50051") @@ -1107,9 +1146,10 @@ impl DfdaemonDownloadClient { }) .or_err(ErrorType::ConnectError)?; - let client = DfdaemonDownloadGRPCClient::with_interceptor(channel, TracingInterceptor) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); + let client = + DfdaemonDownloadGRPCClient::with_interceptor(channel, InjectTracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); Ok(Self { client }) } @@ -1228,7 +1268,6 @@ impl DfdaemonDownloadClient { } /// make_request creates a new request with timeout. - #[instrument(skip_all)] fn make_request(request: T) -> tonic::Request { let mut request = tonic::Request::new(request); request.set_timeout(super::REQUEST_TIMEOUT); diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs index e5816c6d..dd19f767 100644 --- a/dragonfly-client/src/grpc/dfdaemon_upload.rs +++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs @@ -14,7 +14,6 @@ * limitations under the License. */ -use super::interceptor::TracingInterceptor; use crate::metrics::{ collect_delete_task_failure_metrics, collect_delete_task_started_metrics, collect_download_task_failure_metrics, collect_download_task_finished_metrics, @@ -51,6 +50,7 @@ use dragonfly_client_util::{ id_generator::TaskIDParameter, net::{get_interface_info, Interface}, }; +use opentelemetry::Context; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -68,8 +68,11 @@ use tonic::{ }; use tower::ServiceBuilder; use tracing::{debug, error, info, instrument, Instrument, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; +use super::interceptor::{ExtractTracingInterceptor, InjectTracingInterceptor}; + /// DfdaemonUploadServer is the grpc server of the upload. pub struct DfdaemonUploadServer { /// config is the configuration of the dfdaemon. @@ -78,8 +81,11 @@ pub struct DfdaemonUploadServer { /// addr is the address of the grpc server. addr: SocketAddr, - /// service is the grpc service of the dfdaemon upload. - service: DfdaemonUploadGRPCServer, + /// task is the task manager. + task: Arc, + + /// persistent_cache_task is the persistent cache task manager. + persistent_cache_task: Arc, /// shutdown is used to shutdown the grpc server. shutdown: shutdown::Shutdown, @@ -91,7 +97,6 @@ pub struct DfdaemonUploadServer { /// DfdaemonUploadServer implements the grpc server of the upload. impl DfdaemonUploadServer { /// new creates a new DfdaemonUploadServer. 
- #[instrument(skip_all)] pub fn new( config: Arc, addr: SocketAddr, @@ -100,31 +105,33 @@ impl DfdaemonUploadServer { shutdown: shutdown::Shutdown, shutdown_complete_tx: mpsc::UnboundedSender<()>, ) -> Self { - // Initialize the grpc service. - let interface = - get_interface_info(config.host.ip.unwrap(), config.upload.rate_limit).unwrap(); - - let service = DfdaemonUploadGRPCServer::new(DfdaemonUploadServerHandler { - interface, - socket_path: config.download.server.socket_path.clone(), - task, - persistent_cache_task, - }) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); - Self { config, addr, - service, + task, + persistent_cache_task, shutdown, _shutdown_complete: shutdown_complete_tx, } } /// run starts the upload server. - #[instrument(skip_all)] pub async fn run(&mut self, grpc_server_started_barrier: Arc) -> ClientResult<()> { + // Initialize the grpc service. + let interface = + get_interface_info(self.config.host.ip.unwrap(), self.config.upload.rate_limit) + .unwrap(); + + let service = DfdaemonUploadGRPCServer::with_interceptor( + DfdaemonUploadServerHandler { + interface, + socket_path: self.config.download.server.socket_path.clone(), + task: self.task.clone(), + persistent_cache_task: self.persistent_cache_task.clone(), + }, + ExtractTracingInterceptor, + ); + // Register the reflection service. let reflection = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(dragonfly_api::FILE_DESCRIPTOR_SET) @@ -143,7 +150,7 @@ impl DfdaemonUploadServer { // TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here. // Only use the LoadShed layer and the ConcurrencyLimit layer. - let layer = ServiceBuilder::new() + let rate_limit_layer = ServiceBuilder::new() .concurrency_limit(self.config.upload.server.request_rate_limit as usize) .load_shed() .into_inner(); @@ -163,10 +170,10 @@ impl DfdaemonUploadServer { .tcp_keepalive(Some(super::TCP_KEEPALIVE)) .http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL)) .http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT)) - .layer(layer) + .layer(rate_limit_layer) .add_service(reflection.clone()) .add_service(health_service) - .add_service(self.service.clone()) + .add_service(service) .serve_with_shutdown(self.addr, async move { // When the grpc server is started, notify the barrier. If the shutdown signal is received // before barrier is waited successfully, the server will shutdown immediately. @@ -220,6 +227,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Record the start time. let start_time = Instant::now(); @@ -617,6 +629,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { /// stat_task stats the task. #[instrument(skip_all, fields(host_id, task_id))] async fn stat_task(&self, request: Request) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -656,6 +673,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. 
+ if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -697,6 +719,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -835,6 +862,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -853,7 +885,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { // Generate the piece id. let piece_id = self.task.piece.id(task_id.as_str(), piece_number); - // Span record the host id, task id and piece number. Span::current().record("host_id", host_id.as_str()); Span::current().record("remote_host_id", remote_host_id.as_str()); Span::current().record("task_id", task_id.as_str()); @@ -940,6 +971,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info. const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500); @@ -1029,6 +1065,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Record the start time. let start_time = Instant::now(); @@ -1249,6 +1290,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1287,6 +1333,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1325,6 +1376,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1355,6 +1411,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. 
+ if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1487,6 +1548,11 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { &self, request: Request, ) -> Result, Status> { + // If the parent context is set, use it as the parent context for the span. + if let Some(parent_ctx) = request.extensions().get::() { + Span::current().set_parent(parent_ctx.clone()); + }; + // Clone the request. let request = request.into_inner(); @@ -1602,13 +1668,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler { #[derive(Clone)] pub struct DfdaemonUploadClient { /// client is the grpc client of the dfdaemon upload. - pub client: DfdaemonUploadGRPCClient>, + pub client: DfdaemonUploadGRPCClient>, } /// DfdaemonUploadClient implements the dfdaemon upload grpc client. impl DfdaemonUploadClient { /// new creates a new DfdaemonUploadClient. - #[instrument(skip_all)] pub async fn new( config: Arc, addr: String, @@ -1667,7 +1732,7 @@ impl DfdaemonUploadClient { .or_err(ErrorType::ConnectError)?, }; - let client = DfdaemonUploadGRPCClient::with_interceptor(channel, TracingInterceptor) + let client = DfdaemonUploadGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) @@ -1844,7 +1909,6 @@ impl DfdaemonUploadClient { } /// make_request creates a new request with timeout. - #[instrument(skip_all)] fn make_request(request: T) -> tonic::Request { let mut request = tonic::Request::new(request); request.set_timeout(super::REQUEST_TIMEOUT); diff --git a/dragonfly-client/src/grpc/health.rs b/dragonfly-client/src/grpc/health.rs index 21e8d2d6..6c6132b1 100644 --- a/dragonfly-client/src/grpc/health.rs +++ b/dragonfly-client/src/grpc/health.rs @@ -21,27 +21,27 @@ use dragonfly_client_core::{ use hyper_util::rt::TokioIo; use std::path::PathBuf; use tokio::net::UnixStream; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::ClientTlsConfig; use tonic::transport::{Channel, Endpoint, Uri}; -use tonic::{service::interceptor::InterceptedService, transport::ClientTlsConfig}; use tonic_health::pb::{ health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse, }; use tower::service_fn; use tracing::{error, instrument}; -use super::interceptor::TracingInterceptor; +use super::interceptor::InjectTracingInterceptor; /// HealthClient is a wrapper of HealthGRPCClient. #[derive(Clone)] pub struct HealthClient { /// client is the grpc client of the certificate. - client: HealthGRPCClient>, + client: HealthGRPCClient>, } /// HealthClient implements the grpc client of the health. impl HealthClient { /// new creates a new HealthClient. - #[instrument(skip_all)] pub async fn new(addr: &str, client_tls_config: Option) -> Result { let channel = match client_tls_config { Some(client_tls_config) => Channel::from_shared(addr.to_string()) @@ -73,14 +73,13 @@ impl HealthClient { .or_err(ErrorType::ConnectError)?, }; - let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor) + let client = HealthGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX); Ok(Self { client }) } /// new_unix creates a new HealthClient with unix domain socket. - #[instrument(skip_all)] pub async fn new_unix(socket_path: PathBuf) -> Result { // Ignore the uri because it is not used. 
        let channel = Endpoint::try_from("http://[::]:50051")
@@ -98,7 +97,8 @@ impl HealthClient {
                 error!("connect failed: {}", err);
             })
             .or_err(ErrorType::ConnectError)?;
-        let client = HealthGRPCClient::with_interceptor(channel, TracingInterceptor)
+
+        let client = HealthGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
             .max_decoding_message_size(usize::MAX)
             .max_encoding_message_size(usize::MAX);
         Ok(Self { client })
@@ -137,7 +137,6 @@ impl HealthClient {
     }
 
     /// make_request creates a new request with timeout.
-    #[instrument(skip_all)]
     fn make_request<T>(request: T) -> tonic::Request<T> {
         let mut request = tonic::Request::new(request);
         request.set_timeout(super::REQUEST_TIMEOUT);
diff --git a/dragonfly-client/src/grpc/interceptor.rs b/dragonfly-client/src/grpc/interceptor.rs
index 32f18330..40bbf547 100644
--- a/dragonfly-client/src/grpc/interceptor.rs
+++ b/dragonfly-client/src/grpc/interceptor.rs
@@ -17,9 +17,28 @@
 use tonic::{metadata, service::Interceptor, Request, Status};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
-/// MetadataMap is a tracing meda data map container.
+/// MetadataMap is a tracing metadata map container for span context.
 struct MetadataMap<'a>(&'a mut metadata::MetadataMap);
 
+/// MetadataMap implements the otel tracing Extractor.
+impl opentelemetry::propagation::Extractor for MetadataMap<'_> {
+    /// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str, returns None.
+    fn get(&self, key: &str) -> Option<&str> {
+        self.0.get(key).and_then(|metadata| metadata.to_str().ok())
+    }
+
+    /// Collect all the keys from the `MetadataMap`.
+    fn keys(&self) -> Vec<&str> {
+        self.0
+            .keys()
+            .map(|key| match key {
+                tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
+                tonic::metadata::KeyRef::Binary(v) => v.as_str(),
+            })
+            .collect::<Vec<_>>()
+    }
+}
+
 /// MetadataMap implements the otel tracing Injector.
 impl opentelemetry::propagation::Injector for MetadataMap<'_> {
     /// set a key-value pair to the injector.
@@ -32,12 +51,12 @@ impl opentelemetry::propagation::Injector for MetadataMap<'_> {
     }
 }
 
-/// TracingInterceptor is a auto-inject tracing gRPC interceptor.
+/// InjectTracingInterceptor is an auto-inject tracing gRPC interceptor.
 #[derive(Clone)]
-pub struct TracingInterceptor;
+pub struct InjectTracingInterceptor;
 
-/// TracingInterceptor implements the tonic Interceptor interface.
-impl Interceptor for TracingInterceptor {
+/// InjectTracingInterceptor implements the tonic Interceptor interface.
+impl Interceptor for InjectTracingInterceptor {
     /// call and inject tracing context into lgobal propagator.
     fn call(&mut self, mut request: Request<()>) -> std::result::Result<Request<()>, Status> {
         let context = tracing::Span::current().context();
@@ -48,3 +67,20 @@ impl Interceptor for TracingInterceptor {
         Ok(request)
     }
 }
+
+/// ExtractTracingInterceptor is an auto-extract tracing gRPC interceptor.
+#[derive(Clone)]
+pub struct ExtractTracingInterceptor;
+
+/// ExtractTracingInterceptor implements the tonic Interceptor interface.
+impl Interceptor for ExtractTracingInterceptor {
+    /// call and extract the parent tracing context from the global propagator.
+    fn call(&mut self, mut request: Request<()>) -> std::result::Result<Request<()>, Status> {
+        let parent_cx = opentelemetry::global::get_text_map_propagator(|prop| {
+            prop.extract(&MetadataMap(request.metadata_mut()))
+        });
+
+        request.extensions_mut().insert(parent_cx);
+        Ok(request)
+    }
+}
diff --git a/dragonfly-client/src/grpc/manager.rs b/dragonfly-client/src/grpc/manager.rs
index 88dd7147..5360e44a 100644
--- a/dragonfly-client/src/grpc/manager.rs
+++ b/dragonfly-client/src/grpc/manager.rs
@@ -27,22 +27,21 @@ use dragonfly_client_core::{
 use std::sync::Arc;
 use tonic::{service::interceptor::InterceptedService, transport::Channel};
 use tonic_health::pb::health_check_response::ServingStatus;
-use tracing::{error, instrument, warn};
+use tracing::{error, instrument};
 use url::Url;
 
-use super::interceptor::TracingInterceptor;
+use super::interceptor::InjectTracingInterceptor;
 
 /// ManagerClient is a wrapper of ManagerGRPCClient.
 #[derive(Clone)]
 pub struct ManagerClient {
     /// client is the grpc client of the manager.
-    pub client: ManagerGRPCClient<InterceptedService<Channel, TracingInterceptor>>,
+    pub client: ManagerGRPCClient<InterceptedService<Channel, InjectTracingInterceptor>>,
 }
 
 /// ManagerClient implements the grpc client of the manager.
 impl ManagerClient {
     /// new creates a new ManagerClient.
-    #[instrument(skip_all)]
     pub async fn new(config: Arc<Config>, addr: String) -> Result<Self> {
         let domain_name = Url::parse(addr.as_str())?
             .host_str()
@@ -99,7 +98,7 @@ impl ManagerClient {
                 .or_err(ErrorType::ConnectError)?,
         };
 
-        let client = ManagerGRPCClient::with_interceptor(channel, TracingInterceptor)
+        let client = ManagerGRPCClient::with_interceptor(channel, InjectTracingInterceptor)
             .max_decoding_message_size(usize::MAX)
             .max_encoding_message_size(usize::MAX);
         Ok(Self { client })
@@ -133,7 +132,6 @@ impl ManagerClient {
     }
 
     /// make_request creates a new request with timeout.
-    #[instrument(skip_all)]
     fn make_request<T>(request: T) -> tonic::Request<T> {
         let mut request = tonic::Request::new(request);
         request.set_timeout(super::REQUEST_TIMEOUT);
diff --git a/dragonfly-client/src/grpc/scheduler.rs b/dragonfly-client/src/grpc/scheduler.rs
index 2c6a6a37..8c24bd9b 100644
--- a/dragonfly-client/src/grpc/scheduler.rs
+++ b/dragonfly-client/src/grpc/scheduler.rs
@@ -40,7 +40,7 @@
 use tonic::transport::Channel;
 use tracing::{error, info, instrument, Instrument};
 use url::Url;
 
-use super::interceptor::TracingInterceptor;
+use super::interceptor::InjectTracingInterceptor;
 
 /// VNode is the virtual node of the hashring.
 #[derive(Debug, Copy, Clone, Hash, PartialEq)]
@@ -79,7 +79,6 @@ pub struct SchedulerClient {
 /// SchedulerClient implements the grpc client of the scheduler.
 impl SchedulerClient {
     /// new creates a new SchedulerClient.
- #[instrument(skip_all)] pub async fn new(config: Arc, dynconfig: Arc) -> Result { let client = Self { config, @@ -192,9 +191,10 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); + let mut client = + SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); client.announce_host(request).await?; Ok(()) } @@ -245,9 +245,10 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); + let mut client = + SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); client.announce_host(request).await?; Ok(()) } @@ -303,9 +304,10 @@ impl SchedulerClient { }) .or_err(ErrorType::ConnectError)?; - let mut client = SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) - .max_decoding_message_size(usize::MAX) - .max_encoding_message_size(usize::MAX); + let mut client = + SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); client.delete_host(request).await?; Ok(()) } @@ -457,7 +459,7 @@ impl SchedulerClient { &self, task_id: &str, peer_id: Option<&str>, - ) -> Result>> { + ) -> Result>> { // Update scheduler addresses of the client. self.update_available_scheduler_addrs().await?; @@ -516,7 +518,7 @@ impl SchedulerClient { }; Ok( - SchedulerGRPCClient::with_interceptor(channel, TracingInterceptor) + SchedulerGRPCClient::with_interceptor(channel, InjectTracingInterceptor) .max_decoding_message_size(usize::MAX) .max_encoding_message_size(usize::MAX), ) @@ -619,7 +621,6 @@ impl SchedulerClient { } /// make_request creates a new request with timeout. - #[instrument(skip_all)] fn make_request(request: T) -> tonic::Request { let mut request = tonic::Request::new(request); request.set_timeout(super::REQUEST_TIMEOUT); diff --git a/dragonfly-client/src/health/mod.rs b/dragonfly-client/src/health/mod.rs index 5a176d33..5f183a7a 100644 --- a/dragonfly-client/src/health/mod.rs +++ b/dragonfly-client/src/health/mod.rs @@ -36,7 +36,6 @@ pub struct Health { /// Health implements the health server. impl Health { /// new creates a new Health. - #[instrument(skip_all)] pub fn new( addr: SocketAddr, shutdown: shutdown::Shutdown, @@ -50,7 +49,6 @@ impl Health { } /// run starts the health server. - #[instrument(skip_all)] pub async fn run(&self) { // Clone the shutdown channel. let mut shutdown = self.shutdown.clone(); @@ -71,7 +69,6 @@ impl Health { _ = shutdown.recv() => { // Health server shutting down with signals. info!("health server shutting down"); - return } } } diff --git a/dragonfly-client/src/metrics/mod.rs b/dragonfly-client/src/metrics/mod.rs index 1254a46c..192a65b2 100644 --- a/dragonfly-client/src/metrics/mod.rs +++ b/dragonfly-client/src/metrics/mod.rs @@ -271,7 +271,6 @@ lazy_static! { } /// register_custom_metrics registers all custom metrics. 
diff --git a/dragonfly-client/src/metrics/mod.rs b/dragonfly-client/src/metrics/mod.rs
index 1254a46c..192a65b2 100644
--- a/dragonfly-client/src/metrics/mod.rs
+++ b/dragonfly-client/src/metrics/mod.rs
@@ -271,7 +271,6 @@ lazy_static! {
 }
 
 /// register_custom_metrics registers all custom metrics.
-#[instrument(skip_all)]
 fn register_custom_metrics() {
     REGISTRY
         .register(Box::new(VERSION_GAUGE.clone()))
@@ -387,7 +386,6 @@ fn register_custom_metrics() {
 }
 
 /// reset_custom_metrics resets all custom metrics.
-#[instrument(skip_all)]
 fn reset_custom_metrics() {
     VERSION_GAUGE.reset();
     DOWNLOAD_TASK_COUNT.reset();
@@ -856,7 +854,6 @@ pub struct Metrics {
 /// Metrics implements the metrics server.
 impl Metrics {
     /// new creates a new Metrics.
-    #[instrument(skip_all)]
     pub fn new(
         config: Arc<Config>,
         shutdown: shutdown::Shutdown,
@@ -877,7 +874,6 @@ impl Metrics {
     }
 
     /// run starts the metrics server.
-    #[instrument(skip_all)]
     pub async fn run(&self) {
         // Clone the shutdown channel.
         let mut shutdown = self.shutdown.clone();
@@ -929,7 +925,6 @@ impl Metrics {
             _ = shutdown.recv() => {
                 // Metrics server shutting down with signals.
                 info!("metrics server shutting down");
-                return
             }
         }
     }
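Note: these metrics live in a dedicated `REGISTRY` rather than the prometheus default registry, so the exporter has to gather from it explicitly. A minimal sketch of that flow with the `prometheus` crate (the registry and gauge below are local stand-ins for the `lazy_static` statics in this file):

    use prometheus::{Encoder, IntGauge, Registry, TextEncoder};

    fn render_metrics() -> String {
        // Stand-ins for the REGISTRY / VERSION_GAUGE statics above.
        let registry = Registry::new();
        let version = IntGauge::new("dragonfly_client_version", "client version").unwrap();
        registry.register(Box::new(version.clone())).unwrap();
        version.set(1);

        // Gather from the custom registry and encode in the text format
        // served by the /metrics endpoint.
        let mut buffer = Vec::new();
        TextEncoder::new()
            .encode(&registry.gather(), &mut buffer)
            .unwrap();
        String::from_utf8(buffer).unwrap()
    }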
diff --git a/dragonfly-client/src/proxy/header.rs b/dragonfly-client/src/proxy/header.rs
index 2ecd5907..ee08f96d 100644
--- a/dragonfly-client/src/proxy/header.rs
+++ b/dragonfly-client/src/proxy/header.rs
@@ -17,7 +17,7 @@
 use bytesize::ByteSize;
 use dragonfly_api::common::v2::Priority;
 use reqwest::header::HeaderMap;
-use tracing::{error, instrument};
+use tracing::error;
 
 /// DRAGONFLY_TAG_HEADER is the header key of tag in http request.
 pub const DRAGONFLY_TAG_HEADER: &str = "X-Dragonfly-Tag";
@@ -80,7 +80,6 @@ pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID: &str =
     "X-Dragonfly-Content-For-Calculating-Task-ID";
 
 /// get_tag gets the tag from http header.
-#[instrument(skip_all)]
 pub fn get_tag(header: &HeaderMap) -> Option<String> {
     header
         .get(DRAGONFLY_TAG_HEADER)
@@ -89,7 +88,6 @@ pub fn get_tag(header: &HeaderMap) -> Option<String> {
 }
 
 /// get_application gets the application from http header.
-#[instrument(skip_all)]
 pub fn get_application(header: &HeaderMap) -> Option<String> {
     header
         .get(DRAGONFLY_APPLICATION_HEADER)
@@ -98,7 +96,6 @@ pub fn get_application(header: &HeaderMap) -> Option<String> {
 }
 
 /// get_priority gets the priority from http header.
-#[instrument(skip_all)]
 pub fn get_priority(header: &HeaderMap) -> i32 {
     let default_priority = Priority::Level6 as i32;
     match header.get(DRAGONFLY_PRIORITY_HEADER) {
@@ -120,7 +117,6 @@ pub fn get_priority(header: &HeaderMap) -> i32 {
 }
 
 /// get_registry gets the custom address of container registry from http header.
-#[instrument(skip_all)]
 pub fn get_registry(header: &HeaderMap) -> Option<String> {
     header
         .get(DRAGONFLY_REGISTRY_HEADER)
@@ -129,7 +125,6 @@ pub fn get_registry(header: &HeaderMap) -> Option<String> {
 }
 
 /// get_filters gets the filters from http header.
-#[instrument(skip_all)]
 pub fn get_filtered_query_params(
     header: &HeaderMap,
     default_filtered_query_params: Vec<String>,
@@ -147,7 +142,6 @@ pub fn get_filtered_query_params(
 }
 
 /// get_use_p2p gets the use p2p from http header.
-#[instrument(skip_all)]
 pub fn get_use_p2p(header: &HeaderMap) -> bool {
     match header.get(DRAGONFLY_USE_P2P_HEADER) {
         Some(value) => match value.to_str() {
@@ -162,7 +156,6 @@ pub fn get_use_p2p(header: &HeaderMap) -> bool {
 }
 
 /// get_prefetch gets the prefetch from http header.
-#[instrument(skip_all)]
 pub fn get_prefetch(header: &HeaderMap) -> Option<bool> {
     match header.get(DRAGONFLY_PREFETCH_HEADER) {
         Some(value) => match value.to_str() {
@@ -185,7 +178,6 @@ pub fn get_output_path(header: &HeaderMap) -> Option<String> {
 }
 
 /// get_force_hard_link gets the force hard link from http header.
-#[instrument(skip_all)]
 pub fn get_force_hard_link(header: &HeaderMap) -> bool {
     match header.get(DRAGONFLY_FORCE_HARD_LINK_HEADER) {
         Some(value) => match value.to_str() {
diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs
index 72f99e5c..da7c729e 100644
--- a/dragonfly-client/src/proxy/mod.rs
+++ b/dragonfly-client/src/proxy/mod.rs
@@ -99,7 +99,6 @@ pub struct Proxy {
 /// Proxy implements the proxy server.
 impl Proxy {
     /// new creates a new Proxy.
-    #[instrument(skip_all)]
     pub fn new(
         config: Arc<Config>,
         task: Arc<Task>,
@@ -144,7 +143,6 @@ impl Proxy {
     }
 
     /// run starts the proxy server.
-    #[instrument(skip_all)]
     pub async fn run(&self, grpc_server_started_barrier: Arc<Barrier>) -> ClientResult<()> {
         let mut shutdown = self.shutdown.clone();
         let read_buffer_size = self.config.proxy.read_buffer_size;
@@ -981,7 +979,6 @@ async fn proxy_via_https(
 }
 
 /// make_registry_mirror_request makes a registry mirror request by the request.
-#[instrument(skip_all)]
 fn make_registry_mirror_request(
     config: Arc<Config>,
     mut request: Request<hyper::body::Incoming>,
@@ -1015,7 +1012,6 @@ fn make_registry_mirror_request(
 }
 
 /// make_download_task_request makes a download task request by the request.
-#[instrument(skip_all)]
 fn make_download_task_request(
     config: Arc<Config>,
     rule: &Rule,
@@ -1074,7 +1070,6 @@ fn make_download_task_request(
 
 /// need_prefetch returns whether the prefetch is needed by the configuration and the request
 /// header.
-#[instrument(skip_all)]
 fn need_prefetch(config: Arc<Config>, header: &http::HeaderMap) -> bool {
     // If the header not contains the range header, the request does not need prefetch.
     if !header.contains_key(reqwest::header::RANGE) {
@@ -1088,11 +1083,10 @@ fn need_prefetch(config: Arc<Config>, header: &http::HeaderMap) -> bool {
     }
 
     // Return the prefetch value from the configuration.
-    return config.proxy.prefetch;
+    config.proxy.prefetch
 }
 
 /// make_download_url makes a download url by the given uri.
-#[instrument(skip_all)]
 fn make_download_url(
     uri: &hyper::Uri,
     use_tls: bool,
@@ -1117,7 +1111,6 @@ fn make_download_url(
 }
 
 /// make_response_headers makes the response headers.
-#[instrument(skip_all)]
 fn make_response_headers(
     mut download_task_started_response: DownloadTaskStartedResponse,
 ) -> ClientResult<hyper::header::HeaderMap> {
@@ -1144,13 +1137,11 @@
 
 /// find_matching_rule returns whether the dfdaemon should be used to download the task.
 /// If the dfdaemon should be used, return the matched rule.
-#[instrument(skip_all)]
 fn find_matching_rule(rules: Option<&[Rule]>, url: &str) -> Option<Rule> {
     rules?.iter().find(|rule| rule.regex.is_match(url)).cloned()
 }
 
 /// make_error_response makes an error response with the given status and message.
-#[instrument(skip_all)]
 fn make_error_response(status: http::StatusCode, header: Option<hyper::header::HeaderMap>) -> Response {
     let mut response = Response::new(empty());
     *response.status_mut() = status;
@@ -1164,7 +1155,6 @@
 }
 
 /// empty returns an empty body.
-#[instrument(skip_all)]
 fn empty() -> BoxBody<Bytes, ClientError> {
     Empty::<Bytes>::new()
         .map_err(|never| match never {})
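Note: the headers parsed above are the proxy's control surface, so a caller can opt into P2P and tune priority without touching the URL. A sketch of a client going through the dfdaemon proxy; the localhost:4001 proxy address and the `reqwest` usage are illustrative assumptions, not part of this patch:

    use reqwest::Proxy;

    async fn fetch_via_dfdaemon() -> Result<bytes::Bytes, reqwest::Error> {
        let client = reqwest::Client::builder()
            // Assumes dfdaemon's HTTP proxy listens on localhost:4001.
            .proxy(Proxy::all("http://127.0.0.1:4001").unwrap())
            .build()?;

        client
            .get("http://example.com/artifact.tar.gz")
            // Parsed by get_use_p2p above: route this request through P2P.
            .header("X-Dragonfly-Use-P2P", "true")
            // Parsed by get_priority above; Priority::Level6 is the default.
            .header("X-Dragonfly-Priority", "6")
            .send()
            .await?
            .bytes()
            .await
    }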
diff --git a/dragonfly-client/src/resource/persistent_cache_task.rs b/dragonfly-client/src/resource/persistent_cache_task.rs
index e164f399..ebea7ce6 100644
--- a/dragonfly-client/src/resource/persistent_cache_task.rs
+++ b/dragonfly-client/src/resource/persistent_cache_task.rs
@@ -84,7 +84,6 @@ pub struct PersistentCacheTask {
 /// PersistentCacheTask is the implementation of PersistentCacheTask.
 impl PersistentCacheTask {
     /// new creates a new PersistentCacheTask.
-    #[instrument(skip_all)]
     pub fn new(
         config: Arc<Config>,
         id_generator: Arc<IDGenerator>,
@@ -503,7 +502,6 @@ impl PersistentCacheTask {
     }
 
     /// is_same_dev_inode checks if the persistent cache task is on the same device inode as the given path.
-    #[instrument(skip_all)]
     pub async fn is_same_dev_inode(&self, id: &str, to: &Path) -> ClientResult<bool> {
         self.storage
             .is_same_dev_inode_as_persistent_cache_task(id, to)
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs
index f5a2adcc..4646f0e9 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -87,7 +86,6 @@ pub struct Piece {
 /// Piece implements the piece manager.
 impl Piece {
     /// new returns a new Piece.
-    #[instrument(skip_all)]
     pub fn new(
         config: Arc<Config>,
         id_generator: Arc<IDGenerator>,
@@ -136,7 +135,6 @@ impl Piece {
 
     /// id generates a new piece id.
     #[inline]
-    #[instrument(skip_all)]
     pub fn id(&self, task_id: &str, number: u32) -> String {
         self.storage.piece_id(task_id, number)
     }
@@ -662,7 +660,6 @@ impl Piece {
 
     /// persistent_cache_id generates a new persistent cache piece id.
     #[inline]
-    #[instrument(skip_all)]
     pub fn persistent_cache_id(&self, task_id: &str, number: u32) -> String {
         self.storage.persistent_cache_piece_id(task_id, number)
     }
diff --git a/dragonfly-client/src/resource/piece_collector.rs b/dragonfly-client/src/resource/piece_collector.rs
index 8b7308ae..eaabfc38 100644
--- a/dragonfly-client/src/resource/piece_collector.rs
+++ b/dragonfly-client/src/resource/piece_collector.rs
@@ -75,7 +74,6 @@ pub struct PieceCollector {
 /// PieceCollector is used to collect pieces from peers.
 impl PieceCollector {
     /// new creates a new PieceCollector.
-    #[instrument(skip_all)]
     pub async fn new(
         config: Arc<Config>,
         host_id: &str,
@@ -305,7 +304,6 @@ pub struct PersistentCachePieceCollector {
 /// PersistentCachePieceCollector is used to collect persistent cache pieces from peers.
 impl PersistentCachePieceCollector {
     /// new creates a new PieceCollector.
-    #[instrument(skip_all)]
     pub async fn new(
         config: Arc<Config>,
         host_id: &str,
diff --git a/dragonfly-client/src/resource/piece_downloader.rs b/dragonfly-client/src/resource/piece_downloader.rs
index 932608c9..dc630598 100644
--- a/dragonfly-client/src/resource/piece_downloader.rs
+++ b/dragonfly-client/src/resource/piece_downloader.rs
@@ -66,7 +65,6 @@ pub struct DownloaderFactory {
 /// DownloadFactory implements the DownloadFactory trait.
 impl DownloaderFactory {
     /// new returns a new DownloadFactory.
-    #[instrument(skip_all)]
     pub fn new(protocol: &str, config: Arc<Config>) -> Result<Self> {
         let downloader = match protocol {
             "grpc" => Arc::new(GRPCDownloader::new(
@@ -84,7 +83,6 @@ impl DownloaderFactory {
     }
 
     /// build returns the downloader.
-    #[instrument(skip_all)]
     pub fn build(&self) -> Arc<dyn Downloader> {
         self.downloader.clone()
     }
@@ -151,7 +149,6 @@ pub struct GRPCDownloader {
 /// GRPCDownloader implements the downloader with the gRPC protocol.
 impl GRPCDownloader {
     /// new returns a new GRPCDownloader.
-    #[instrument(skip_all)]
     pub fn new(config: Arc<Config>, capacity: usize, idle_timeout: Duration) -> Self {
         Self {
             config,
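Note: most of the removals in this patch target tiny helpers like `Piece::id` above, where a span per call is pure overhead. Conceptually, `#[instrument(skip_all)]` wraps the body in span creation and entry on every invocation; roughly (a conceptual sketch, not the exact macro expansion, with a stand-in body for `storage.piece_id`):

    use tracing::info_span;

    fn piece_id(task_id: &str, number: u32) -> String {
        // What the removed attribute effectively added around a one-line
        // helper: a span constructed and entered per call.
        let span = info_span!("piece_id");
        let _enter = span.enter();
        format!("{}-{}", task_id, number)
    }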
diff --git a/dragonfly-client/src/resource/task.rs b/dragonfly-client/src/resource/task.rs
index 235f6c4b..c36f8ce9 100644
--- a/dragonfly-client/src/resource/task.rs
+++ b/dragonfly-client/src/resource/task.rs
@@ -90,7 +89,6 @@ pub struct Task {
 /// Task implements the task manager.
 impl Task {
     /// new returns a new Task.
-    #[instrument(skip_all)]
     pub fn new(
         config: Arc<Config>,
         id_generator: Arc<IDGenerator>,
@@ -117,6 +116,7 @@ impl Task {
     }
 
     /// get gets the metadata of the task.
+    #[instrument(skip_all)]
     pub fn get(&self, id: &str) -> ClientResult<Option<metadata::Task>> {
         self.storage.get_task(id)
     }
@@ -304,7 +304,6 @@ impl Task {
     }
 
     /// is_same_dev_inode checks if the task is on the same device inode as the given path.
-    #[instrument(skip_all)]
     pub async fn is_same_dev_inode(&self, id: &str, to: &Path) -> ClientResult<bool> {
         self.storage.is_same_dev_inode_as_task(id, to).await
     }
diff --git a/dragonfly-client/src/stats/mod.rs b/dragonfly-client/src/stats/mod.rs
index 189c05ce..4d6fc990 100644
--- a/dragonfly-client/src/stats/mod.rs
+++ b/dragonfly-client/src/stats/mod.rs
@@ -67,7 +66,6 @@ pub struct Stats {
 /// Stats implements the stats server.
 impl Stats {
     /// new creates a new Stats.
-    #[instrument(skip_all)]
     pub fn new(
         addr: SocketAddr,
         shutdown: shutdown::Shutdown,
@@ -81,7 +80,6 @@ impl Stats {
     }
 
     /// run starts the stats server.
-    #[instrument(skip_all)]
     pub async fn run(&self) {
         // Clone the shutdown channel.
         let mut shutdown = self.shutdown.clone();
@@ -110,7 +108,6 @@ impl Stats {
             _ = shutdown.recv() => {
                 // Stats server shutting down with signals.
                 info!("stats server shutting down");
-                return
             }
         }
     }
diff --git a/dragonfly-client/src/tracing/mod.rs b/dragonfly-client/src/tracing/mod.rs
index b4401516..53069292 100644
--- a/dragonfly-client/src/tracing/mod.rs
+++ b/dragonfly-client/src/tracing/mod.rs
@@ -15,7 +15,7 @@
  */
 
 use dragonfly_client_config::dfdaemon::Host;
-use opentelemetry::{global, trace::TracerProvider};
+use opentelemetry::{global, trace::TracerProvider, KeyValue};
 use opentelemetry_otlp::WithExportConfig;
 use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
 use rolling_file::*;
@@ -44,6 +44,7 @@ pub fn init_tracing(
     log_max_files: usize,
     jaeger_addr: Option<String>,
     host: Option<Host>,
+    is_seed_peer: bool,
     console: bool,
 ) -> Vec<WorkerGuard> {
     let mut guards = vec![];
@@ -102,7 +103,13 @@ pub fn init_tracing(
         .with(stdout_logging_layer);
 
     // Setup jaeger layer.
-    if let Some(jaeger_addr) = jaeger_addr {
+    if let Some(mut jaeger_addr) = jaeger_addr {
+        jaeger_addr = if jaeger_addr.starts_with("http://") {
+            jaeger_addr
+        } else {
+            format!("http://{}", jaeger_addr)
+        };
+
         let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
             .with_tonic()
             .with_endpoint(jaeger_addr)
@@ -115,20 +122,33 @@
             .with_batch_exporter(otlp_exporter)
             .with_resource(
                 Resource::builder()
-                    .with_service_name(name.to_owned())
+                    .with_service_name(format!("{}-{}", name, host.ip.unwrap()))
+                    .with_schema_url(
+                        [
+                            KeyValue::new(
+                                opentelemetry_semantic_conventions::attribute::SERVICE_NAMESPACE,
+                                "dragonfly",
+                            ),
+                            KeyValue::new(
+                                opentelemetry_semantic_conventions::attribute::HOST_NAME,
+                                host.hostname,
+                            ),
+                            KeyValue::new(
+                                opentelemetry_semantic_conventions::attribute::HOST_IP,
+                                host.ip.unwrap().to_string(),
+                            ),
+                        ],
+                        opentelemetry_semantic_conventions::SCHEMA_URL,
+                    )
                     .with_attribute(opentelemetry::KeyValue::new(
-                        "idc",
+                        "host.idc",
                         host.idc.unwrap_or_default(),
                     ))
                     .with_attribute(opentelemetry::KeyValue::new(
-                        "location",
+                        "host.location",
                         host.location.unwrap_or_default(),
                     ))
-                    .with_attribute(opentelemetry::KeyValue::new("hostname", host.hostname))
-                    .with_attribute(opentelemetry::KeyValue::new(
-                        "ip",
-                        host.ip.unwrap().to_string(),
-                    ))
+                    .with_attribute(opentelemetry::KeyValue::new("host.seed_peer", is_seed_peer))
                     .build(),
             )
             .build();
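Note: the resulting OpenTelemetry resource now carries the semantic-convention attributes plus the custom `host.*` extras. Building the same resource in isolation shows its final shape; the values below are illustrative stand-ins for the dfdaemon host config, not data from this patch:

    use opentelemetry::KeyValue;
    use opentelemetry_sdk::Resource;
    use opentelemetry_semantic_conventions as semconv;

    // Mirrors the Resource built in init_tracing above.
    let resource = Resource::builder()
        .with_service_name("dfdaemon-192.168.1.2".to_string())
        .with_schema_url(
            [
                KeyValue::new(semconv::attribute::SERVICE_NAMESPACE, "dragonfly"),
                KeyValue::new(semconv::attribute::HOST_NAME, "worker-01"),
                KeyValue::new(semconv::attribute::HOST_IP, "192.168.1.2"),
            ],
            semconv::SCHEMA_URL,
        )
        .with_attribute(KeyValue::new("host.idc", "idc-01"))
        .with_attribute(KeyValue::new("host.location", "us-west"))
        .with_attribute(KeyValue::new("host.seed_peer", false))
        .build();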