Skip to content

Commit 19c2c00

Browse files
committed
Modified things.
1 parent 6a03235 commit 19c2c00

File tree

2 files changed

+38
-142
lines changed

2 files changed

+38
-142
lines changed

src/ZirconNet.Core/Async/AsyncTaskQueue.cs

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,24 @@ namespace ZirconNet.Core.Async;
77
/// <summary>
88
/// A class that allows queueing and running tasks asynchronously with a specified maximum number of concurrent tasks.
99
/// </summary>
10-
public sealed class AsyncTaskQueue : IDisposable
10+
public sealed class AsyncTaskQueue
1111
{
1212
private SemaphoreSlim _taskSemaphore;
13-
private TaskCompletionSource<bool> _queueCompletionSource;
13+
private SemaphoreSlim _queueSemaphore;
1414
private int _tasksInQueue = 0;
15-
private readonly ConcurrentBag<Exception> _exceptions;
15+
private readonly ConcurrentBag<Exception> _exceptions = new ();
16+
17+
public bool IsFaulted { get; private set; } = false;
1618

1719
public AsyncTaskQueue(int maximumThreads = -1)
1820
{
19-
if (maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
21+
if (maximumThreads <= 0 || maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
2022
{
2123
maximumThreads = Environment.ProcessorCount;
2224
}
2325

2426
_taskSemaphore = new SemaphoreSlim(maximumThreads);
25-
_queueCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
26-
_exceptions = new ConcurrentBag<Exception>();
27+
_queueSemaphore = new SemaphoreSlim(0, int.MaxValue);
2728
}
2829

2930
private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellationToken)
@@ -33,66 +34,58 @@ private async Task RunAction(Func<Task> actionToRun, CancellationToken cancellat
3334

3435
_ = Task.Run(async () =>
3536
{
36-
try
37-
{
38-
await actionToRun();
39-
}
40-
catch (Exception ex)
37+
if (!cancellationToken.IsCancellationRequested)
4138
{
42-
_exceptions.Add(ex);
43-
}
44-
finally
45-
{
46-
_taskSemaphore.Release();
47-
if (Interlocked.Decrement(ref _tasksInQueue) == 0)
39+
try
4840
{
49-
_queueCompletionSource.TrySetResult(true);
41+
await actionToRun();
42+
}
43+
catch (Exception ex)
44+
{
45+
IsFaulted = true;
46+
_exceptions.Add(ex);
47+
}
48+
finally
49+
{
50+
Interlocked.Decrement(ref _tasksInQueue);
51+
if (_tasksInQueue == 0)
52+
{
53+
_queueSemaphore.Release();
54+
}
55+
_taskSemaphore.Release();
5056
}
5157
}
52-
});
58+
}, cancellationToken);
5359
}
5460

55-
public Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancellationToken = default)
61+
public async Task AddTaskAsync(Func<Task> actionToRun, CancellationToken cancellationToken = default)
5662
{
5763
if (!cancellationToken.IsCancellationRequested)
5864
{
59-
return RunAction(actionToRun, cancellationToken);
65+
await RunAction(actionToRun, cancellationToken);
6066
}
61-
62-
return Task.CompletedTask;
6367
}
6468

65-
public async Task WaitForQueueEnd(CancellationToken cancellationToken = default)
69+
public async Task WaitForQueueToEndAsync(CancellationToken cancellationToken = default)
6670
{
67-
using (cancellationToken.Register(() => _queueCompletionSource.TrySetCanceled()))
68-
{
69-
await _queueCompletionSource.Task.ConfigureAwait(false);
70-
}
71+
await _queueSemaphore.WaitAsync(cancellationToken);
7172
}
7273

73-
public IReadOnlyList<Exception> GetExceptions()
74+
public async ValueTask Reset(int maximumThreads = -1)
7475
{
75-
return _exceptions.ToList().AsReadOnly();
76-
}
77-
78-
public async Task Reset(int maximumThreads = -1, CancellationToken cancellationToken = default)
79-
{
80-
await WaitForQueueEnd(cancellationToken);
76+
if (_tasksInQueue is not 0)
77+
{
78+
await WaitForQueueToEndAsync();
79+
}
8180

82-
if (maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
81+
if (maximumThreads <= 0 || maximumThreads <= 0 || maximumThreads > Environment.ProcessorCount)
8382
{
8483
maximumThreads = Environment.ProcessorCount;
8584
}
8685

87-
_taskSemaphore.Dispose();
8886
_taskSemaphore = new SemaphoreSlim(maximumThreads);
89-
_queueCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
90-
91-
while (_exceptions.TryTake(out _)) { }
92-
}
93-
94-
public void Dispose()
95-
{
96-
_taskSemaphore.Dispose();
87+
_queueSemaphore = new SemaphoreSlim(0, int.MaxValue);
88+
_tasksInQueue = 0;
89+
IsFaulted = false;
9790
}
9891
}

src/ZirconNet.Core/Async/AsyncValueTaskQueue.cs

Lines changed: 0 additions & 97 deletions
This file was deleted.

0 commit comments

Comments
 (0)