Skip to content

Commit dcf0f89

Browse files
authored
Merge pull request #13 from dotnet-campus/t/lindexi/KallwarwhelkuGayjohuferekerene
等待任务执行完成
2 parents 728f27b + c35874c commit dcf0f89

File tree

1 file changed

+58
-6
lines changed

1 file changed

+58
-6
lines changed

AsyncWorkerCollection/AsyncQueue.cs

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace dotnetCampus.Threading
1010
/// 提供一个异步的队列。可以使用 await 关键字异步等待出队,当有元素入队的时候,等待就会完成。
1111
/// </summary>
1212
/// <typeparam name="T">存入异步队列中的元素类型。</typeparam>
13-
public class AsyncQueue<T> : IDisposable
13+
public class AsyncQueue<T> : IDisposable, IAsyncDisposable
1414
{
1515
private readonly SemaphoreSlim _semaphoreSlim;
1616
private readonly ConcurrentQueue<T> _queue;
@@ -36,6 +36,7 @@ public AsyncQueue()
3636
/// <param name="item">要入队的元素。</param>
3737
public void Enqueue(T item)
3838
{
39+
ThrowIfDisposing();
3940
_queue.Enqueue(item);
4041
_semaphoreSlim.Release();
4142
}
@@ -46,12 +47,14 @@ public void Enqueue(T item)
4647
/// <param name="source">要入队的元素序列。</param>
4748
public void EnqueueRange(IEnumerable<T> source)
4849
{
50+
ThrowIfDisposing();
4951
var n = 0;
5052
foreach (var item in source)
5153
{
5254
_queue.Enqueue(item);
5355
n++;
5456
}
57+
5558
_semaphoreSlim.Release(n);
5659
}
5760

@@ -75,8 +78,10 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
7578
}
7679
else
7780
{
81+
// 当前没有任务
7882
lock (_queue)
7983
{
84+
// 事件不是线程安全,因为存在事件的加等
8085
CurrentFinished?.Invoke(this, EventArgs.Empty);
8186
}
8287
}
@@ -85,8 +90,10 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
8590
return default;
8691
}
8792

88-
private event EventHandler CurrentFinished;
89-
93+
/// <summary>
94+
/// 等待当前的所有任务执行完成
95+
/// </summary>
96+
/// <returns></returns>
9097
public async ValueTask WaitForCurrentFinished()
9198
{
9299
if (_queue.Count == 0)
@@ -96,6 +103,8 @@ public async ValueTask WaitForCurrentFinished()
96103

97104
using var currentFinishedTask = new CurrentFinishedTask(this);
98105

106+
// 有线程执行事件触发,刚好此时在创建 CurrentFinishedTask 对象
107+
// 此时需要重新判断是否存在任务
99108
if (_queue.Count == 0)
100109
{
101110
return;
@@ -106,9 +115,13 @@ public async ValueTask WaitForCurrentFinished()
106115

107116
/// <summary>
108117
/// 主要用来释放锁,让 DequeueAsync 方法返回,解决因为锁让此对象内存不释放
118+
/// <para></para>
119+
/// 这个方法不是线程安全
109120
/// </summary>
110121
public void Dispose()
111122
{
123+
_isDisposing = true;
124+
112125
// 当释放的时候,将通过 _queue 的 Clear 清空内容,而通过 _semaphoreSlim 的释放让 DequeueAsync 释放锁
113126
// 此时将会在 DequeueAsync 进入 TryDequeue 方法,也许此时依然有开发者在 _queue.Clear() 之后插入元素,但是没关系,我只是需要保证调用 Dispose 之后会让 DequeueAsync 方法返回而已
114127
_isDisposed = true;
@@ -118,6 +131,44 @@ public void Dispose()
118131
_semaphoreSlim.Dispose();
119132
}
120133

134+
/// <summary>
135+
/// 等待任务执行完成之后返回,此方法不是线程安全
136+
/// <para></para>
137+
/// 如果在调用此方法同时添加任务,那么添加的任务存在线程安全
138+
/// </summary>
139+
/// <returns></returns>
140+
public async ValueTask DisposeAsync()
141+
{
142+
_isDisposing = true;
143+
await WaitForCurrentFinished();
144+
145+
// 在设置 _isDisposing 完成,刚好有 Enqueue 的代码
146+
if (_queue.Count != 0)
147+
{
148+
// 再次等待
149+
await WaitForCurrentFinished();
150+
}
151+
152+
// 其实此时依然可以存在有线程在 Enqueue 执行,但是此时就忽略了
153+
154+
// 设置变量,此时循环将会跳出
155+
_isDisposed = true;
156+
_semaphoreSlim.Release(int.MaxValue);
157+
_semaphoreSlim.Dispose();
158+
}
159+
160+
// 这里忽略线程安全
161+
private void ThrowIfDisposing()
162+
{
163+
if (_isDisposing)
164+
{
165+
throw new ObjectDisposedException(nameof(AsyncQueue<T>));
166+
}
167+
}
168+
169+
private event EventHandler CurrentFinished;
170+
171+
private bool _isDisposing;
121172
private bool _isDisposed;
122173

123174
class CurrentFinishedTask : IDisposable
@@ -132,7 +183,7 @@ public CurrentFinishedTask(AsyncQueue<T> asyncQueue)
132183
}
133184
}
134185

135-
private void CurrentFinished(object? sender, EventArgs e)
186+
private void CurrentFinished(object sender, EventArgs e)
136187
{
137188
_currentFinishedTaskCompletionSource.TrySetResult(true);
138189
}
@@ -142,7 +193,8 @@ public async ValueTask WaitForCurrentFinished()
142193
await _currentFinishedTaskCompletionSource.Task;
143194
}
144195

145-
private readonly TaskCompletionSource<bool> _currentFinishedTaskCompletionSource = new TaskCompletionSource<bool>();
196+
private readonly TaskCompletionSource<bool> _currentFinishedTaskCompletionSource =
197+
new TaskCompletionSource<bool>();
146198

147199
private readonly AsyncQueue<T> _asyncQueue;
148200

@@ -156,4 +208,4 @@ public void Dispose()
156208
}
157209
}
158210
}
159-
}
211+
}

0 commit comments

Comments
 (0)