@@ -256,16 +256,6 @@ impl<Fut> FuturesUnordered<Fut> {
256
256
// `wake` from doing any work in the future
257
257
let prev = task. queued . swap ( true , SeqCst ) ;
258
258
259
- // Drop the future, even if it hasn't finished yet. This is safe
260
- // because we're dropping the future on the thread that owns
261
- // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
262
- // such.
263
- unsafe {
264
- // Set to `None` rather than `take()`ing to prevent moving the
265
- // future.
266
- * task. future . get ( ) = None ;
267
- }
268
-
269
259
// If the queued flag was previously set, then it means that this task
270
260
// is still in our internal ready to run queue. We then transfer
271
261
// ownership of our reference count to the ready to run queue, and it'll
@@ -277,8 +267,25 @@ impl<Fut> FuturesUnordered<Fut> {
277
267
// enqueue the task, so our task will never see the ready to run queue
278
268
// again. The task itself will be deallocated once all reference counts
279
269
// have been dropped elsewhere by the various wakers that contain it.
280
- if prev {
281
- mem:: forget ( task) ;
270
+ //
271
+ // Use ManuallyDrop to transfer the reference count ownership before
272
+ // dropping the future so unwinding won't release the reference count.
273
+ let md_slot;
274
+ let task = if prev {
275
+ md_slot = mem:: ManuallyDrop :: new ( task) ;
276
+ & * md_slot
277
+ } else {
278
+ & task
279
+ } ;
280
+
281
+ // Drop the future, even if it hasn't finished yet. This is safe
282
+ // because we're dropping the future on the thread that owns
283
+ // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
284
+ // such.
285
+ unsafe {
286
+ // Set to `None` rather than `take()`ing to prevent moving the
287
+ // future.
288
+ * task. future . get ( ) = None ;
282
289
}
283
290
}
284
291
@@ -567,15 +574,27 @@ impl<Fut> FuturesUnordered<Fut> {
567
574
568
575
impl < Fut > Drop for FuturesUnordered < Fut > {
569
576
fn drop ( & mut self ) {
577
+ // Before the strong reference to the queue is dropped we need all
578
+ // futures to be dropped. See note at the bottom of this method.
579
+ //
580
+ // If there is a panic before this completes, we leak the queue.
581
+ struct LeakQueueOnDrop < ' a , Fut > ( & ' a mut FuturesUnordered < Fut > ) ;
582
+ impl < Fut > Drop for LeakQueueOnDrop < ' _ , Fut > {
583
+ fn drop ( & mut self ) {
584
+ mem:: forget ( Arc :: clone ( & self . 0 . ready_to_run_queue ) ) ;
585
+ }
586
+ }
587
+ let guard = LeakQueueOnDrop ( self ) ;
570
588
// When a `FuturesUnordered` is dropped we want to drop all futures
571
589
// associated with it. At the same time though there may be tons of
572
590
// wakers flying around which contain `Task<Fut>` references
573
591
// inside them. We'll let those naturally get deallocated.
574
- while !self . head_all . get_mut ( ) . is_null ( ) {
575
- let head = * self . head_all . get_mut ( ) ;
576
- let task = unsafe { self . unlink ( head) } ;
577
- self . release_task ( task) ;
592
+ while !guard . 0 . head_all . get_mut ( ) . is_null ( ) {
593
+ let head = * guard . 0 . head_all . get_mut ( ) ;
594
+ let task = unsafe { guard . 0 . unlink ( head) } ;
595
+ guard . 0 . release_task ( task) ;
578
596
}
597
+ mem:: forget ( guard) ; // safe to release strong reference to queue
579
598
580
599
// Note that at this point we could still have a bunch of tasks in the
581
600
// ready to run queue. None of those tasks, however, have futures
0 commit comments