Skip to content

Commit 44f835f

Browse files
committed
feat: evolve worker distribution strategy
1 parent d5a9c21 commit 44f835f

11 files changed

+158
-97
lines changed

src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,19 +136,19 @@ IConsumerConfigurationBuilder WithWorkersCount(
136136
/// <summary>
137137
/// Sets the strategy to choose a worker when a message arrives
138138
/// </summary>
139-
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
139+
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
140140
/// <param name="factory">A factory to create the instance</param>
141141
/// <returns></returns>
142142
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
143-
where T : class, IDistributionStrategy;
143+
where T : class, IWorkerDistributionStrategy;
144144

145145
/// <summary>
146146
/// Sets the strategy to choose a worker when a message arrives
147147
/// </summary>
148-
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam>
148+
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam>
149149
/// <returns></returns>
150150
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
151-
where T : class, IDistributionStrategy;
151+
where T : class, IWorkerDistributionStrategy;
152152

153153
/// <summary>
154154
/// Configures the consumer for manual message completion.

src/KafkaFlow.Abstractions/IDistributionStrategy.cs

Lines changed: 0 additions & 26 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace KafkaFlow;
2+
3+
using System.Collections.Generic;
4+
using System.Threading.Tasks;
5+
6+
/// <summary>
7+
/// An interface used to create a distribution strategy
8+
/// </summary>
9+
public interface IWorkerDistributionStrategy
10+
{
11+
/// <summary>
12+
/// Initializes the distribution strategy, this method is called when a consumer is started
13+
/// </summary>
14+
/// <param name="workers">List of workers to be initialized</param>
15+
void Initialize(IReadOnlyList<IWorker> workers);
16+
17+
/// <summary>
18+
/// Retrieves an available worker based on the provided distribution strategy context.
19+
/// </summary>
20+
/// <param name="context">The distribution strategy context containing message and consumer details.</param>
21+
/// <returns>The selected <see cref="IWorker"/> instance.</returns>
22+
ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context);
23+
}

src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,9 @@
77
<Description>Contains all KafkaFlow extendable interfaces</Description>
88
</PropertyGroup>
99

10+
<ItemGroup>
11+
<PackageReference Include="System.Memory" Version="4.5.5" />
12+
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
13+
</ItemGroup>
14+
1015
</Project>
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
namespace KafkaFlow;
2+
3+
using System;
4+
using System.Threading;
5+
6+
/// <summary>
7+
/// Represents a strategy context for distributing workers based on specific message and consumer details.
8+
/// </summary>
9+
public ref struct WorkerDistributionStrategy
10+
{
11+
/// <summary>
12+
/// Initializes a new instance of the <see cref="WorkerDistributionStrategy"/> struct.
13+
/// </summary>
14+
/// <param name="consumerName">Name of the consumer.</param>
15+
/// <param name="topic">Topic associated with the message.</param>
16+
/// <param name="partition">Partition of the topic.</param>
17+
/// <param name="rawMessageKey">Raw key of the message.</param>
18+
/// <param name="consumerStoppedCancellationToken">A cancellation token that is cancelled when the consumer has stopped</param>
19+
public WorkerDistributionStrategy(
20+
string consumerName,
21+
string topic,
22+
int partition,
23+
ReadOnlyMemory<byte>? rawMessageKey,
24+
CancellationToken consumerStoppedCancellationToken)
25+
{
26+
this.ConsumerName = consumerName;
27+
this.Topic = topic;
28+
this.Partition = partition;
29+
this.RawMessageKey = rawMessageKey;
30+
this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken;
31+
}
32+
33+
/// <summary>
34+
/// Gets the name of the consumer.
35+
/// </summary>
36+
public string ConsumerName { get; }
37+
38+
/// <summary>
39+
/// Gets the topic associated with the message.
40+
/// </summary>
41+
public string Topic { get; }
42+
43+
/// <summary>
44+
/// Gets the partition number of the topic.
45+
/// </summary>
46+
public int Partition { get; }
47+
48+
/// <summary>
49+
/// Gets the raw key of the message.
50+
/// </summary>
51+
public ReadOnlyMemory<byte>? RawMessageKey { get; }
52+
53+
/// <summary>
54+
/// Gets the cancellation token that is cancelled when the consumer has stopped
55+
/// </summary>
56+
public CancellationToken ConsumerStoppedCancellationToken { get; }
57+
}

