@@ -255,16 +255,6 @@ impl<Fut> FuturesUnordered<Fut> {
255
255
// `wake` from doing any work in the future
256
256
let prev = task. queued . swap ( true , SeqCst ) ;
257
257
258
- // Drop the future, even if it hasn't finished yet. This is safe
259
- // because we're dropping the future on the thread that owns
260
- // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
261
- // such.
262
- unsafe {
263
- // Set to `None` rather than `take()`ing to prevent moving the
264
- // future.
265
- * task. future . get ( ) = None ;
266
- }
267
-
268
258
// If the queued flag was previously set, then it means that this task
269
259
// is still in our internal ready to run queue. We then transfer
270
260
// ownership of our reference count to the ready to run queue, and it'll
@@ -276,8 +266,25 @@ impl<Fut> FuturesUnordered<Fut> {
276
266
// enqueue the task, so our task will never see the ready to run queue
277
267
// again. The task itself will be deallocated once all reference counts
278
268
// have been dropped elsewhere by the various wakers that contain it.
279
- if prev {
280
- mem:: forget ( task) ;
269
+ //
270
+ // Use ManuallyDrop to transfer the reference count ownership before
271
+ // dropping the future so unwinding won't release the reference count.
272
+ let md_slot;
273
+ let task = if prev {
274
+ md_slot = mem:: ManuallyDrop :: new ( task) ;
275
+ & * md_slot
276
+ } else {
277
+ & task
278
+ } ;
279
+
280
+ // Drop the future, even if it hasn't finished yet. This is safe
281
+ // because we're dropping the future on the thread that owns
282
+ // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
283
+ // such.
284
+ unsafe {
285
+ // Set to `None` rather than `take()`ing to prevent moving the
286
+ // future.
287
+ * task. future . get ( ) = None ;
281
288
}
282
289
}
283
290
@@ -566,15 +573,27 @@ impl<Fut> FuturesUnordered<Fut> {
566
573
567
574
impl < Fut > Drop for FuturesUnordered < Fut > {
568
575
fn drop ( & mut self ) {
576
+ // Before the strong reference to the queue is dropped we need all
577
+ // futures to be dropped. See note at the bottom of this method.
578
+ //
579
+ // If there is a panic before this completes, we leak the queue.
580
+ struct LeakQueueOnDrop < ' a , Fut > ( & ' a mut FuturesUnordered < Fut > ) ;
581
+ impl < Fut > Drop for LeakQueueOnDrop < ' _ , Fut > {
582
+ fn drop ( & mut self ) {
583
+ mem:: forget ( Arc :: clone ( & self . 0 . ready_to_run_queue ) ) ;
584
+ }
585
+ }
586
+ let guard = LeakQueueOnDrop ( self ) ;
569
587
// When a `FuturesUnordered` is dropped we want to drop all futures
570
588
// associated with it. At the same time though there may be tons of
571
589
// wakers flying around which contain `Task<Fut>` references
572
590
// inside them. We'll let those naturally get deallocated.
573
- while !self . head_all . get_mut ( ) . is_null ( ) {
574
- let head = * self . head_all . get_mut ( ) ;
575
- let task = unsafe { self . unlink ( head) } ;
576
- self . release_task ( task) ;
591
+ while !guard . 0 . head_all . get_mut ( ) . is_null ( ) {
592
+ let head = * guard . 0 . head_all . get_mut ( ) ;
593
+ let task = unsafe { guard . 0 . unlink ( head) } ;
594
+ guard . 0 . release_task ( task) ;
577
595
}
596
+ mem:: forget ( guard) ; // safe to release strong reference to queue
578
597
579
598
// Note that at this point we could still have a bunch of tasks in the
580
599
// ready to run queue. None of those tasks, however, have futures
0 commit comments