Skip to content

Commit e6073cf

Browse files
committed
Optimized AsyncTaskQueue to reuse Semaphore if reset
1 parent 17d91c2 commit e6073cf

File tree

1 file changed

+44
-38
lines changed

1 file changed

+44
-38
lines changed

src/ZirconNet.Core/Async/AsyncTaskQueue.cs

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44

55
namespace ZirconNet.Core.Async;
66

7-
/// <summary>
8-
/// A class that allows queueing and running tasks asynchronously with a specified maximum number of concurrent tasks.
9-
/// </summary>
107
public sealed class AsyncTaskQueue
118
{
129
private SemaphoreSlim _taskSemaphore;
@@ -19,11 +16,7 @@ public sealed class AsyncTaskQueue
1916

2017
public AsyncTaskQueue(int maximumThreads = -1)
2118
{
22-
if (maximumThreads <= 0 || maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
23-
{
24-
maximumThreads = Environment.ProcessorCount;
25-
}
26-
19+
SetMaxThreads(ref maximumThreads);
2720
_taskSemaphore = new SemaphoreSlim(maximumThreads);
2821
_queueSemaphore = new SemaphoreSlim(0, int.MaxValue);
2922
_waitForFirst = new SemaphoreSlim(0, int.MaxValue);
@@ -34,33 +27,35 @@ private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellat
3427
Interlocked.Increment(ref _tasksInQueue);
3528
await _taskSemaphore.WaitAsync(cancellationToken);
3629

37-
_ = Task.Run(async () =>
30+
if (!cancellationToken.IsCancellationRequested)
31+
{
32+
ThreadPool.QueueUserWorkItem(_ => RunTask(actionToRun));
33+
}
34+
}
35+
36+
private async void RunTask(Func<Task> actionToRun)
37+
{
38+
try
39+
{
40+
await actionToRun();
41+
}
42+
catch (Exception ex)
43+
{
44+
IsFaulted = true;
45+
_exceptions.Add(ex);
46+
}
47+
finally
3848
{
39-
if (!cancellationToken.IsCancellationRequested)
49+
Interlocked.Decrement(ref _tasksInQueue);
50+
if (_tasksInQueue == 0)
4051
{
41-
try
42-
{
43-
await actionToRun();
44-
}
45-
catch (Exception ex)
46-
{
47-
IsFaulted = true;
48-
_exceptions.Add(ex);
49-
}
50-
finally
51-
{
52-
Interlocked.Decrement(ref _tasksInQueue);
53-
if (_tasksInQueue == 0)
54-
{
55-
_queueSemaphore.Release();
56-
}
57-
_taskSemaphore.Release();
58-
}
52+
_queueSemaphore.Release();
5953
}
60-
}, cancellationToken);
54+
_taskSemaphore.Release();
55+
}
6156
}
6257

63-
public async Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancellationToken = default)
58+
public async Task EnqueueTask(Func<Task> actionToRun, CancellationToken cancellationToken = default)
6459
{
6560
if (!cancellationToken.IsCancellationRequested)
6661
{
@@ -83,20 +78,31 @@ public async ValueTask WaitForQueueToEnd(bool waitForFirstTask = true, Cancellat
8378

8479
public async ValueTask Reset(int maximumThreads = -1)
8580
{
86-
if (_tasksInQueue > 0)
81+
await WaitForQueueToEnd();
82+
83+
SetMaxThreads(ref maximumThreads);
84+
85+
_taskSemaphore.Release(maximumThreads - _taskSemaphore.CurrentCount);
86+
87+
while (_queueSemaphore.CurrentCount > 0)
8788
{
88-
await WaitForQueueToEnd();
89+
_queueSemaphore.Wait();
8990
}
9091

91-
if (maximumThreads <= 0 || maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
92+
while (_waitForFirst.CurrentCount > 0)
9293
{
93-
maximumThreads = Environment.ProcessorCount;
94+
_waitForFirst.Wait();
9495
}
9596

96-
_taskSemaphore = new (maximumThreads);
97-
_queueSemaphore = new (0, int.MaxValue);
98-
_waitForFirst = new(0, int.MaxValue);
9997
_tasksInQueue = 0;
10098
IsFaulted = false;
10199
}
102-
}
100+
101+
private void SetMaxThreads(ref int maximumThreads)
102+
{
103+
if (maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
104+
{
105+
maximumThreads = Environment.ProcessorCount;
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)