Skip to content

Commit 7e409a1

Browse files
committed
feat: add buffer size config for dfdaemon
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 9d94519 commit 7e409a1

File tree

6 files changed

+80
-30
lines changed

6 files changed

+80
-30
lines changed

Cargo.lock

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ members = [
1212
]
1313

1414
[workspace.package]
15-
version = "0.1.43"
15+
version = "0.1.44"
1616
authors = ["The Dragonfly Developers"]
1717
homepage = "https://d7y.io/"
1818
repository = "https://github.com/dragonflyoss/client.git"
@@ -22,13 +22,13 @@ readme = "README.md"
2222
edition = "2021"
2323

2424
[workspace.dependencies]
25-
dragonfly-client = { path = "dragonfly-client", version = "0.1.43" }
26-
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.43" }
27-
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.43" }
28-
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.43" }
29-
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.43" }
30-
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.43" }
31-
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.43" }
25+
dragonfly-client = { path = "dragonfly-client", version = "0.1.44" }
26+
dragonfly-client-core = { path = "dragonfly-client-core", version = "0.1.44" }
27+
dragonfly-client-config = { path = "dragonfly-client-config", version = "0.1.44" }
28+
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.1.44" }
29+
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.44" }
30+
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.44" }
31+
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.44" }
3232
thiserror = "1.0"
3333
dragonfly-api = "2.0.110"
3434
reqwest = { version = "0.11.27", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }

dragonfly-client-config/src/dfdaemon.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,18 @@ fn default_dynconfig_refresh_interval() -> Duration {
145145
Duration::from_secs(300)
146146
}
147147

