Skip to content

feat: add buffer size config for dfdaemon #434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
]

[workspace.package]
version = "0.1.43"
version = "0.1.44"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand All @@ -22,13 +22,13 @@ readme = "README.md"
edition = "2021"

[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "0.1.43" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.43" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.43" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.43" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.43" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.43" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.43" }
dragonfly-client = { path = "dragonfly-client", version = "0.1.44" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.44" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.44" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.44" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.44" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.44" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.44" }
thiserror = "1.0"
dragonfly-api = "2.0.110"
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
Expand Down
48 changes: 47 additions & 1 deletion dragonfly-client-config/src/dfdaemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ fn default_dynconfig_refresh_interval() -> Duration {
Duration::from_secs(300)
}

// default_storage_write_buffer_size is the default buffer size for writing piece to disk, default is 16KB.
#[inline]
fn default_storage_write_buffer_size() -> usize {
16 * 1024
}

// default_storage_read_buffer_size is the default buffer size for reading piece from disk, default is 16KB.
#[inline]
fn default_storage_read_buffer_size() -> usize {
16 * 1024
}

// default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline]
fn default_seed_peer_cluster_id() -> u64 {
Expand Down Expand Up @@ -187,6 +199,12 @@ pub fn default_proxy_server_port() -> u16 {
4001
}

// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 16KB.
#[inline]
pub fn default_proxy_read_buffer_size() -> usize {
16 * 1024
}

// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id.
#[inline]
fn s3_filtered_query_params() -> Vec<String> {
Expand Down Expand Up @@ -578,13 +596,23 @@ pub struct Storage {
// dir is the directory to store task's metadata and content.
#[serde(default = "crate::default_storage_dir")]
pub dir: PathBuf,

// write_buffer_size is the buffer size for writing piece to disk, default is 16KB.
#[serde(default = "default_storage_write_buffer_size")]
pub write_buffer_size: usize,

// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
#[serde(default = "default_storage_read_buffer_size")]
pub read_buffer_size: usize,
}

// Storage implements Default.
impl Default for Storage {
fn default() -> Self {
Storage {
dir: crate::default_storage_dir(),
write_buffer_size: default_storage_write_buffer_size(),
read_buffer_size: default_storage_read_buffer_size(),
}
}
}
Expand Down Expand Up @@ -761,7 +789,7 @@ impl Default for RegistryMirror {
}

// Proxy is the proxy configuration for dfdaemon.
#[derive(Debug, Clone, Default, Validate, Deserialize)]
#[derive(Debug, Clone, Validate, Deserialize)]
#[serde(default, rename_all = "camelCase")]
pub struct Proxy {
// server is the proxy server configuration for dfdaemon.
Expand All @@ -779,6 +807,24 @@ pub struct Proxy {

// prefetch pre-downloads full of the task when download with range request.
pub prefetch: bool,

// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
#[serde(default = "default_proxy_read_buffer_size")]
pub read_buffer_size: usize,
}

// Proxy implements Default.
impl Default for Proxy {
fn default() -> Self {
Self {
server: ProxyServer::default(),
rules: None,
registry_mirror: RegistryMirror::default(),
disable_back_to_source: false,
prefetch: false,
read_buffer_size: default_proxy_read_buffer_size(),
}
}
}

// Security is the security configuration for dfdaemon.
Expand Down
17 changes: 10 additions & 7 deletions dragonfly-client-storage/src/content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/

use dragonfly_api::common::v2::Range;
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result;
use sha2::{Digest, Sha256};
use std::cmp::{max, min};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio_util::io::InspectReader;
Expand All @@ -27,11 +29,11 @@ use tracing::{error, info, warn};
// DEFAULT_DIR_NAME is the default directory name to store content.
const DEFAULT_DIR_NAME: &str = "content";

// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;

// Content is the content of a piece.
pub struct Content {
// config is the configuration of the dfdaemon.
config: Arc<Config>,

// dir is the directory to store content.
dir: PathBuf,
}
Expand All @@ -48,12 +50,12 @@ pub struct WritePieceResponse {
// Content implements the content storage.
impl Content {
// new returns a new content.
pub async fn new(dir: &Path) -> Result<Content> {
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
let dir = dir.join(DEFAULT_DIR_NAME);
fs::create_dir_all(&dir).await?;
info!("content initialized directory: {:?}", dir);

Ok(Content { dir })
Ok(Content { config, dir })
}

// hard_link_or_copy_task hard links or copies the task content to the destination.
Expand Down Expand Up @@ -144,7 +146,8 @@ impl Content {
let range_reader = from_f.take(range.length);

// Use a buffer to read the range.
let mut range_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, range_reader);
let mut range_reader =
BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);

let mut to_f = OpenOptions::new()
.create(true)
Expand Down Expand Up @@ -240,7 +243,7 @@ impl Content {
let task_path = self.dir.join(task_id);

// Use a buffer to read the piece.
let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, reader);
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);

// Sha256 is used to calculate the hash of the piece.
let mut hasher = Sha256::new();
Expand Down
2 changes: 1 addition & 1 deletion dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Storage {
// new returns a new storage.
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> {
let metadata = metadata::Metadata::new(dir)?;
let content = content::Content::new(dir).await?;
let content = content::Content::new(config.clone(), dir).await?;
Ok(Storage {
config,
metadata,
Expand Down
11 changes: 6 additions & 5 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ use tracing::{error, info, instrument, Span};

pub mod header;

// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;

// Response is the response of the proxy server.
pub type Response = hyper::Response<BoxBody<Bytes, ClientError>>;

Expand Down Expand Up @@ -474,7 +471,7 @@ async fn proxy_by_dfdaemon(
};

// Make the download task request.
let download_task_request = match make_download_task_request(config, rule, request) {
let download_task_request = match make_download_task_request(config.clone(), rule, request) {
Ok(download_task_request) => download_task_request,
Err(err) => {
error!("make download task request failed: {}", err);
Expand Down Expand Up @@ -562,6 +559,9 @@ async fn proxy_by_dfdaemon(
*response.headers_mut() = make_response_headers(download_task_started_response.clone())?;
*response.status_mut() = http::StatusCode::OK;

// Get the read buffer size from the config.
let read_buffer_size = config.proxy.read_buffer_size;

// Write task data to pipe. If grpc received error message,
// shutdown the writer.
tokio::spawn(async move {
Expand Down Expand Up @@ -616,8 +616,9 @@ async fn proxy_by_dfdaemon(
return;
}
};

// Use a buffer to read the piece.
let piece_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, piece_reader);
let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader);

// Write the piece data to the pipe in order.
finished_piece_readers.insert(piece.number, piece_reader);
Expand Down
Loading