Skip to content

Commit cffea98

Browse files
dfettiLukasPukenis
authored andcommitted
Merge pull request #313 from NordSecurity/FILE-579-checksum-progress-granularity
Add checksum_events_granularity config
1 parent 41ace6b commit cffea98

File tree

14 files changed

+410
-20
lines changed

14 files changed

+410
-20
lines changed

changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* Disallow downloading file for which any path component is larger than 250 characters
3434
* Fix ocassional missing of `TransferPaused` event when toggling libdrop on and off quickly
3535
* Report file transfer error in case file subpath contains perent directory `..`
36+
* Add checksum_events_granularity config
3637

3738
---
3839
<br>

drop-config/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ pub struct DropConfig {
1313
pub storage_path: String,
1414
// If set the checksum events will be emited for every file of this or bigger size
1515
pub checksum_events_size_threshold: Option<usize>,
16+
// If set the checksum events will be emited for every checksum_events_granularity bytes
17+
// Default value is 256KB.
18+
pub checksum_events_granularity: u64,
1619
pub connection_retries: u32,
1720
}
1821

@@ -23,6 +26,7 @@ impl Default for DropConfig {
2326
transfer_file_limit: 1000,
2427
storage_path: "libdrop.sqlite".to_string(),
2528
checksum_events_size_threshold: None,
29+
checksum_events_granularity: 256 * 1024,
2630
connection_retries: 5,
2731
}
2832
}

