Skip to content

Commit 024e24d

Browse files
authored
Merge pull request #332 from NordSecurity/msz/FILE-601-receiver-does-not-reports-events
FILE-601 Receiver does not reports events
2 parents 3bcec64 + 7b48e34 commit 024e24d

File tree

8 files changed

+267
-27
lines changed

8 files changed

+267
-27
lines changed

changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same
66
the mechanism is now different and requires new implementation to use properly.
77
* Split checksum events into finalize and verify
88
* Add `base_dir` field in the `RequestQueued` event files
9+
* Fix rare issue of missing receiver's in-progress events
910

1011
---
1112
<br>

drop-transfer/src/ws/server/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub trait HandlerLoop {
6060
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;
6161

6262
async fn finalize_success(self);
63+
async fn finalize_failure(self);
6364
}
6465

6566
pub trait Request {

drop-transfer/src/ws/server/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ impl RunContext<'_> {
564564
"WS connection broke for {}: {err:?}",
565565
xfer.id()
566566
);
567+
handler.finalize_failure().await;
567568
} else {
568569
info!(self.logger, "Sucesfully finalizing transfer loop");
569570
handler.finalize_success().await;

drop-transfer/src/ws/server/v2.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
226226
self.stop_task(&file_id, Status::FileRejected).await;
227227
}
228228
}
229+
230+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
231+
let jobs = std::mem::take(&mut self.jobs);
232+
233+
async move {
234+
let tasks = jobs.into_values().map(|task| async move {
235+
task.events.pause().await;
236+
});
237+
238+
futures::future::join_all(tasks).await;
239+
}
240+
}
229241
}
230242

231243
#[async_trait::async_trait]
@@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
359371
async fn finalize_success(mut self) {
360372
debug!(self.logger, "Finalizing");
361373
}
374+
375+
// While the destructor ensures the events are paused this function waits for
376+
// the execution to be finished
377+
async fn finalize_failure(mut self) {
378+
self.take_pause_futures().await;
379+
}
362380
}
363381

364382
impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
365383
fn drop(&mut self) {
366384
debug!(self.logger, "Stopping server handler");
367-
368-
let jobs = std::mem::take(&mut self.jobs);
369-
tokio::spawn(async move {
370-
let tasks = jobs.into_values().map(|task| async move {
371-
task.events.pause().await;
372-
});
373-
374-
futures::future::join_all(tasks).await;
375-
});
385+
tokio::spawn(self.take_pause_futures());
376386
}
377387
}
378388

drop-transfer/src/ws/server/v4.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
325325
}
326326
}
327327
}
328+
329+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
330+
let jobs = std::mem::take(&mut self.jobs);
331+
332+
async move {
333+
let tasks = jobs.into_values().map(|task| async move {
334+
task.events.pause().await;
335+
});
336+
337+
futures::future::join_all(tasks).await;
338+
}
339+
}
328340
}
329341

330342
#[async_trait::async_trait]
@@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
476488
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
477489
);
478490
}
491+
492+
// While the destructor ensures the events are paused this function waits for
493+
// the execution to be finished
494+
async fn finalize_failure(mut self) {
495+
self.take_pause_futures().await;
496+
}
479497
}
480498

481499
impl Drop for HandlerLoop<'_> {
482500
fn drop(&mut self) {
483501
debug!(self.logger, "Stopping server handler");
484-
485-
let jobs = std::mem::take(&mut self.jobs);
486-
tokio::spawn(async move {
487-
let tasks = jobs.into_values().map(|task| async move {
488-
task.events.pause().await;
489-
});
490-
491-
futures::future::join_all(tasks).await;
492-
});
502+
tokio::spawn(self.take_pause_futures());
493503
}
494504
}
495505

drop-transfer/src/ws/server/v6.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
370370
}
371371
}
372372
}
373+
374+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
375+
let jobs = std::mem::take(&mut self.jobs);
376+
377+
async move {
378+
let tasks = jobs.into_values().map(|task| async move {
379+
task.events.pause().await;
380+
});
381+
382+
futures::future::join_all(tasks).await;
383+
}
384+
}
373385
}
374386

