Skip to content

Commit 5509304

Browse files
committed
等待任务执行完成
1 parent 0df34c1 commit 5509304

File tree

1 file changed

+51
-3
lines changed

1 file changed

+51
-3
lines changed

AsyncWorkerCollection/AsyncQueue.cs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace dotnetCampus.Threading
1010
/// 提供一个异步的队列。可以使用 await 关键字异步等待出队,当有元素入队的时候,等待就会完成。
1111
/// </summary>
1212
/// <typeparam name="T">存入异步队列中的元素类型。</typeparam>
13-
public class AsyncQueue<T> : IDisposable
13+
public class AsyncQueue<T> : IDisposable, IAsyncDisposable
1414
{
1515
private readonly SemaphoreSlim _semaphoreSlim;
1616
private readonly ConcurrentQueue<T> _queue;
@@ -36,6 +36,8 @@ public AsyncQueue()
3636
/// <param name="item">要入队的元素。</param>
3737
public void Enqueue(T item)
3838
{
39+
ThrowIfDisposing();
40+
3941
_queue.Enqueue(item);
4042
_semaphoreSlim.Release();
4143
}
@@ -46,12 +48,15 @@ public void Enqueue(T item)
4648
/// <param name="source">要入队的元素序列。</param>
4749
public void EnqueueRange(IEnumerable<T> source)
4850
{
51+
ThrowIfDisposing();
52+
4953
var n = 0;
5054
foreach (var item in source)
5155
{
5256
_queue.Enqueue(item);
5357
n++;
5458
}
59+
5560
_semaphoreSlim.Release(n);
5661
}
5762

@@ -63,6 +68,7 @@ public void EnqueueRange(IEnumerable<T> source)
6368
/// 由于此方法有返回值,后续方法可能依赖于此返回值,所以如果取消将抛出 <see cref="TaskCanceledException"/>。
6469
/// </param>
6570
/// <returns>可以异步等待的队列返回的元素。</returns>
71+
/// <exception cref="ObjectDisposedException"></exception>
6672
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
6773
{
6874
while (!_isDisposed)
@@ -73,8 +79,19 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
7379
{
7480
return item;
7581
}
82+
else
83+
{
84+
// 没有内容了,此时也准备gg了,那么就 Break 了
85+
if (_isDisposing)
86+
{
87+
_disposeTaskCompletionSource.TrySetResult(true);
88+
break;
89+
}
90+
}
7691
}
7792

93+
ThrowIfDisposing();
94+
7895
return default;
7996
}
8097

@@ -85,13 +102,44 @@ public void Dispose()
85102
{
86103
// 当释放的时候,将通过 _queue 的 Clear 清空内容,而通过 _semaphoreSlim 的释放让 DequeueAsync 释放锁
87104
// 此时将会在 DequeueAsync 进入 TryDequeue 方法,也许此时依然有开发者在 _queue.Clear() 之后插入元素,但是没关系,我只是需要保证调用 Dispose 之后会让 DequeueAsync 方法返回而已
88-
_isDisposed = true;
105+
_isDisposing = true;
89106
_queue.Clear();
90107
// 释放 DequeueAsync 方法
91108
_semaphoreSlim.Release(int.MaxValue);
109+
_isDisposed = true;
92110
_semaphoreSlim.Dispose();
93111
}
94112

95113
private bool _isDisposed;
114+
115+
/// <summary>
116+
/// 等待所有任务执行完成
117+
/// </summary>
118+
/// <returns></returns>
119+
public async ValueTask DisposeAsync()
120+
{
121+
_disposeTaskCompletionSource = new TaskCompletionSource<bool>();
122+
_isDisposing = true;
123+
124+
// 释放 DequeueAsync 方法
125+
_semaphoreSlim.Release(int.MaxValue);
126+
127+
await _disposeTaskCompletionSource.Task;
128+
129+
_isDisposed = true;
130+
_semaphoreSlim.Dispose();
131+
}
132+
133+
private bool _isDisposing;
134+
private TaskCompletionSource<bool> _disposeTaskCompletionSource;
135+
136+
// 这里忽略线程安全
137+
private void ThrowIfDisposing()
138+
{
139+
if (_isDisposing)
140+
{
141+
throw new ObjectDisposedException(nameof(AsyncQueue<T>));
142+
}
143+
}
96144
}
97-
}
145+
}

0 commit comments

Comments
 (0)