148+
// default_storage_write_buffer_size is the default buffer size for writing piece to disk, default is 16KB.
149+
#[inline]
150+
fn default_storage_write_buffer_size() -> usize {
151+
16 * 1024
152+
}
153+
154+
// default_storage_read_buffer_size is the default buffer size for reading piece from disk, default is 16KB.
155+
#[inline]
156+
fn default_storage_read_buffer_size() -> usize {
157+
16 * 1024
158+
}
159+
148160
// default_seed_peer_cluster_id is the default cluster id of seed peer.
149161
#[inline]
150162
fn default_seed_peer_cluster_id() -> u64 {
@@ -187,6 +199,12 @@ pub fn default_proxy_server_port() -> u16 {
187199
4001
188200
}
189201

202+
// default_proxy_read_buffer_size is the default buffer size for reading piece, default is 16KB.
203+
#[inline]
204+
pub fn default_proxy_read_buffer_size() -> usize {
205+
16 * 1024
206+
}
207+
190208
// default_s3_filtered_query_params is the default filtered query params with s3 protocol to generate the task id.
191209
#[inline]
192210
fn s3_filtered_query_params() -> Vec<String> {
@@ -578,13 +596,23 @@ pub struct Storage {
578596
// dir is the directory to store task's metadata and content.
579597
#[serde(default = "crate::default_storage_dir")]
580598
pub dir: PathBuf,
599+
600+
// write_buffer_size is the buffer size for writing piece to disk, default is 16KB.
601+
#[serde(default = "default_storage_write_buffer_size")]
602+
pub write_buffer_size: usize,
603+
604+
// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
605+
#[serde(default = "default_storage_read_buffer_size")]
606+
pub read_buffer_size: usize,
581607
}
582608

583609
// Storage implements Default.
584610
impl Default for Storage {
585611
fn default() -> Self {
586612
Storage {
587613
dir: crate::default_storage_dir(),
614+
write_buffer_size: default_storage_write_buffer_size(),
615+
read_buffer_size: default_storage_read_buffer_size(),
588616
}
589617
}
590618
}
@@ -761,7 +789,7 @@ impl Default for RegistryMirror {
761789
}
762790

763791
// Proxy is the proxy configuration for dfdaemon.
764-
#[derive(Debug, Clone, Default, Validate, Deserialize)]
792+
#[derive(Debug, Clone, Validate, Deserialize)]
765793
#[serde(default, rename_all = "camelCase")]
766794
pub struct Proxy {
767795
// server is the proxy server configuration for dfdaemon.
@@ -779,6 +807,24 @@ pub struct Proxy {
779807

780808
// prefetch pre-downloads full of the task when download with range request.
781809
pub prefetch: bool,
810+
811+
// read_buffer_size is the buffer size for reading piece from disk, default is 16KB.
812+
#[serde(default = "default_proxy_read_buffer_size")]
813+
pub read_buffer_size: usize,
814+
}
815+
816+
// Proxy implements Default.
817+
impl Default for Proxy {
818+
fn default() -> Self {
819+
Self {
820+
server: ProxyServer::default(),
821+
rules: None,
822+
registry_mirror: RegistryMirror::default(),
823+
disable_back_to_source: false,
824+
prefetch: false,
825+
read_buffer_size: default_proxy_read_buffer_size(),
826+
}
827+
}
782828
}
783829

784830
// Security is the security configuration for dfdaemon.

dragonfly-client-storage/src/content.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616

1717
use dragonfly_api::common::v2::Range;
18+
use dragonfly_client_config::dfdaemon::Config;
1819
use dragonfly_client_core::Result;
1920
use sha2::{Digest, Sha256};
2021
use std::cmp::{max, min};
2122
use std::path::{Path, PathBuf};
23+
use std::sync::Arc;
2224
use tokio::fs::{self, File, OpenOptions};
2325
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader, SeekFrom};
2426
use tokio_util::io::InspectReader;
@@ -27,11 +29,11 @@ use tracing::{error, info, warn};
2729
// DEFAULT_DIR_NAME is the default directory name to store content.
2830
const DEFAULT_DIR_NAME: &str = "content";
2931

30-
// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
31-
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;
32-
3332
// Content is the content of a piece.
3433
pub struct Content {
34+
// config is the configuration of the dfdaemon.
35+
config: Arc<Config>,
36+
3537
// dir is the directory to store content.
3638
dir: PathBuf,
3739
}
@@ -48,12 +50,12 @@ pub struct WritePieceResponse {
4850
// Content implements the content storage.
4951
impl Content {
5052
// new returns a new content.
51-
pub async fn new(dir: &Path) -> Result<Content> {
53+
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Content> {
5254
let dir = dir.join(DEFAULT_DIR_NAME);
5355
fs::create_dir_all(&dir).await?;
5456
info!("content initialized directory: {:?}", dir);
5557

56-
Ok(Content { dir })
58+
Ok(Content { config, dir })
5759
}
5860

5961
// hard_link_or_copy_task hard links or copies the task content to the destination.
@@ -144,7 +146,8 @@ impl Content {
144146
let range_reader = from_f.take(range.length);
145147

146148
// Use a buffer to read the range.
147-
let mut range_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, range_reader);
149+
let mut range_reader =
150+
BufReader::with_capacity(self.config.storage.read_buffer_size, range_reader);
148151

149152
let mut to_f = OpenOptions::new()
150153
.create(true)
@@ -240,7 +243,7 @@ impl Content {
240243
let task_path = self.dir.join(task_id);
241244

242245
// Use a buffer to read the piece.
243-
let reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, reader);
246+
let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader);
244247

245248
// Sha256 is used to calculate the hash of the piece.
246249
let mut hasher = Sha256::new();

dragonfly-client-storage/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl Storage {
4848
// new returns a new storage.
4949
pub async fn new(config: Arc<Config>, dir: &Path) -> Result<Self> {
5050
let metadata = metadata::Metadata::new(dir)?;
51-
let content = content::Content::new(dir).await?;
51+
let content = content::Content::new(config.clone(), dir).await?;
5252
Ok(Storage {
5353
config,
5454
metadata,

dragonfly-client/src/proxy/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,6 @@ use tracing::{error, info, instrument, Span};
6565

6666
pub mod header;
6767

68-
// DEFAULT_BUFFER_SIZE is the buffer size to read and write, default is 32KB.
69-
const DEFAULT_BUFFER_SIZE: usize = 32 * 1024;
70-
7168
// Response is the response of the proxy server.
7269
pub type Response = hyper::Response<BoxBody<Bytes, ClientError>>;
7370

@@ -474,7 +471,7 @@ async fn proxy_by_dfdaemon(
474471
};
475472

476473
// Make the download task request.
477-
let download_task_request = match make_download_task_request(config, rule, request) {
474+
let download_task_request = match make_download_task_request(config.clone(), rule, request) {
478475
Ok(download_task_request) => download_task_request,
479476
Err(err) => {
480477
error!("make download task request failed: {}", err);
@@ -562,6 +559,9 @@ async fn proxy_by_dfdaemon(
562559
*response.headers_mut() = make_response_headers(download_task_started_response.clone())?;
563560
*response.status_mut() = http::StatusCode::OK;
564561

562+
// Get the read buffer size from the config.
563+
let read_buffer_size = config.proxy.read_buffer_size;
564+
565565
// Write task data to pipe. If grpc received error message,
566566
// shutdown the writer.
567567
tokio::spawn(async move {
@@ -616,8 +616,9 @@ async fn proxy_by_dfdaemon(
616616
return;
617617
}
618618
};
619+
619620
// Use a buffer to read the piece.
620-
let piece_reader = BufReader::with_capacity(DEFAULT_BUFFER_SIZE, piece_reader);
621+
let piece_reader = BufReader::with_capacity(read_buffer_size, piece_reader);
621622

622623
// Write the piece data to the pipe in order.
623624
finished_piece_readers.insert(piece.number, piece_reader);

0 commit comments

Comments
 (0)