drop-transfer/src/file/mod.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -299,13 +299,14 @@ impl FileToSend {
299299
&self,
300300
limit: u64,
301301
progress_cb: Option<F>,
302+
event_granularity: Option<u64>,
302303
) -> crate::Result<[u8; 32]>
303304
where
304305
F: FnMut(u64) -> Fut + Send + Sync,
305306
Fut: Future<Output = ()>,
306307
{
307308
let reader = reader::open(&self.source)?.take(limit);
308-
let csum = checksum(reader, progress_cb).await?;
309+
let csum = checksum(reader, progress_cb, event_granularity).await?;
309310
Ok(csum)
310311
}
311312
}
@@ -315,6 +316,7 @@ impl FileToSend {
315316
pub async fn checksum<F, Fut>(
316317
reader: impl io::Read,
317318
mut progress_cb: Option<F>,
319+
event_granularity: Option<u64>,
318320
) -> io::Result<[u8; 32]>
319321
where
320322
F: FnMut(u64) -> Fut + Send + Sync,
@@ -325,6 +327,7 @@ where
325327
let mut reader = io::BufReader::with_capacity(CHECKSUM_CHUNK_SIZE, reader);
326328

327329
let mut total_n: u64 = 0;
330+
let mut from_last_event: u64 = 0;
328331
loop {
329332
let buf = reader.fill_buf()?;
330333
if buf.is_empty() {
@@ -337,9 +340,15 @@ where
337340
reader.consume(n);
338341

339342
total_n += n as u64;
340-
if let Some(progress_cb) = progress_cb.as_mut() {
341-
progress_cb(total_n).await;
343+
344+
if let (Some(progress_cb), Some(granularity)) = (progress_cb.as_mut(), event_granularity) {
345+
from_last_event += n as u64;
346+
while from_last_event >= granularity {
347+
progress_cb(total_n).await;
348+
from_last_event -= granularity;
349+
}
342350
}
351+
343352
// Since these are all blocking operation we need to give tokio runtime a
344353
// timeslice
345354
tokio::task::yield_now().await;
@@ -375,6 +384,7 @@ mod tests {
375384
let csum = super::checksum(
376385
&mut &TEST[..],
377386
None::<fn(u64) -> futures::future::Ready<()>>,
387+
None,
378388
)
379389
.await
380390
.unwrap();
@@ -391,7 +401,7 @@ mod tests {
391401

392402
let size = TEST.len() as _;
393403
let file = super::FileToSend::from_path(tmp.path(), size).unwrap();
394-
file.checksum(size, None::<fn(u64) -> futures::future::Ready<()>>)
404+
file.checksum(size, None::<fn(u64) -> futures::future::Ready<()>>, None)
395405
.await
396406
.unwrap()
397407
};
@@ -421,8 +431,11 @@ mod tests {
421431
let mut cx = Context::from_waker(&waker);
422432

423433
let mut cursor = io::Cursor::new(&buf);
424-
let mut future =
425-
super::checksum(&mut cursor, None::<fn(u64) -> futures::future::Ready<()>>);
434+
let mut future = super::checksum(
435+
&mut cursor,
436+
None::<fn(u64) -> futures::future::Ready<()>>,
437+
None,
438+
);
426439
let mut future = unsafe { Pin::new_unchecked(&mut future) };
427440

428441
// expect it to yield 3 times (one at the very end)

drop-transfer/src/ws/client/v4.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl HandlerLoop<'_> {
140140
.checksum::<_, futures::future::Ready<()>>(
141141
limit,
142142
None::<fn(u64) -> futures::future::Ready<()>>,
143+
None,
143144
)
144145
.await?;
145146

drop-transfer/src/ws/client/v6.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ impl HandlerLoop<'_> {
172172
.checksum::<_, futures::future::Ready<()>>(
173173
limit,
174174
None::<fn(u64) -> futures::future::Ready<()>>,
175+
None,
175176
)
176177
.await?;
177178

drop-transfer/src/ws/server/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub trait Downloader {
8282
&mut self,
8383
location: &Hidden<PathBuf>,
8484
progress_cb: Option<F>,
85+
event_granularity: Option<u64>,
8586
) -> crate::Result<()>
8687
where
8788
F: FnMut(u64) -> Fut + Send + Sync,

drop-transfer/src/ws/server/mod.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ impl FileXferTask {
712712
downloader: &mut impl Downloader,
713713
offset: u64,
714714
emit_checksum_events: bool,
715+
checksum_events_granularity: u64,
715716
) -> crate::Result<PathBuf> {
716717
let mut out_file = match downloader.open(tmp_loc).await {
717718
Ok(out_file) => out_file,
@@ -763,21 +764,27 @@ impl FileXferTask {
763764

764765
if emit_checksum_events {
765766
events.checksum_start(self.file.size()).await;
766-
767767
let progress_cb = {
768-
move |progress: u64| async move {
769-
events.checksum_progress(progress).await;
768+
move |progress_bytes: u64| async move {
769+
events.checksum_progress(progress_bytes).await;
770770
}
771771
};
772772

773-
downloader.validate(tmp_loc, Some(progress_cb)).await?;
773+
downloader
774+
.validate(
775+
tmp_loc,
776+
Some(progress_cb),
777+
Some(checksum_events_granularity),
778+
)
779+
.await?;
774780

775781
events.checksum_finish().await;
776782
} else {
777783
downloader
778784
.validate::<_, futures::future::Ready<()>>(
779785
tmp_loc,
780786
None::<fn(u64) -> futures::future::Ready<()>>,
787+
None,
781788
)
782789
.await?;
783790
}
@@ -865,6 +872,7 @@ impl FileXferTask {
865872
events: &FileEventTx<IncomingTransfer>,
866873
tmp_location: &Hidden<PathBuf>,
867874
emit_checksum_events: bool,
875+
checksum_events_granularity: u64,
868876
) -> Option<TmpFileState> {
869877
// TODO: we load the file's metadata to check if we should emit checksum events
870878
// based on size threshold. However TmpFileState::load also does the
@@ -880,13 +888,19 @@ impl FileXferTask {
880888
let size = tmp_size.unwrap_or(0);
881889
events.checksum_start(size).await;
882890

883-
Some(|progress| events.checksum_progress(progress))
891+
Some(|progress_bytes| events.checksum_progress(progress_bytes))
884892
} else {
885893
None
886894
};
887895

888896
// Check if we can resume the temporary file
889-
let tmp_file_state = match TmpFileState::load(&tmp_location.0, cb).await {
897+
let tmp_file_state = match TmpFileState::load(
898+
&tmp_location.0,
899+
cb,
900+
Some(checksum_events_granularity),
901+
)
902+
.await
903+
{
890904
Ok(tmp_file_state) => {
891905
debug!(
892906
logger,
@@ -930,6 +944,7 @@ impl FileXferTask {
930944
false
931945
}
932946
};
947+
let checksum_events_granularity = state.config.checksum_events_granularity;
933948

934949
events.preflight().await;
935950

@@ -939,7 +954,13 @@ impl FileXferTask {
939954
);
940955

941956
let tmp_file_state = self
942-
.handle_tmp_file(&logger, &events, &tmp_location, emit_checksum_events)
957+
.handle_tmp_file(
958+
&logger,
959+
&events,
960+
&tmp_location,
961+
emit_checksum_events,
962+
checksum_events_granularity,
963+
)
943964
.await;
944965

945966
let init_res = downloader.init(&self, tmp_file_state).await?;
@@ -970,6 +991,7 @@ impl FileXferTask {
970991
&mut downloader,
971992
offset,
972993
emit_checksum_events,
994+
checksum_events_granularity,
973995
)
974996
.await
975997
}
@@ -1043,7 +1065,11 @@ impl FileXferTask {
10431065

10441066
impl TmpFileState {
10451067
// Blocking operation
1046-
async fn load<F, Fut>(path: &Path, progress_cb: Option<F>) -> io::Result<Self>
1068+
async fn load<F, Fut>(
1069+
path: &Path,
1070+
progress_cb: Option<F>,
1071+
event_granularity: Option<u64>,
1072+
) -> io::Result<Self>
10471073
where
10481074
F: Fn(u64) -> Fut + Sync + Send,
10491075
Fut: Future<Output = ()>,
@@ -1052,7 +1078,7 @@ impl TmpFileState {
10521078

10531079
let meta = file.metadata()?;
10541080

1055-
let csum = file::checksum(file, progress_cb).await?;
1081+
let csum = file::checksum(file, progress_cb, event_granularity).await?;
10561082
Ok(TmpFileState { meta, csum })
10571083
}
10581084
}

drop-transfer/src/ws/server/v2.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,12 @@ impl handler::Downloader for Downloader {
455455
.await
456456
}
457457

458-
async fn validate<F, Fut>(&mut self, _path: &Hidden<PathBuf>, _: Option<F>) -> crate::Result<()>
458+
async fn validate<F, Fut>(
459+
&mut self,
460+
_path: &Hidden<PathBuf>,
461+
_: Option<F>,
462+
_: Option<u64>,
463+
) -> crate::Result<()>
459464
where
460465
F: FnMut(u64) -> Fut + Send + Sync,
461466
Fut: Future<Output = ()>,

drop-transfer/src/ws/server/v4.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,13 +594,14 @@ impl handler::Downloader for Downloader {
594594
&mut self,
595595
path: &Hidden<PathBuf>,
596596
progress_cb: Option<F>,
597+
event_granularity: Option<u64>,
597598
) -> crate::Result<()>
598599
where
599600
F: FnMut(u64) -> Fut + Send + Sync,
600601
Fut: Future<Output = ()> + Send + Sync,
601602
{
602603
let file = std::fs::File::open(&path.0)?;
603-
let csum = file::checksum(file, progress_cb).await?;
604+
let csum = file::checksum(file, progress_cb, event_granularity).await?;
604605

605606
if self.full_csum.get().await != csum {
606607
return Err(crate::Error::ChecksumMismatch);

drop-transfer/src/ws/server/v6.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,13 +653,14 @@ impl handler::Downloader for Downloader {
653653
&mut self,
654654
path: &Hidden<PathBuf>,
655655
progress_cb: Option<F>,
656+
event_granularity: Option<u64>,
656657
) -> crate::Result<()>
657658
where
658659
F: FnMut(u64) -> Fut + Send + Sync,
659660
Fut: Future<Output = ()> + Send,
660661
{
661662
let file = std::fs::File::open(&path.0)?;
662-
let csum = file::checksum(file, progress_cb).await?;
663+
let csum = file::checksum(file, progress_cb, event_granularity).await?;
663664

664665
if self.full_csum.get().await != csum {
665666
return Err(crate::Error::ChecksumMismatch);

norddrop/src/device/types.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ pub struct Config {
157157
#[serde(rename = "checksum_events_size_threshold_bytes")]
158158
pub checksum_events_size_threshold: Option<usize>,
159159

160+
#[serde(
161+
rename = "checksum_events_granularity_bytes",
162+
default = "Config::default_checksum_events_granularity"
163+
)]
164+
pub checksum_events_granularity: u64,
165+
160166
#[serde(default = "Config::default_connection_retries")]
161167
pub connection_retries: u32,
162168
}
@@ -165,6 +171,10 @@ impl Config {
165171
const fn default_connection_retries() -> u32 {
166172
5
167173
}
174+
175+
const fn default_checksum_events_granularity() -> u64 {
176+
256 * 1024
177+
}
168178
}
169179

170180
impl From<&drop_transfer::Error> for Status {
@@ -415,6 +425,7 @@ impl From<Config> for drop_config::Config {
415425
storage_path,
416426
moose_app_version,
417427
checksum_events_size_threshold,
428+
checksum_events_granularity,
418429
connection_retries,
419430
} = val;
420431

@@ -425,6 +436,7 @@ impl From<Config> for drop_config::Config {
425436
storage_path,
426437
checksum_events_size_threshold,
427438
connection_retries,
439+
checksum_events_granularity,
428440
},
429441
moose: drop_config::MooseConfig {
430442
app_version: moose_app_version,
@@ -453,7 +465,8 @@ mod tests {
453465
"storage_path": ":memory:",
454466
"max_uploads_in_flight": 16,
455467
"max_requests_per_sec": 15,
456-
"checksum_events_size_threshold_bytes": 1234
468+
"checksum_events_size_threshold_bytes": 1234,
469+
"checksum_events_granularity_bytes": 1024
457470
}
458471
"#;
459472

@@ -466,6 +479,7 @@ mod tests {
466479
transfer_file_limit,
467480
storage_path,
468481
checksum_events_size_threshold: checksum_events_size_threshold_bytes,
482+
checksum_events_granularity: checksum_events_granularity_bytes,
469483
connection_retries,
470484
},
471485
moose:
@@ -483,6 +497,7 @@ mod tests {
483497
assert_eq!(app_version, "1.2.5");
484498
assert!(prod);
485499
assert_eq!(checksum_events_size_threshold_bytes, Some(1234));
500+
assert_eq!(checksum_events_granularity_bytes, 1024);
486501
assert_eq!(connection_retries, 5);
487502
}
488503
}

test/drop_test/action.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,16 +740,19 @@ def __init__(
740740
addr: str,
741741
dbpath: str = ":memory:",
742742
checksum_events_size_threshold=2**32, # don't emit events for existing tests
743+
checksum_events_granularity=None,
743744
):
744745
self._addr = addr
745746
self._dbpath = dbpath
746747
self._checksum_events_size_threshold = checksum_events_size_threshold
748+
self._checksum_events_granularity = checksum_events_granularity
747749

748750
async def run(self, drop: ffi.Drop):
749751
drop.start(
750752
peer_resolver.resolve(self._addr),
751753
self._dbpath,
752754
self._checksum_events_size_threshold,
755+
self._checksum_events_granularity,
753756
)
754757

755758
def __str__(self):

0 commit comments

Comments
 (0)