src/KafkaFlow/Configuration/ConsumerConfiguration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public ConsumerConfiguration(
2020
TimeSpan workersCountEvaluationInterval,
2121
int bufferSize,
2222
TimeSpan workerStopTimeout,
23-
Factory<IDistributionStrategy> distributionStrategyFactory,
23+
Factory<IWorkerDistributionStrategy> distributionStrategyFactory,
2424
IReadOnlyList<MiddlewareConfiguration> middlewaresConfigurations,
2525
bool autoMessageCompletion,
2626
bool noStoreOffsets,
@@ -69,7 +69,7 @@ public ConsumerConfiguration(
6969
"The value must be greater than 0");
7070
}
7171

72-
public Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
72+
public Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }
7373

7474
public IReadOnlyList<MiddlewareConfiguration> MiddlewaresConfigurations { get; }
7575

src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
3636
private ConsumerInitialState initialState = ConsumerInitialState.Running;
3737
private int statisticsInterval;
3838

39-
private Factory<IDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
39+
private Factory<IWorkerDistributionStrategy> distributionStrategyFactory = _ => new BytesSumDistributionStrategy();
4040
private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5);
4141

4242
private ConsumerCustomFactory customFactory = (consumer, _) => consumer;
@@ -158,14 +158,14 @@ public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout)
158158
}
159159

160160
public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
161-
where T : class, IDistributionStrategy
161+
where T : class, IWorkerDistributionStrategy
162162
{
163163
this.distributionStrategyFactory = factory;
164164
return this;
165165
}
166166

