Skip to content

Commit 082db85

Browse files
committed
Merge remote-tracking branch 'origin/master' into t/lindexi/KallwarwhelkuGayjohuferekerene
2 parents 5509304 + 728f27b commit 082db85

File tree

1 file changed

+50
-36
lines changed

1 file changed

+50
-36
lines changed

AsyncWorkerCollection/AsyncQueue.cs

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace dotnetCampus.Threading
1010
/// 提供一个异步的队列。可以使用 await 关键字异步等待出队,当有元素入队的时候,等待就会完成。
1111
/// </summary>
1212
/// <typeparam name="T">存入异步队列中的元素类型。</typeparam>
13-
public class AsyncQueue<T> : IDisposable, IAsyncDisposable
13+
public class AsyncQueue<T> : IDisposable
1414
{
1515
private readonly SemaphoreSlim _semaphoreSlim;
1616
private readonly ConcurrentQueue<T> _queue;
@@ -36,8 +36,6 @@ public AsyncQueue()
3636
/// <param name="item">要入队的元素。</param>
3737
public void Enqueue(T item)
3838
{
39-
ThrowIfDisposing();
40-
4139
_queue.Enqueue(item);
4240
_semaphoreSlim.Release();
4341
}
@@ -48,15 +46,12 @@ public void Enqueue(T item)
4846
/// <param name="source">要入队的元素序列。</param>
4947
public void EnqueueRange(IEnumerable<T> source)
5048
{
51-
ThrowIfDisposing();
52-
5349
var n = 0;
5450
foreach (var item in source)
5551
{
5652
_queue.Enqueue(item);
5753
n++;
5854
}
59-
6055
_semaphoreSlim.Release(n);
6156
}
6257

@@ -68,7 +63,6 @@ public void EnqueueRange(IEnumerable<T> source)
6863
/// 由于此方法有返回值,后续方法可能依赖于此返回值,所以如果取消将抛出 <see cref="TaskCanceledException"/>。
6964
/// </param>
7065
/// <returns>可以异步等待的队列返回的元素。</returns>
71-
/// <exception cref="ObjectDisposedException"></exception>
7266
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
7367
{
7468
while (!_isDisposed)
@@ -81,65 +75,85 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
8175
}
8276
else
8377
{
84-
// 没有内容了,此时也准备gg了,那么就 Break 了
85-
if (_isDisposing)
78+
lock (_queue)
8679
{
87-
_disposeTaskCompletionSource.TrySetResult(true);
88-
break;
80+
CurrentFinished?.Invoke(this, EventArgs.Empty);
8981
}
9082
}
9183
}
9284

93-
ThrowIfDisposing();
94-
9585
return default;
9686
}
9787

88+
private event EventHandler CurrentFinished;
89+
90+
public async ValueTask WaitForCurrentFinished()
91+
{
92+
if (_queue.Count == 0)
93+
{
94+
return;
95+
}
96+
97+
using var currentFinishedTask = new CurrentFinishedTask(this);
98+
99+
if (_queue.Count == 0)
100+
{
101+
return;
102+
}
103+
104+
await currentFinishedTask.WaitForCurrentFinished();
105+
}
106+
98107
/// <summary>
99108
/// 主要用来释放锁,让 DequeueAsync 方法返回,解决因为锁让此对象内存不释放
100109
/// </summary>
101110
public void Dispose()
102111
{
103112
// 当释放的时候,将通过 _queue 的 Clear 清空内容,而通过 _semaphoreSlim 的释放让 DequeueAsync 释放锁
104113
// 此时将会在 DequeueAsync 进入 TryDequeue 方法,也许此时依然有开发者在 _queue.Clear() 之后插入元素,但是没关系,我只是需要保证调用 Dispose 之后会让 DequeueAsync 方法返回而已
105-
_isDisposing = true;
114+
_isDisposed = true;
106115
_queue.Clear();
107116
// 释放 DequeueAsync 方法
108117
_semaphoreSlim.Release(int.MaxValue);
109-
_isDisposed = true;
110118
_semaphoreSlim.Dispose();
111119
}
112120

113121
private bool _isDisposed;
114122

115-
/// <summary>
116-
/// 等待所有任务执行完成
117-
/// </summary>
118-
/// <returns></returns>
119-
public async ValueTask DisposeAsync()
123+
class CurrentFinishedTask : IDisposable
120124
{
121-
_disposeTaskCompletionSource = new TaskCompletionSource<bool>();
122-
_isDisposing = true;
125+
public CurrentFinishedTask(AsyncQueue<T> asyncQueue)
126+
{
127+
_asyncQueue = asyncQueue;
123128

124-
// 释放 DequeueAsync 方法
125-
_semaphoreSlim.Release(int.MaxValue);
129+
lock (_asyncQueue)
130+
{
131+
_asyncQueue.CurrentFinished += CurrentFinished;
132+
}
133+
}
134+
135+
private void CurrentFinished(object? sender, EventArgs e)
136+
{
137+
_currentFinishedTaskCompletionSource.TrySetResult(true);
138+
}
126139

127-
await _disposeTaskCompletionSource.Task;
140+
public async ValueTask WaitForCurrentFinished()
141+
{
142+
await _currentFinishedTaskCompletionSource.Task;
143+
}
128144

129-
_isDisposed = true;
130-
_semaphoreSlim.Dispose();
131-
}
145+
private readonly TaskCompletionSource<bool> _currentFinishedTaskCompletionSource = new TaskCompletionSource<bool>();
132146

133-
private bool _isDisposing;
134-
private TaskCompletionSource<bool> _disposeTaskCompletionSource;
147+
private readonly AsyncQueue<T> _asyncQueue;
135148

136-
// 这里忽略线程安全
137-
private void ThrowIfDisposing()
138-
{
139-
if (_isDisposing)
149+
public void Dispose()
140150
{
141-
throw new ObjectDisposedException(nameof(AsyncQueue<T>));
151+
lock (_asyncQueue)
152+
{
153+
_currentFinishedTaskCompletionSource.TrySetResult(true);
154+
_asyncQueue.CurrentFinished -= CurrentFinished;
155+
}
142156
}
143157
}
144158
}
145-
}
159+
}

0 commit comments

Comments
 (0)