@@ -10,7 +10,7 @@ namespace dotnetCampus.Threading
10
10
/// 提供一个异步的队列。可以使用 await 关键字异步等待出队,当有元素入队的时候,等待就会完成。
11
11
/// </summary>
12
12
/// <typeparam name="T">存入异步队列中的元素类型。</typeparam>
13
- public class AsyncQueue < T > : IDisposable
13
+ public class AsyncQueue < T > : IDisposable , IAsyncDisposable
14
14
{
15
15
private readonly SemaphoreSlim _semaphoreSlim ;
16
16
private readonly ConcurrentQueue < T > _queue ;
@@ -36,6 +36,7 @@ public AsyncQueue()
36
36
/// <param name="item">要入队的元素。</param>
37
37
public void Enqueue ( T item )
38
38
{
39
+ ThrowIfDisposing ( ) ;
39
40
_queue . Enqueue ( item ) ;
40
41
_semaphoreSlim . Release ( ) ;
41
42
}
@@ -46,12 +47,14 @@ public void Enqueue(T item)
46
47
/// <param name="source">要入队的元素序列。</param>
47
48
public void EnqueueRange ( IEnumerable < T > source )
48
49
{
50
+ ThrowIfDisposing ( ) ;
49
51
var n = 0 ;
50
52
foreach ( var item in source )
51
53
{
52
54
_queue . Enqueue ( item ) ;
53
55
n ++ ;
54
56
}
57
+
55
58
_semaphoreSlim . Release ( n ) ;
56
59
}
57
60
@@ -75,8 +78,10 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
75
78
}
76
79
else
77
80
{
81
+ // 当前没有任务
78
82
lock ( _queue )
79
83
{
84
+ // 事件不是线程安全,因为存在事件的加等
80
85
CurrentFinished ? . Invoke ( this , EventArgs . Empty ) ;
81
86
}
82
87
}
@@ -85,8 +90,10 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
85
90
return default ;
86
91
}
87
92
88
- private event EventHandler CurrentFinished ;
89
-
93
+ /// <summary>
94
+ /// 等待当前的所有任务执行完成
95
+ /// </summary>
96
+ /// <returns></returns>
90
97
public async ValueTask WaitForCurrentFinished ( )
91
98
{
92
99
if ( _queue . Count == 0 )
@@ -96,6 +103,8 @@ public async ValueTask WaitForCurrentFinished()
96
103
97
104
using var currentFinishedTask = new CurrentFinishedTask ( this ) ;
98
105
106
+ // 有线程执行事件触发,刚好此时在创建 CurrentFinishedTask 对象
107
+ // 此时需要重新判断是否存在任务
99
108
if ( _queue . Count == 0 )
100
109
{
101
110
return ;
@@ -106,9 +115,13 @@ public async ValueTask WaitForCurrentFinished()
106
115
107
116
/// <summary>
108
117
/// 主要用来释放锁,让 DequeueAsync 方法返回,解决因为锁让此对象内存不释放
118
+ /// <para></para>
119
+ /// 这个方法不是线程安全
109
120
/// </summary>
110
121
public void Dispose ( )
111
122
{
123
+ _isDisposing = true ;
124
+
112
125
// 当释放的时候,将通过 _queue 的 Clear 清空内容,而通过 _semaphoreSlim 的释放让 DequeueAsync 释放锁
113
126
// 此时将会在 DequeueAsync 进入 TryDequeue 方法,也许此时依然有开发者在 _queue.Clear() 之后插入元素,但是没关系,我只是需要保证调用 Dispose 之后会让 DequeueAsync 方法返回而已
114
127
_isDisposed = true ;
@@ -118,6 +131,44 @@ public void Dispose()
118
131
_semaphoreSlim . Dispose ( ) ;
119
132
}
120
133
134
+ /// <summary>
135
+ /// 等待任务执行完成之后返回,此方法不是线程安全
136
+ /// <para></para>
137
+ /// 如果在调用此方法同时添加任务,那么添加的任务存在线程安全
138
+ /// </summary>
139
+ /// <returns></returns>
140
+ public async ValueTask DisposeAsync ( )
141
+ {
142
+ _isDisposing = true ;
143
+ await WaitForCurrentFinished ( ) ;
144
+
145
+ // 在设置 _isDisposing 完成,刚好有 Enqueue 的代码
146
+ if ( _queue . Count != 0 )
147
+ {
148
+ // 再次等待
149
+ await WaitForCurrentFinished ( ) ;
150
+ }
151
+
152
+ // 其实此时依然可以存在有线程在 Enqueue 执行,但是此时就忽略了
153
+
154
+ // 设置变量,此时循环将会跳出
155
+ _isDisposed = true ;
156
+ _semaphoreSlim . Release ( int . MaxValue ) ;
157
+ _semaphoreSlim . Dispose ( ) ;
158
+ }
159
+
160
+ // 这里忽略线程安全
161
+ private void ThrowIfDisposing ( )
162
+ {
163
+ if ( _isDisposing )
164
+ {
165
+ throw new ObjectDisposedException ( nameof ( AsyncQueue < T > ) ) ;
166
+ }
167
+ }
168
+
169
+ private event EventHandler CurrentFinished ;
170
+
171
+ private bool _isDisposing ;
121
172
private bool _isDisposed ;
122
173
123
174
class CurrentFinishedTask : IDisposable
@@ -132,7 +183,7 @@ public CurrentFinishedTask(AsyncQueue<T> asyncQueue)
132
183
}
133
184
}
134
185
135
- private void CurrentFinished ( object ? sender , EventArgs e )
186
+ private void CurrentFinished ( object sender , EventArgs e )
136
187
{
137
188
_currentFinishedTaskCompletionSource . TrySetResult ( true ) ;
138
189
}
@@ -142,7 +193,8 @@ public async ValueTask WaitForCurrentFinished()
142
193
await _currentFinishedTaskCompletionSource . Task ;
143
194
}
144
195
145
- private readonly TaskCompletionSource < bool > _currentFinishedTaskCompletionSource = new TaskCompletionSource < bool > ( ) ;
196
+ private readonly TaskCompletionSource < bool > _currentFinishedTaskCompletionSource =
197
+ new TaskCompletionSource < bool > ( ) ;
146
198
147
199
private readonly AsyncQueue < T > _asyncQueue ;
148
200
@@ -156,4 +208,4 @@ public void Dispose()
156
208
}
157
209
}
158
210
}
159
- }
211
+ }
0 commit comments