167167
public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
168-
where T : class, IDistributionStrategy
168+
where T : class, IWorkerDistributionStrategy
169169
{
170170
this.DependencyConfigurator.AddTransient<T>();
171171
this.distributionStrategyFactory = resolver => resolver.Resolve<T>();

src/KafkaFlow/Configuration/IConsumerConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public interface IConsumerConfiguration
1313
/// <summary>
1414
/// Gets the consumer worker distribution strategy
1515
/// </summary>
16-
Factory<IDistributionStrategy> DistributionStrategyFactory { get; }
16+
Factory<IWorkerDistributionStrategy> DistributionStrategyFactory { get; }
1717

1818
/// <summary>
1919
/// Gets the consumer middlewares configurations

src/KafkaFlow/Consumers/ConsumerWorkerPool.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool
1414
private readonly IDependencyResolver consumerDependencyResolver;
1515
private readonly IMiddlewareExecutor middlewareExecutor;
1616
private readonly ILogHandler logHandler;
17-
private readonly Factory<IDistributionStrategy> distributionStrategyFactory;
17+
private readonly Factory<IWorkerDistributionStrategy> distributionStrategyFactory;
1818
private readonly IOffsetCommitter offsetCommitter;
1919

2020
private readonly Event workerPoolStoppedSubject;
2121

2222
private TaskCompletionSource<object> startedTaskSource = new();
2323
private List<IConsumerWorker> workers = new();
2424

25-
private IDistributionStrategy distributionStrategy;
25+
private IWorkerDistributionStrategy distributionStrategy;
2626
private IOffsetManager offsetManager;
2727

2828
public ConsumerWorkerPool(
@@ -85,7 +85,7 @@ await Task.WhenAll(
8585
.ConfigureAwait(false);
8686

8787
this.distributionStrategy = this.distributionStrategyFactory(this.consumerDependencyResolver);
88-
this.distributionStrategy.Init(this.workers.AsReadOnly());
88+
this.distributionStrategy.Initialize(this.workers.AsReadOnly());
8989

9090
this.startedTaskSource.TrySetResult(null);
9191
}
@@ -130,7 +130,13 @@ public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, Cancellati
130130
await this.startedTaskSource.Task.ConfigureAwait(false);
131131

132132
var worker = (IConsumerWorker)await this.distributionStrategy
133-
.GetWorkerAsync(message.Message.Key, stopCancellationToken)
133+
.GetWorkerAsync(
134+
new WorkerDistributionStrategy(
135+
this.consumer.Configuration.ConsumerName,
136+
message.Topic,
137+
message.Partition.Value,
138+
message.Message.Key,
139+
stopCancellationToken))
134140
.ConfigureAwait(false);
135141

136142
if (worker is null)
Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,42 @@
1-
namespace KafkaFlow.Consumers.DistributionStrategies
1+
namespace KafkaFlow.Consumers.DistributionStrategies;
2+
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
7+
/// <summary>
8+
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen
9+
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed
10+
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed)
11+
/// </summary>
12+
public class BytesSumDistributionStrategy : IWorkerDistributionStrategy
213
{
3-
using System.Collections.Generic;
4-
using System.Linq;
5-
using System.Threading;
6-
using System.Threading.Tasks;
14+
private IReadOnlyList<IWorker> workers;
715

8-
/// <summary>
9-
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen
10-
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed
11-
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed)
12-
/// </summary>
13-
public class BytesSumDistributionStrategy : IDistributionStrategy
16+
/// <inheritdoc />
17+
public void Initialize(IReadOnlyList<IWorker> workers)
1418
{
15-
private IReadOnlyList<IWorker> workers;
19+
this.workers = workers;
20+
}
1621

17-
/// <inheritdoc />
18-
public void Init(IReadOnlyList<IWorker> workers)
22+
/// <inheritdoc />
23+
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context)
24+
{
25+
if (context.RawMessageKey is null || this.workers.Count == 1)
1926
{
20-
this.workers = workers;
27+
return new ValueTask<IWorker>(this.workers[0]);
2128
}
2229

23-
/// <inheritdoc />
24-
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
25-
{
26-
if (partitionKey is null || this.workers.Count == 1)
27-
{
28-
return Task.FromResult(this.workers[0]);
29-
}
30+
var bytesSum = 0;
3031

31-
var bytesSum = 0;
32-
33-
for (int i = 0; i < partitionKey.Length; i++)
34-
{
35-
bytesSum += partitionKey[i];
36-
}
37-
38-
return Task.FromResult(
39-
cancellationToken.IsCancellationRequested
40-
? null
41-
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
32+
for (var i = 0; i < context.RawMessageKey.Value.Length; i++)
33+
{
34+
bytesSum += context.RawMessageKey.Value.Span[i];
4235
}
36+
37+
return new ValueTask<IWorker>(
38+
context.ConsumerStoppedCancellationToken.IsCancellationRequested
39+
? null
40+
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count));
4341
}
4442
}
Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,30 @@
1-
namespace KafkaFlow.Consumers.DistributionStrategies
1+
namespace KafkaFlow.Consumers.DistributionStrategies;
2+
3+
using System.Collections.Generic;
4+
using System.Threading.Channels;
5+
using System.Threading.Tasks;
6+
7+
/// <summary>
8+
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message
9+
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee
10+
/// </summary>
11+
public class FreeWorkerDistributionStrategy : IWorkerDistributionStrategy
212
{
3-
using System.Collections.Generic;
4-
using System.Threading;
5-
using System.Threading.Channels;
6-
using System.Threading.Tasks;
13+
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();
714

8-
/// <summary>
9-
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message
10-
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee
11-
/// </summary>
12-
public class FreeWorkerDistributionStrategy : IDistributionStrategy
15+
/// <inheritdoc />
16+
public void Initialize(IReadOnlyList<IWorker> workers)
1317
{
14-
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>();
15-
16-
/// <inheritdoc />
17-
public void Init(IReadOnlyList<IWorker> workers)
18+
foreach (var worker in workers)
1819
{
19-
foreach (var worker in workers)
20-
{
21-
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
22-
this.freeWorkers.Writer.TryWrite(worker);
23-
}
20+
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker)));
21+
this.freeWorkers.Writer.TryWrite(worker);
2422
}
23+
}
2524

26-
/// <inheritdoc />
27-
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken)
28-
{
29-
return this.freeWorkers.Reader.ReadAsync(cancellationToken).AsTask();
30-
}
25+
/// <inheritdoc />
26+
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionStrategy context)
27+
{
28+
return this.freeWorkers.Reader.ReadAsync(context.ConsumerStoppedCancellationToken);
3129
}
3230
}

0 commit comments

Comments
 (0)