diff --git a/Cargo.lock b/Cargo.lock index 5d2fc2c0..42169dfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -934,7 +934,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.1.43" +version = "0.1.44" dependencies = [ "anyhow", "bytes", @@ -997,7 +997,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.1.43" +version = "0.1.44" dependencies = [ "dragonfly-client-core", "futures", @@ -1015,7 +1015,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.1.43" +version = "0.1.44" dependencies = [ "dragonfly-client-core", "home", @@ -1034,7 +1034,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.1.43" +version = "0.1.44" dependencies = [ "libloading", "reqwest", @@ -1045,7 +1045,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.1.43" +version = "0.1.44" dependencies = [ "anyhow", "clap", @@ -1061,7 +1061,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.1.43" +version = "0.1.44" dependencies = [ "base16ct", "chrono", @@ -1084,7 +1084,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.1.43" +version = "0.1.44" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1542,7 +1542,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.1.43" +version = "0.1.44" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index 73bcc8c1..9e69945e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.1.43" +version = "0.1.44" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.1.43" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.43" } 
-dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.43" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.43" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.43" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.43" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.43" } +dragonfly-client = { path = "dragonfly-client", version = "0.1.44" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.44" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.44" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.44" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.44" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.44" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.44" } thiserror = "1.0" dragonfly-api = "2.0.110" reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] } diff --git a/dragonfly-client-config/src/dfdaemon.rs b/dragonfly-client-config/src/dfdaemon.rs index f06e6277..3e841f3c 100644 --- a/dragonfly-client-config/src/dfdaemon.rs +++ b/dragonfly-client-config/src/dfdaemon.rs @@ -145,6 +145,18 @@ fn default_dynconfig_refresh_interval() -> Duration { Duration::from_secs(300) } +// default_storage_write_buffer_size is the default buffer size for writing piece to disk, default is 16KB. +#[inline] +fn default_storage_write_buffer_size() -> usize { + 16 * 1024 +} + +// default_storage_read_buffer_size is the default buffer size for reading piece from disk, default is 16KB. +#[inline] +fn default_storage_read_buffer_size() -> usize { + 16 * 1024 +} + // default_seed_peer_cluster_id is the default cluster id of seed peer. 
#[inline] fn default_seed_peer_cluster_id() -> u64 { @@ -187,6 +199,12 @@ pub fn default_proxy_server_port() -> u16 { 4001 } +// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 16KB. +#[inline] +pub fn default_proxy_read_buffer_size() -> usize { + 16 * 1024 +} + // default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id. #[inline] fn s3_filtered_query_params() -> Vec { @@ -578,6 +596,14 @@ pub struct Storage { // dir is the directory to store task's metadata and content. #[serde(default = "crate::default_storage_dir")] pub dir: PathBuf, + + // write_buffer_size is the buffer size for writing piece to disk, default is 16KB. + #[serde(default = "default_storage_write_buffer_size")] + pub write_buffer_size: usize, + + // read_buffer_size is the buffer size for reading piece from disk, default is 16KB. + #[serde(default = "default_storage_read_buffer_size")] + pub read_buffer_size: usize, } // Storage implements Default. @@ -585,6 +611,8 @@ impl Default for Storage { fn default() -> Self { Storage { dir: crate::default_storage_dir(), + write_buffer_size: default_storage_write_buffer_size(), + read_buffer_size: default_storage_read_buffer_size(), } } } @@ -761,7 +789,7 @@ impl Default for RegistryMirror { } // Proxy is the proxy configuration for dfdaemon. -#[derive(Debug, Clone, Default, Validate, Deserialize)] +#[derive(Debug, Clone, Validate, Deserialize)] #[serde(default, rename_all = "camelCase")] pub struct Proxy { // server is the proxy server configuration for dfdaemon. @@ -779,6 +807,24 @@ pub struct Proxy { // prefetch pre-downloads full of the task when download with range request. pub prefetch: bool, + + // read_buffer_size is the buffer size for reading piece from disk, default is 16KB. + #[serde(default = "default_proxy_read_buffer_size")] + pub read_buffer_size: usize, +} + +// Proxy implements Default. 
+impl Default for Proxy { + fn default() -> Self { + Self { + server: ProxyServer::default(), + rules: None, + registry_mirror: RegistryMirror::default(), + disable_back_to_source: false, + prefetch: false, + read_buffer_size: default_proxy_read_buffer_size(), + } + } } // Security is the security configuration for dfdaemon. diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 5bb04a76..85c49d8e 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -15,10 +15,12 @@ */ use dragonfly_api::common::v2::Range; +use dragonfly_client_config::dfdaemon::Config; use dragonfly_client_core::Result; use sha2::{Digest, Sha256}; use std::cmp::{max, min}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tokio::fs::{self, File, OpenOptions}; use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom}; use tokio_util::io::InspectReader; @@ -27,11 +29,11 @@ use tracing::{error, info, warn}; // DEFAULT_DIR_NAME is the default directory name to store content. const DEFAULT_DIR_NAME: &str = "content"; -// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB. -const DEFAULT_BUFFER_SIZE: usize = 32 * 1024; - // Content is the content of a piece. pub struct Content { + // config is the configuration of the dfdaemon. + config: Arc<Config>, + // dir is the directory to store content. dir: PathBuf, } @@ -48,12 +50,12 @@ pub struct WritePieceResponse { // Content implements the content storage. impl Content { // new returns a new content. - pub async fn new(dir: &Path) -> Result<Content> { + pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> { let dir = dir.join(DEFAULT_DIR_NAME); fs::create_dir_all(&dir).await?; info!("content initialized directory: {:?}", dir); - Ok(Content { dir }) + Ok(Content { config, dir }) } // hard_link_or_copy_task hard links or copies the task content to the destination. 
@@ -144,7 +146,8 @@ impl Content { let range_reader = from_f.take(range.length); // Use a buffer to read the range. - let mut range_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, range_reader); + let mut range_reader = + BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader); let mut to_f = OpenOptions::new() .create(true) @@ -240,7 +243,7 @@ impl Content { let task_path = self.dir.join(task_id); // Use a buffer to read the piece. - let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, reader); + let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); // Sha256 is used to calculate the hash of the piece. let mut hasher = Sha256::new(); diff --git a/dragonfly-client-storage/src/lib.rs b/dragonfly-client-storage/src/lib.rs index 7f80bdf2..827f8b37 100644 --- a/dragonfly-client-storage/src/lib.rs +++ b/dragonfly-client-storage/src/lib.rs @@ -48,7 +48,7 @@ impl Storage { // new returns a new storage. pub async fn new(config: Arc, dir: &Path) -> Result { let metadata = metadata::Metadata::new(dir)?; - let content = content::Content::new(dir).await?; + let content = content::Content::new(config.clone(), dir).await?; Ok(Storage { config, metadata, diff --git a/dragonfly-client/src/proxy/mod.rs b/dragonfly-client/src/proxy/mod.rs index 3af0d84f..5c90b9f3 100644 --- a/dragonfly-client/src/proxy/mod.rs +++ b/dragonfly-client/src/proxy/mod.rs @@ -65,9 +65,6 @@ use tracing::{error, info, instrument, Span}; pub mod header; -// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB. -const DEFAULT_BUFFER_SIZE: usize = 32 * 1024; - // Response is the response of the proxy server. pub type Response = hyper::Response>; @@ -474,7 +471,7 @@ async fn proxy_by_dfdaemon( }; // Make the download task request. 
- let download_task_request = match make_download_task_request(config, rule, request) { + let download_task_request = match make_download_task_request(config.clone(), rule, request) { Ok(download_task_request) => download_task_request, Err(err) => { error!("make download task request failed: {}", err); @@ -562,6 +559,9 @@ async fn proxy_by_dfdaemon( *response.headers_mut() = make_response_headers(download_task_started_response.clone())?; *response.status_mut() = http::StatusCode::OK; + // Get the read buffer size from the config. + let read_buffer_size = config.proxy.read_buffer_size; + // Write task data to pipe. If grpc received error message, // shutdown the writer. tokio::spawn(async move { @@ -616,8 +616,9 @@ async fn proxy_by_dfdaemon( return; } }; + // Use a buffer to read the piece. - let piece_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, piece_reader); + let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader); // Write the piece data to the pipe in order. finished_piece_readers.insert(piece.number, piece_reader);