File tree Expand file tree Collapse file tree 1 file changed +62
-0
lines changed Expand file tree Collapse file tree 1 file changed +62
-0
lines changed Original file line number Diff line number Diff line change @@ -73,11 +73,37 @@ public async Task<T> DequeueAsync(CancellationToken cancellationToken = default)
73
73
{
74
74
return item ;
75
75
}
76
+ else
77
+ {
78
+ lock ( _queue )
79
+ {
80
+ CurrentFinished ? . Invoke ( this , EventArgs . Empty ) ;
81
+ }
82
+ }
76
83
}
77
84
78
85
return default ;
79
86
}
80
87
88
+ private event EventHandler CurrentFinished ;
89
+
90
+ public async ValueTask WaitForCurrentFinished ( )
91
+ {
92
+ if ( _queue . Count == 0 )
93
+ {
94
+ return ;
95
+ }
96
+
97
+ using var currentFinishedTask = new CurrentFinishedTask ( this ) ;
98
+
99
+ if ( _queue . Count == 0 )
100
+ {
101
+ return ;
102
+ }
103
+
104
+ await currentFinishedTask . WaitForCurrentFinished ( ) ;
105
+ }
106
+
81
107
/// <summary>
82
108
/// 主要用来释放锁,让 DequeueAsync 方法返回,解决因为锁让此对象内存不释放
83
109
/// </summary>
@@ -93,5 +119,41 @@ public void Dispose()
93
119
}
94
120
95
121
private bool _isDisposed ;
122
+
123
+ class CurrentFinishedTask : IDisposable
124
+ {
125
+ public CurrentFinishedTask ( AsyncQueue < T > asyncQueue )
126
+ {
127
+ _asyncQueue = asyncQueue ;
128
+
129
+ lock ( _asyncQueue )
130
+ {
131
+ _asyncQueue . CurrentFinished += CurrentFinished ;
132
+ }
133
+ }
134
+
135
+ private void CurrentFinished ( object ? sender , EventArgs e )
136
+ {
137
+ _currentFinishedTaskCompletionSource . TrySetResult ( true ) ;
138
+ }
139
+
140
+ public async ValueTask WaitForCurrentFinished ( )
141
+ {
142
+ await _currentFinishedTaskCompletionSource . Task ;
143
+ }
144
+
145
+ private readonly TaskCompletionSource < bool > _currentFinishedTaskCompletionSource = new TaskCompletionSource < bool > ( ) ;
146
+
147
+ private readonly AsyncQueue < T > _asyncQueue ;
148
+
149
+ public void Dispose ( )
150
+ {
151
+ lock ( _asyncQueue )
152
+ {
153
+ _currentFinishedTaskCompletionSource . TrySetResult ( true ) ;
154
+ _asyncQueue . CurrentFinished -= CurrentFinished ;
155
+ }
156
+ }
157
+ }
96
158
}
97
159
}
You can’t perform that action at this time.
0 commit comments