Skip to content

Commit 60d4299

Browse files
committed
Merge branch 'main' into release/v7-moose-no-context
2 parents 6d6fc64 + 024e24d commit 60d4299

File tree

12 files changed

+272
-45
lines changed

12 files changed

+272
-45
lines changed

.github/workflows/analytics-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
default: true
1616
- name: docker sdk
1717
run: |
18-
pip3 install docker
18+
pip3 install -r test/requirements.txt
1919
- name: Print docker version
2020
run: docker version
2121
- name: Build Docker image

.github/workflows/integration-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
default: true
1616
- name: docker sdk
1717
run: |
18-
pip3 install docker
18+
pip3 install -r test/requirements.txt
1919
- name: Print docker version
2020
run: docker version
2121
- name: Build Docker image

changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same
66
the mechanism is now different and requires new implementation to use properly.
77
* Split checksum events into finalize and verify
88
* Add `base_dir` field in the `RequestQueued` event files
9+
* Fix rare issue of missing receiver's in-progress events
910

1011
---
1112
<br>

drop-transfer/src/ws/server/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub trait HandlerLoop {
6060
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;
6161

6262
async fn finalize_success(self);
63+
async fn finalize_failure(self);
6364
}
6465

6566
pub trait Request {

drop-transfer/src/ws/server/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ impl RunContext<'_> {
564564
"WS connection broke for {}: {err:?}",
565565
xfer.id()
566566
);
567+
handler.finalize_failure().await;
567568
} else {
568569
info!(self.logger, "Sucesfully finalizing transfer loop");
569570
handler.finalize_success().await;

drop-transfer/src/ws/server/v2.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
226226
self.stop_task(&file_id, Status::FileRejected).await;
227227
}
228228
}
229+
230+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
231+
let jobs = std::mem::take(&mut self.jobs);
232+
233+
async move {
234+
let tasks = jobs.into_values().map(|task| async move {
235+
task.events.pause().await;
236+
});
237+
238+
futures::future::join_all(tasks).await;
239+
}
240+
}
229241
}
230242

231243
#[async_trait::async_trait]
@@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
359371
async fn finalize_success(mut self) {
360372
debug!(self.logger, "Finalizing");
361373
}
374+
375+
// While the destructor ensures the events are paused this function waits for
376+
// the execution to be finished
377+
async fn finalize_failure(mut self) {
378+
self.take_pause_futures().await;
379+
}
362380
}
363381

364382
impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
365383
fn drop(&mut self) {
366384
debug!(self.logger, "Stopping server handler");
367-
368-
let jobs = std::mem::take(&mut self.jobs);
369-
tokio::spawn(async move {
370-
let tasks = jobs.into_values().map(|task| async move {
371-
task.events.pause().await;
372-
});
373-
374-
futures::future::join_all(tasks).await;
375-
});
385+
tokio::spawn(self.take_pause_futures());
376386
}
377387
}
378388

drop-transfer/src/ws/server/v4.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
325325
}
326326
}
327327
}
328+
329+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
330+
let jobs = std::mem::take(&mut self.jobs);
331+
332+
async move {
333+
let tasks = jobs.into_values().map(|task| async move {
334+
task.events.pause().await;
335+
});
336+
337+
futures::future::join_all(tasks).await;
338+
}
339+
}
328340
}
329341

330342
#[async_trait::async_trait]
@@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
476488
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
477489
);
478490
}
491+
492+
// While the destructor ensures the events are paused this function waits for
493+
// the execution to be finished
494+
async fn finalize_failure(mut self) {
495+
self.take_pause_futures().await;
496+
}
479497
}
480498

481499
impl Drop for HandlerLoop<'_> {
482500
fn drop(&mut self) {
483501
debug!(self.logger, "Stopping server handler");
484-
485-
let jobs = std::mem::take(&mut self.jobs);
486-
tokio::spawn(async move {
487-
let tasks = jobs.into_values().map(|task| async move {
488-
task.events.pause().await;
489-
});
490-
491-
futures::future::join_all(tasks).await;
492-
});
502+
tokio::spawn(self.take_pause_futures());
493503
}
494504
}
495505

drop-transfer/src/ws/server/v6.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
370370
}
371371
}
372372
}
373+
374+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
375+
let jobs = std::mem::take(&mut self.jobs);
376+
377+
async move {
378+
let tasks = jobs.into_values().map(|task| async move {
379+
task.events.pause().await;
380+
});
381+
382+
futures::future::join_all(tasks).await;
383+
}
384+
}
373385
}
374386

375387
#[async_trait::async_trait]
@@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
535547
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
536548
);
537549
}
550+
551+
// While the destructor ensures the events are paused this function waits for
552+
// the execution to be finished
553+
async fn finalize_failure(mut self) {
554+
self.take_pause_futures().await;
555+
}
538556
}
539557

540558
impl Drop for HandlerLoop<'_> {
541559
fn drop(&mut self) {
542560
debug!(self.logger, "Stopping server handler");
543-
544-
let jobs = std::mem::take(&mut self.jobs);
545-
tokio::spawn(async move {
546-
let tasks = jobs.into_values().map(|task| async move {
547-
task.events.pause().await;
548-
});
549-
550-
futures::future::join_all(tasks).await;
551-
});
561+
tokio::spawn(self.take_pause_futures());
552562
}
553563
}
554564

go.mod

Lines changed: 0 additions & 3 deletions
This file was deleted.

test/drop_test/action.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,17 @@ def __str__(self):
682682
return f"DrainEvents({self._count})"
683683

684684

685+
class ClearEventQueue(Action):
686+
def __init__(self):
687+
pass
688+
689+
async def run(self, drop: ffi.Drop):
690+
_ = await drop._events.gather_all(0)
691+
692+
def __str__(self):
693+
return "ClearEventQueue()"
694+
695+
685696
class NoEvent(Action):
686697
def __init__(self, duration: int = 3):
687698
self._duration = duration

test/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
osxmetadata ~= 1.2; sys_platform == 'darwin'
2+
requests < 2.32.0
3+
docker == 7.0.0

0 commit comments

Comments
 (0)