375387
#[async_trait::async_trait]
@@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
535547
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
536548
);
537549
}
550+
551+
// While the destructor ensures the events are paused this function waits for
552+
// the execution to be finished
553+
async fn finalize_failure(mut self) {
554+
self.take_pause_futures().await;
555+
}
538556
}
539557

540558
impl Drop for HandlerLoop<'_> {
541559
fn drop(&mut self) {
542560
debug!(self.logger, "Stopping server handler");
543-
544-
let jobs = std::mem::take(&mut self.jobs);
545-
tokio::spawn(async move {
546-
let tasks = jobs.into_values().map(|task| async move {
547-
task.events.pause().await;
548-
});
549-
550-
futures::future::join_all(tasks).await;
551-
});
561+
tokio::spawn(self.take_pause_futures());
552562
}
553563
}
554564

test/drop_test/action.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,17 @@ def __str__(self):
682682
return f"DrainEvents({self._count})"
683683

684684

685+
class ClearEventQueue(Action):
686+
def __init__(self):
687+
pass
688+
689+
async def run(self, drop: ffi.Drop):
690+
_ = await drop._events.gather_all(0)
691+
692+
def __str__(self):
693+
return "ClearEventQueue()"
694+
695+
685696
class NoEvent(Action):
686697
def __init__(self, duration: int = 3):
687698
self._duration = duration

