1
+ #nullable enable
2
+ using System . Collections . Generic ;
3
+ using System . Linq ;
4
+ using System . Threading . Tasks ;
5
+
6
+ #if NETFRAMEWORK
7
+ using ValueTask = System . Threading . Tasks . Task ;
8
+ #endif
9
+
10
+ namespace dotnetCampus . Threading
11
+ {
12
+ /// <summary>
13
+ /// 限制执行数量的任务,执行的任务超过设置的数量将可以等待直到正在执行任务数小于设置的数量
14
+ /// </summary>
15
+ #if PublicAsInternal
16
+ internal
17
+ #else
18
+ public
19
+ #endif
20
+ class LimitedRunningCountTask
21
+ {
22
+ /// <summary>
23
+ /// 创建限制执行数量的任务
24
+ /// </summary>
25
+ /// <param name="maxRunningCount">允许最大的执行数量的任务</param>
26
+ public LimitedRunningCountTask ( uint maxRunningCount )
27
+ {
28
+ MaxRunningCount = maxRunningCount ;
29
+ }
30
+
31
+ /// <summary>
32
+ /// 执行的任务数
33
+ /// </summary>
34
+ public int RunningCount
35
+ {
36
+ set
37
+ {
38
+ lock ( Locker )
39
+ {
40
+ _runningCount = value ;
41
+ }
42
+ }
43
+ get
44
+ {
45
+ lock ( Locker )
46
+ {
47
+ return _runningCount ;
48
+ }
49
+ }
50
+ }
51
+
52
+ /// <summary>
53
+ /// 允许最大的执行数量的任务
54
+ /// </summary>
55
+ public uint MaxRunningCount { get ; }
56
+
57
+ /// <summary>
58
+ /// 加入执行任务
59
+ /// </summary>
60
+ /// <param name="task"></param>
61
+ public void Add ( Task task )
62
+ {
63
+ RunningCount ++ ;
64
+ lock ( Locker )
65
+ {
66
+ Buffer . Add ( task ) ;
67
+
68
+ RunningBreakTask ? . TrySetResult ( true ) ;
69
+ }
70
+
71
+ RunningInner ( ) ;
72
+ }
73
+
74
+ /// <summary>
75
+ /// 加入等待任务,在空闲之后等待才会返回
76
+ /// </summary>
77
+ /// <param name="task"></param>
78
+ /// <returns></returns>
79
+ public async ValueTask AddAsync ( Task task )
80
+ {
81
+ // ReSharper disable once MethodHasAsyncOverload
82
+ Add ( task ) ;
83
+ await WaitForFree ( ) ;
84
+ }
85
+
86
+ /// <summary>
87
+ /// 等待空闲
88
+ /// </summary>
89
+ /// <returns></returns>
90
+ public async ValueTask WaitForFree ( )
91
+ {
92
+ if ( WaitForFreeTask == null )
93
+ {
94
+ return ;
95
+ }
96
+
97
+ await WaitForFreeTask . Task ;
98
+ }
99
+
100
+
101
+ private TaskCompletionSource < bool > ? RunningBreakTask
102
+ {
103
+ set
104
+ {
105
+ lock ( Locker )
106
+ {
107
+ _runningBreakTask = value ;
108
+ }
109
+ }
110
+ get
111
+ {
112
+ lock ( Locker )
113
+ {
114
+ return _runningBreakTask ;
115
+ }
116
+ }
117
+ }
118
+
119
+ private TaskCompletionSource < bool > ? WaitForFreeTask
120
+ {
121
+ set
122
+ {
123
+ lock ( Locker )
124
+ {
125
+ _waitForFreeTask = value ;
126
+ }
127
+ }
128
+ get
129
+ {
130
+ lock ( Locker )
131
+ {
132
+ return _waitForFreeTask ;
133
+ }
134
+ }
135
+ }
136
+
137
+ private List < Task > Buffer { get ; } = new List < Task > ( ) ;
138
+
139
+ private object Locker => Buffer ;
140
+
141
+ private bool _isRunning ;
142
+
143
+ private int _runningCount ;
144
+
145
+ private TaskCompletionSource < bool > ? _runningBreakTask ;
146
+
147
+ private TaskCompletionSource < bool > ? _waitForFreeTask ;
148
+
149
+ private async void RunningInner ( )
150
+ {
151
+ // ReSharper disable once InconsistentlySynchronizedField
152
+ if ( _isRunning )
153
+ {
154
+ return ;
155
+ }
156
+
157
+ lock ( Locker )
158
+ {
159
+ if ( _isRunning )
160
+ {
161
+ return ;
162
+ }
163
+
164
+ _isRunning = true ;
165
+ }
166
+
167
+ List < Task > runningTaskList ;
168
+ lock ( Locker )
169
+ {
170
+ runningTaskList = Buffer . ToList ( ) ;
171
+ Buffer . Clear ( ) ;
172
+ RunningBreakTask = new TaskCompletionSource < bool > ( ) ;
173
+ runningTaskList . Add ( RunningBreakTask . Task ) ;
174
+
175
+ SetWaitForFreeTask ( ) ;
176
+ }
177
+
178
+ while ( runningTaskList . Count > 0 )
179
+ {
180
+ // 加入等待
181
+ await Task . WhenAny ( runningTaskList ) ;
182
+
183
+ // 干掉不需要的任务
184
+ runningTaskList . RemoveAll ( task => task . IsCompleted ) ;
185
+
186
+ lock ( Locker )
187
+ {
188
+ runningTaskList . AddRange ( Buffer ) ;
189
+ Buffer . Clear ( ) ;
190
+
191
+ RunningCount = runningTaskList . Count ;
192
+
193
+ if ( ! RunningBreakTask . Task . IsCompleted )
194
+ {
195
+ runningTaskList . Add ( RunningBreakTask . Task ) ;
196
+ }
197
+ else
198
+ {
199
+ RunningBreakTask = new TaskCompletionSource < bool > ( ) ;
200
+ runningTaskList . Add ( RunningBreakTask . Task ) ;
201
+ }
202
+
203
+ if ( runningTaskList . Count < MaxRunningCount )
204
+ {
205
+ WaitForFreeTask ? . TrySetResult ( true ) ;
206
+ }
207
+ else
208
+ {
209
+ SetWaitForFreeTask ( ) ;
210
+ }
211
+ }
212
+ }
213
+
214
+ lock ( Locker )
215
+ {
216
+ _isRunning = false ;
217
+ }
218
+
219
+ void SetWaitForFreeTask ( )
220
+ {
221
+ if ( runningTaskList . Count > MaxRunningCount )
222
+ {
223
+ if ( WaitForFreeTask ? . Task . IsCompleted is false )
224
+ {
225
+ }
226
+ else
227
+ {
228
+ WaitForFreeTask = new TaskCompletionSource < bool > ( ) ;
229
+ }
230
+ }
231
+ }
232
+ }
233
+ }
234
+ }
0 commit comments