Skip to content

Commit 80f6912

Browse files
authored
Merge pull request #32 from dotnet-campus/t/lindexi/Reentrancy
加入可重入方法
2 parents 1e4689f + a184759 commit 80f6912

File tree

5 files changed

+458
-0
lines changed

5 files changed

+458
-0
lines changed

AsyncWorkerCollection.sln.DotSettings

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
2+
<s:Boolean x:Key="/Default/UserDictionary/Words/=Reentrancy/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace dotnetCampus.Threading.Reentrancy
8+
{
9+
/// <summary>
10+
/// 执行当前队列中的最后一个任务,并对所有当前队列任务赋值该任务结果。
11+
/// </summary>
12+
/// <typeparam name="TParameter">
13+
/// 重入任务中单次执行时所使用的参数。
14+
/// 此重入策略不会忽略任何参数。
15+
/// </typeparam>
16+
/// <typeparam name="TReturn">
17+
/// 重入任务中单次执行时所得到的返回值。
18+
/// 此重入策略不会忽略任何返回值。
19+
/// </typeparam>
20+
public sealed class KeepLastReentrancyTask<TParameter, TReturn> : ReentrancyTask<TParameter, TReturn>
21+
{
22+
/// <summary>
23+
/// 用于原子操作判断当前是否正在执行队列中的可重入任务。
24+
/// 1 表示当前正在执行可重入任务;0 表示不确定。
25+
/// 不可使用 bool 类型,因为 bool 类型无法执行可靠的原子操作。
26+
/// </summary>
27+
private volatile int _isRunning;
28+
29+
/// <summary>
30+
/// 由于原子操作仅提供高性能的并发处理而不保证准确性,因此需要一个锁来同步 <see cref="_isRunning"/> 中值为 0 时所指的不确定情况。
31+
/// 不能使用一个锁来同步所有情况是因为在锁中使用 async/await 是不安全的,因此避免在锁中执行异步任务;我们使用原子操作来判断异步任务的执行条件。
32+
/// </summary>
33+
private readonly object _locker = new object();
34+
35+
/// <summary>
36+
/// 使用一个并发队列来表示目前已加入到队列中的全部可重入任务。
37+
/// 因为我们的 <see cref="_locker"/> 不能锁全部队列操作(原因见 <see cref="_locker"/>),因此需要使用并发队列。
38+
/// </summary>
39+
private readonly ConcurrentQueue<TaskWrapper> _queue = new ConcurrentQueue<TaskWrapper>();
40+
41+
/// <summary>
42+
/// 使用一个队列表示当前执行任务开始时所有需要进行赋值结果的任务。
43+
/// </summary>
44+
private readonly Queue<TaskWrapper> _skipQueue = new Queue<TaskWrapper>();
45+
46+
/// <summary>
47+
/// 创建以KeepLast策略执行的可重入任务。
48+
/// </summary>
49+
/// <param name="task">可重入任务本身。</param>
50+
public KeepLastReentrancyTask(Func<TParameter, Task<TReturn>> task) : base(task) { }
51+
52+
/// <summary>
53+
/// 以KeepLast策略执行重入任务,并获取此次重入任务的返回值。
54+
/// 此重入策略会确保执行当前队列中的最后一个任务,并对所有当前队列任务赋值该任务结果。
55+
/// </summary>
56+
/// <param name="arg">此次重入任务使用的参数。</param>
57+
/// <returns>重入任务本次执行的返回值。</returns>
58+
public override Task<TReturn> InvokeAsync(TParameter arg)
59+
{
60+
var wrapper = new TaskWrapper(() => RunCore(arg));
61+
_queue.Enqueue(wrapper);
62+
Run();
63+
return wrapper.AsTask();
64+
}
65+
66+
/// <summary>
67+
/// 以KeepLast策略执行重入任务。此方法确保线程安全。
68+
/// </summary>
69+
private async void Run()
70+
{
71+
var isRunning = Interlocked.CompareExchange(ref _isRunning, 1, 0);
72+
if (isRunning is 1)
73+
{
74+
lock (_locker)
75+
{
76+
if (_isRunning is 1)
77+
{
78+
// 当前已经在执行队列,因此无需继续执行。
79+
return;
80+
}
81+
}
82+
}
83+
84+
//下面这段是在临界区执行的,不存在多线程问题
85+
var hasTask = true;
86+
while (hasTask)
87+
{
88+
TaskWrapper runTask = null;
89+
// 当前还没有任何队列开始执行,因此需要开始执行队列。
90+
while (_queue.TryDequeue(out var wrapper))
91+
{
92+
//所有任务项转入执行队列
93+
if (runTask != null)
94+
{
95+
_skipQueue.Enqueue(runTask);
96+
}
97+
98+
runTask = wrapper;
99+
}
100+
101+
if (runTask != null)
102+
{
103+
// 内部已包含异常处理,因此外面可以无需捕获或者清理。
104+
await runTask.RunAsync().ConfigureAwait(false);
105+
//完成后对等待队列中的项赋值
106+
if (runTask.Exception != null)
107+
{
108+
SetException(runTask.Exception);
109+
}
110+
else
111+
{
112+
SetResult(runTask.Result);
113+
}
114+
}
115+
116+
lock (_locker)
117+
{
118+
hasTask = _queue.TryPeek(out _);
119+
if (!hasTask)
120+
{
121+
//退出临界区
122+
_isRunning = 0;
123+
}
124+
}
125+
}
126+
}
127+
128+
private void SetException(Exception exception)
129+
{
130+
while (_skipQueue.Count > 0)
131+
{
132+
var taskWrapper = _skipQueue.Dequeue();
133+
taskWrapper.SetException(exception);
134+
}
135+
}
136+
137+
private void SetResult(TReturn result)
138+
{
139+
while (_skipQueue.Count > 0)
140+
{
141+
var taskWrapper = _skipQueue.Dequeue();
142+
taskWrapper.SetResult(result);
143+
}
144+
}
145+
146+
/// <summary>
147+
/// 包装一个异步任务,以便在可以执行此异步任务的情况下可以在其他方法中监视其完成情况。
148+
/// </summary>
149+
private class TaskWrapper
150+
{
151+
/// <summary>
152+
/// 创建一个任务包装。
153+
/// </summary>
154+
internal TaskWrapper(Func<Task<TReturn>> workingTask)
155+
{
156+
_taskSource = new TaskCompletionSource<TReturn>();
157+
_task = workingTask;
158+
}
159+
160+
private readonly TaskCompletionSource<TReturn> _taskSource;
161+
private readonly Func<Task<TReturn>> _task;
162+
163+
public TReturn Result { get; set; }
164+
public Exception Exception { get; set; }
165+
166+
/// <summary>
167+
/// 执行此异步任务。
168+
/// </summary>
169+
internal async Task RunAsync()
170+
{
171+
try
172+
{
173+
var task = _task();
174+
if (task is null)
175+
{
176+
throw new InvalidOperationException("在指定 KeepLastReentrancyTask 的任务时,方法内不允许返回 null。请至少返回 Task.FromResult<object>(null)。");
177+
}
178+
var result = await task.ConfigureAwait(false);
179+
_taskSource.SetResult(result);
180+
Result = result;
181+
}
182+
#pragma warning disable CA1031 // 异常已经被通知到异步代码中,因此此处无需处理异常。
183+
catch (Exception ex)
184+
{
185+
_taskSource.SetException(ex);
186+
Exception = ex;
187+
}
188+
#pragma warning restore CA1031 // 异常已经被通知到异步代码中,因此此处无需处理异常。
189+
}
190+
191+
public void SetResult(TReturn result)
192+
{
193+
if (_taskSource.Task.IsCompleted || _taskSource.Task.IsFaulted)
194+
{
195+
return;
196+
}
197+
198+
_taskSource.SetResult(result);
199+
}
200+
201+
public void SetException(Exception exception)
202+
{
203+
if (_taskSource.Task.IsCompleted || _taskSource.Task.IsFaulted)
204+
{
205+
return;
206+
}
207+
208+
_taskSource.SetException(exception);
209+
}
210+
211+
/// <summary>
212+
/// 将此异步包装器作为 <see cref="Task"/> 使用,以便获得 async/await 特性。
213+
/// </summary>
214+
internal Task<TReturn> AsTask() => _taskSource.Task;
215+
}
216+
}
217+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace dotnetCampus.Threading.Reentrancy
7+
{
8+
/// <summary>
9+
/// 以队列策略执行的可重入任务。
10+
/// </summary>
11+
/// <typeparam name="TParameter">
12+
/// 重入任务中单次执行时所使用的参数。
13+
/// 此重入策略不会忽略任何参数。
14+
/// </typeparam>
15+
/// <typeparam name="TReturn">
16+
/// 重入任务中单次执行时所得到的返回值。
17+
/// 此重入策略不会忽略任何返回值。
18+
/// </typeparam>
19+
public sealed class QueueReentrancyTask<TParameter, TReturn> : ReentrancyTask<TParameter, TReturn>
20+
{
21+
/// <summary>
22+
/// 用于原子操作判断当前是否正在执行队列中的可重入任务。
23+
/// 1 表示当前正在执行可重入任务;0 表示不确定。
24+
/// 不可使用 bool 类型,因为 bool 类型无法执行可靠的原子操作。
25+
/// </summary>
26+
private volatile int _isRunning;
27+
28+
/// <summary>
29+
/// 由于原子操作仅提供高性能的并发处理而不保证准确性,因此需要一个锁来同步 <see cref="_isRunning"/> 中值为 0 时所指的不确定情况。
30+
/// 不能使用一个锁来同步所有情况是因为在锁中使用 async/await 是不安全的,因此避免在锁中执行异步任务;我们使用原子操作来判断异步任务的执行条件。
31+
/// </summary>
32+
private readonly object _locker = new object();
33+
34+
/// <summary>
35+
/// 使用一个并发队列来表示目前已加入到队列中的全部可重入任务。
36+
/// 因为我们的 <see cref="_locker"/> 不能锁全部队列操作(原因见 <see cref="_locker"/>),因此需要使用并发队列。
37+
/// </summary>
38+
private readonly ConcurrentQueue<TaskWrapper> _queue = new ConcurrentQueue<TaskWrapper>();
39+
40+
private readonly bool _configureAwait;
41+
42+
/// <summary>
43+
/// 创建以队列策略执行的可重入任务。
44+
/// </summary>
45+
/// <param name="task">可重入任务本身。</param>
46+
/// <param name="configureAwait"></param>
47+
public QueueReentrancyTask(Func<TParameter, Task<TReturn>> task, bool configureAwait) : base(task)
48+
{
49+
_configureAwait = configureAwait;
50+
}
51+
52+
/// <summary>
53+
/// 创建以队列策略执行的可重入任务
54+
/// </summary>
55+
/// <param name="task"></param>
56+
public QueueReentrancyTask(Func<TParameter, Task<TReturn>> task) : this(task, false) { }
57+
58+
/// <summary>
59+
/// 以队列策略执行重入任务,并获取此次重入任务的返回值。
60+
/// 此重入策略会确保执行每一次可重入任务。
61+
/// </summary>
62+
/// <param name="arg">此次重入任务使用的参数。</param>
63+
/// <returns>重入任务本次执行的返回值。</returns>
64+
public override Task<TReturn> InvokeAsync(TParameter arg)
65+
{
66+
var wrapper = new TaskWrapper(() => RunCore(arg));
67+
_queue.Enqueue(wrapper);
68+
Run();
69+
return wrapper.AsTask();
70+
}
71+
72+
/// <summary>
73+
/// 以队列策略执行重入任务。此方法确保线程安全。
74+
/// </summary>
75+
private async void Run()
76+
{
77+
var isRunning = Interlocked.CompareExchange(ref _isRunning, 1, 0);
78+
if (isRunning is 1)
79+
{
80+
lock (_locker)
81+
{
82+
if (_isRunning is 1)
83+
{
84+
// 当前已经在执行队列,因此无需继续执行。
85+
return;
86+
}
87+
}
88+
}
89+
90+
var hasTask = true;
91+
while (hasTask)
92+
{
93+
// 当前还没有任何队列开始执行,因此需要开始执行队列。
94+
while (_queue.TryDequeue(out var wrapper))
95+
{
96+
// 内部已包含异常处理,因此外面可以无需捕获或者清理。
97+
await wrapper.RunAsync().ConfigureAwait(_configureAwait);
98+
}
99+
100+
lock (_locker)
101+
{
102+
hasTask = _queue.TryPeek(out _);
103+
if (!hasTask)
104+
{
105+
_isRunning = 0;
106+
}
107+
}
108+
}
109+
}
110+
111+
/// <summary>
112+
/// 包装一个异步任务,以便在可以执行此异步任务的情况下可以在其他方法中监视其完成情况。
113+
/// </summary>
114+
private class TaskWrapper
115+
{
116+
/// <summary>
117+
/// 创建一个任务包装。
118+
/// </summary>
119+
internal TaskWrapper(Func<Task<TReturn>> workingTask)
120+
{
121+
_taskSource = new TaskCompletionSource<TReturn>();
122+
_task = workingTask;
123+
}
124+
125+
private readonly TaskCompletionSource<TReturn> _taskSource;
126+
private readonly Func<Task<TReturn>> _task;
127+
128+
/// <summary>
129+
/// 执行此异步任务。
130+
/// </summary>
131+
internal async Task RunAsync()
132+
{
133+
try
134+
{
135+
var result = await _task().ConfigureAwait(false);
136+
_taskSource.SetResult(result);
137+
}
138+
#pragma warning disable CA1031 // 异常已经被通知到异步代码中,因此此处无需处理异常。
139+
catch (Exception ex)
140+
{
141+
_taskSource.SetException(ex);
142+
}
143+
#pragma warning restore CA1031 // 异常已经被通知到异步代码中,因此此处无需处理异常。
144+
}
145+
146+
/// <summary>
147+
/// 将此异步包装器作为 <see cref="Task"/> 使用,以便获得 async/await 特性。
148+
/// </summary>
149+
internal Task<TReturn> AsTask() => _taskSource.Task;
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)