test/scenarios.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2664,6 +2664,202 @@
26642664
),
26652665
},
26662666
),
2667+
Scenario(
2668+
"scenario11-4",
2669+
"Initate a transfers with multiple files. Wait for them to start and then stop the sender. Then restart sender and expect transfer to finish successfuly",
2670+
{
2671+
"DROP_PEER_REN": ActionList(
2672+
[
2673+
action.WaitForAnotherPeer("DROP_PEER_STIMPY"),
2674+
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
2675+
action.NewTransfer(
2676+
"DROP_PEER_STIMPY",
2677+
[
2678+
"/tmp/testfile-bulk-01",
2679+
"/tmp/testfile-bulk-02",
2680+
"/tmp/testfile-bulk-03",
2681+
"/tmp/testfile-bulk-04",
2682+
"/tmp/testfile-bulk-05",
2683+
"/tmp/testfile-bulk-06",
2684+
"/tmp/testfile-bulk-07",
2685+
"/tmp/testfile-bulk-08",
2686+
"/tmp/testfile-bulk-09",
2687+
"/tmp/testfile-bulk-10",
2688+
],
2689+
),
2690+
# fmt: off
2691+
action.Wait(
2692+
event.Queued(0, "DROP_PEER_STIMPY", [
2693+
norddrop.QueuedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760, "/tmp"),
2694+
norddrop.QueuedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760, "/tmp"),
2695+
norddrop.QueuedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760, "/tmp"),
2696+
norddrop.QueuedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760, "/tmp"),
2697+
norddrop.QueuedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760, "/tmp"),
2698+
norddrop.QueuedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760, "/tmp"),
2699+
norddrop.QueuedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760, "/tmp"),
2700+
norddrop.QueuedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760, "/tmp"),
2701+
norddrop.QueuedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760, "/tmp"),
2702+
norddrop.QueuedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760, "/tmp"),
2703+
]),
2704+
),
2705+
# Wait for some of the files and the stop
2706+
action.Repeated([action.WaitForOneOf([
2707+
event.Start(0, FILES["testfile-bulk-01"].id),
2708+
event.Start(0, FILES["testfile-bulk-02"].id),
2709+
event.Start(0, FILES["testfile-bulk-03"].id),
2710+
event.Start(0, FILES["testfile-bulk-04"].id),
2711+
event.Start(0, FILES["testfile-bulk-05"].id),
2712+
event.Start(0, FILES["testfile-bulk-06"].id),
2713+
event.Start(0, FILES["testfile-bulk-07"].id),
2714+
event.Start(0, FILES["testfile-bulk-08"].id),
2715+
event.Start(0, FILES["testfile-bulk-09"].id),
2716+
event.Start(0, FILES["testfile-bulk-10"].id),
2717+
])], 4),
2718+
# fmt: on
2719+
action.Stop(),
2720+
# fmt: off
2721+
# Wait of paused events
2722+
action.WaitAndIgnoreExcept([
2723+
event.Paused(0, FILES["testfile-bulk-01"].id),
2724+
event.Paused(0, FILES["testfile-bulk-02"].id),
2725+
event.Paused(0, FILES["testfile-bulk-03"].id),
2726+
event.Paused(0, FILES["testfile-bulk-04"].id),
2727+
event.Paused(0, FILES["testfile-bulk-05"].id),
2728+
event.Paused(0, FILES["testfile-bulk-06"].id),
2729+
event.Paused(0, FILES["testfile-bulk-07"].id),
2730+
event.Paused(0, FILES["testfile-bulk-08"].id),
2731+
event.Paused(0, FILES["testfile-bulk-09"].id),
2732+
event.Paused(0, FILES["testfile-bulk-10"].id),
2733+
]),
2734+
# fmt: on
2735+
action.Sleep(2),
2736+
action.ClearEventQueue(),
2737+
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
2738+
# fmt: off
2739+
# Wait for some of the files and the stop
2740+
action.WaitRacy([
2741+
event.Start(0, FILES["testfile-bulk-01"].id, None),
2742+
event.Start(0, FILES["testfile-bulk-02"].id, None),
2743+
event.Start(0, FILES["testfile-bulk-03"].id, None),
2744+
event.Start(0, FILES["testfile-bulk-04"].id, None),
2745+
event.Start(0, FILES["testfile-bulk-05"].id, None),
2746+
event.Start(0, FILES["testfile-bulk-06"].id, None),
2747+
event.Start(0, FILES["testfile-bulk-07"].id, None),
2748+
event.Start(0, FILES["testfile-bulk-08"].id, None),
2749+
event.Start(0, FILES["testfile-bulk-09"].id, None),
2750+
event.Start(0, FILES["testfile-bulk-10"].id, None),
2751+
event.FinishFileUploaded(0, FILES["testfile-bulk-01"].id),
2752+
event.FinishFileUploaded(0, FILES["testfile-bulk-02"].id),
2753+
event.FinishFileUploaded(0, FILES["testfile-bulk-03"].id),
2754+
event.FinishFileUploaded(0, FILES["testfile-bulk-04"].id),
2755+
event.FinishFileUploaded(0, FILES["testfile-bulk-05"].id),
2756+
event.FinishFileUploaded(0, FILES["testfile-bulk-06"].id),
2757+
event.FinishFileUploaded(0, FILES["testfile-bulk-07"].id),
2758+
event.FinishFileUploaded(0, FILES["testfile-bulk-08"].id),
2759+
event.FinishFileUploaded(0, FILES["testfile-bulk-09"].id),
2760+
event.FinishFileUploaded(0, FILES["testfile-bulk-10"].id),
2761+
]),
2762+
# fmt: on
2763+
action.ExpectCancel([0], True),
2764+
]
2765+
),
2766+
"DROP_PEER_STIMPY": ActionList(
2767+
[
2768+
action.Start("DROP_PEER_STIMPY"),
2769+
# fmt: off
2770+
action.Wait(
2771+
event.Receive(0, "DROP_PEER_REN", [
2772+
norddrop.ReceivedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760),
2773+
norddrop.ReceivedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760),
2774+
norddrop.ReceivedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760),
2775+
norddrop.ReceivedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760),
2776+
norddrop.ReceivedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760),
2777+
norddrop.ReceivedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760),
2778+
norddrop.ReceivedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760),
2779+
norddrop.ReceivedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760),
2780+
norddrop.ReceivedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760),
2781+
norddrop.ReceivedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760),
2782+
]),
2783+
),
2784+
# fmt: on
2785+
# fmt: off
2786+
action.Download(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4"),
2787+
action.Download(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4"),
2788+
action.Download(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4"),
2789+
action.Download(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4"),
2790+
action.Download(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4"),
2791+
action.Download(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4"),
2792+
action.Download(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4"),
2793+
action.Download(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4"),
2794+
action.Download(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4"),
2795+
action.Download(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4"),
2796+
# Wait for all 10 pending events
2797+
action.Repeated([action.WaitForOneOf([
2798+
event.Pending(0, FILES["testfile-bulk-01"].id),
2799+
event.Pending(0, FILES["testfile-bulk-02"].id),
2800+
event.Pending(0, FILES["testfile-bulk-03"].id),
2801+
event.Pending(0, FILES["testfile-bulk-04"].id),
2802+
event.Pending(0, FILES["testfile-bulk-05"].id),
2803+
event.Pending(0, FILES["testfile-bulk-06"].id),
2804+
event.Pending(0, FILES["testfile-bulk-07"].id),
2805+
event.Pending(0, FILES["testfile-bulk-08"].id),
2806+
event.Pending(0, FILES["testfile-bulk-09"].id),
2807+
event.Pending(0, FILES["testfile-bulk-10"].id),
2808+
event.Start(0, FILES["testfile-bulk-01"].id),
2809+
event.Start(0, FILES["testfile-bulk-02"].id),
2810+
event.Start(0, FILES["testfile-bulk-03"].id),
2811+
event.Start(0, FILES["testfile-bulk-04"].id),
2812+
event.Start(0, FILES["testfile-bulk-05"].id),
2813+
event.Start(0, FILES["testfile-bulk-06"].id),
2814+
event.Start(0, FILES["testfile-bulk-07"].id),
2815+
event.Start(0, FILES["testfile-bulk-08"].id),
2816+
event.Start(0, FILES["testfile-bulk-09"].id),
2817+
event.Start(0, FILES["testfile-bulk-10"].id),
2818+
]),
2819+
], 10),
2820+
action.WaitAndIgnoreExcept([
2821+
event.Paused(0, FILES["testfile-bulk-01"].id),
2822+
event.Paused(0, FILES["testfile-bulk-02"].id),
2823+
event.Paused(0, FILES["testfile-bulk-03"].id),
2824+
event.Paused(0, FILES["testfile-bulk-04"].id),
2825+
event.Paused(0, FILES["testfile-bulk-05"].id),
2826+
event.Paused(0, FILES["testfile-bulk-06"].id),
2827+
event.Paused(0, FILES["testfile-bulk-07"].id),
2828+
event.Paused(0, FILES["testfile-bulk-08"].id),
2829+
event.Paused(0, FILES["testfile-bulk-09"].id),
2830+
event.Paused(0, FILES["testfile-bulk-10"].id),
2831+
]),
2832+
# The sender is stopped
2833+
action.WaitRacy([
2834+
event.Start(0, FILES["testfile-bulk-01"].id, None),
2835+
event.Start(0, FILES["testfile-bulk-02"].id, None),
2836+
event.Start(0, FILES["testfile-bulk-03"].id, None),
2837+
event.Start(0, FILES["testfile-bulk-04"].id, None),
2838+
event.Start(0, FILES["testfile-bulk-05"].id, None),
2839+
event.Start(0, FILES["testfile-bulk-06"].id, None),
2840+
event.Start(0, FILES["testfile-bulk-07"].id, None),
2841+
event.Start(0, FILES["testfile-bulk-08"].id, None),
2842+
event.Start(0, FILES["testfile-bulk-09"].id, None),
2843+
event.Start(0, FILES["testfile-bulk-10"].id, None),
2844+
event.FinishFileDownloaded(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4/testfile-bulk-01"),
2845+
event.FinishFileDownloaded(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4/testfile-bulk-02"),
2846+
event.FinishFileDownloaded(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4/testfile-bulk-03"),
2847+
event.FinishFileDownloaded(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4/testfile-bulk-04"),
2848+
event.FinishFileDownloaded(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4/testfile-bulk-05"),
2849+
event.FinishFileDownloaded(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4/testfile-bulk-06"),
2850+
event.FinishFileDownloaded(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4/testfile-bulk-07"),
2851+
event.FinishFileDownloaded(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4/testfile-bulk-08"),
2852+
event.FinishFileDownloaded(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4/testfile-bulk-09"),
2853+
event.FinishFileDownloaded(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4/testfile-bulk-10"),
2854+
]),
2855+
# fmt: on
2856+
action.CancelTransferRequest([0]),
2857+
action.ExpectCancel([0], False),
2858+
action.Stop(),
2859+
]
2860+
),
2861+
},
2862+
),
26672863
Scenario(
26682864
"scenario12-1",
26692865
"Transfer file to two clients simultaneously",

0 commit comments

Comments
 (0)