
Commit 1aeacf5

Updated README.md (#480)
1 parent 7e8eeb4 commit 1aeacf5


README.md

Lines changed: 70 additions & 90 deletions
@@ -9,9 +9,6 @@ All stages are built with Akka.Streams advantages in mind:
 - Each stage can make use of its own `IConsumer` or `IProducer` instance, or can share them (can be used for optimization)
 - All Kafka failures can be handled with usual stream error handling strategies
 
-## Builds
-[![Build status](https://ci.appveyor.com/api/projects/status/0glh2fi8uic17vl4/branch/dev?svg=true)](https://ci.appveyor.com/project/akkadotnet-contrib/akka-streams-kafka/branch/dev)
-
 ## Producer
 
 A producer publishes messages to Kafka topics. The message itself contains information about which topic and partition to publish to, so you can publish to different topics with the same producer.
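For orientation (not part of this diff), a minimal end-to-end produce sketch; `actorSystem`, `materializer`, and the broker address are illustrative assumptions:

```C#
// A hypothetical minimal producer: publish the numbers 1..100 to "topic1"
var producerSettings = ProducerSettings<Null, string>.Create(actorSystem, null, null)
    .WithBootstrapServers("localhost:9092"); // illustrative broker address

// Completes when all elements have been published and the stream finishes
await Source.From(Enumerable.Range(1, 100))
    .Select(i => new ProducerRecord<Null, string>("topic1", i.ToString()))
    .RunWith(KafkaProducer.PlainSink(producerSettings), materializer);
```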
@@ -123,7 +120,7 @@ Source
 
 This flow accepts implementations of `Akka.Streams.Kafka.Messages.IEnvelope` and returns `Akka.Streams.Kafka.Messages.IResults` elements.
 `IEnvelope` elements contain an extra field to pass through data, the so-called `passThrough`.
-Its value is passed through the flow and becomes available in the `ProducerMessage.Results`s `PassThrough`.
+Its value is passed through the flow and becomes available in the `ProducerMessage.Results`'s `PassThrough`.
 It can for example hold an `Akka.Streams.Kafka.Messages.CommittableOffset` or `Akka.Streams.Kafka.Messages.CommittableOffsetBatch` (from a `KafkaConsumer.CommittableSource`)
 that can be committed after publishing to Kafka:
 
@@ -252,76 +249,7 @@ var settings = ConsumerSettings<string, string>.Create(actorSystem, null, null)
 
 ### Default Consumer HOCON Settings
 
-```HOCON
-# Properties for akka.kafka.ConsumerSettings can be
-# defined in this section or a configuration section with
-# the same layout.
-akka.kafka.consumer {
-  # Tuning property of scheduled polls.
-  # Controls the interval from one scheduled poll to the next.
-  poll-interval = 50ms
-
-  # Tuning property of the `KafkaConsumer.poll` parameter.
-  # Note that a non-zero value means that the thread that
-  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
-  poll-timeout = 50ms
-
-  # The stage will delay stopping the internal actor to allow processing of
-  # messages already in the stream (required for successful committing).
-  # This can be set to 0 for streams using `DrainingControl`.
-  stop-timeout = 30s
-
-  # If offset commit requests are not completed within this timeout
-  # the returned Future is completed with `CommitTimeoutException`.
-  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
-  # being in flight anymore, as well as waiting for messages to drain, when a rebalance is triggered.
-  commit-timeout = 15s
-
-  # If commits take longer than this time, a warning is logged
-  commit-time-warning = 1s
-
-  # Not relevant for Kafka after version 2.1.0.
-  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
-  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
-  commit-refresh-interval = infinite
-
-  buffer-size = 128
-
-  # Fully qualified config path which holds the dispatcher configuration
-  # to be used by the KafkaConsumerActor. Some blocking may occur.
-  use-dispatcher = "akka.kafka.default-dispatcher"
-
-  # Properties defined by Confluent.Kafka.ConsumerConfig
-  # can be defined in this configuration section.
-  kafka-clients {
-    # Disable auto-commit by default
-    enable.auto.commit = false
-  }
-
-  # Time to wait for pending requests when a partition is closed
-  wait-close-partition = 500ms
-
-  # Limits the query to Kafka for a topic's position
-  position-timeout = 5s
-
-  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
-  # call to Kafka's API
-  offset-for-times-timeout = 5s
-
-  # Timeout for akka.kafka.Metadata requests
-  # This value is used instead of Kafka's default from `default.api.timeout.ms`,
-  # which is 1 minute.
-  metadata-request-timeout = 5s
-
-  # Interval for checking that a transaction was completed before closing the consumer.
-  # Used in the transactional flow for exactly-once-semantics processing.
-  eos-draining-check-interval = 30ms
-
-  # Issue warnings when a call to a partition assignment handler method takes
-  # longer than this.
-  partition-handler-warning = 5s
-}
-```
+See [`reference.conf`](https://github.com/akkadotnet/Akka.Streams.Kafka/blob/dev/src/Akka.Streams.Kafka/reference.conf) for the latest settings.
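If you need different values, the usual HOCON layering applies; a short sketch overriding a couple of the defaults shown above in your application configuration (the values are illustrative, not recommendations):

```HOCON
akka.kafka.consumer {
  # Poll Kafka less often than the 50ms default
  poll-interval = 100ms

  # Allow a shorter drain period on shutdown
  stop-timeout = 10s

  kafka-clients {
    # Confluent.Kafka.ConsumerConfig properties can be set here as well
    bootstrap.servers = "localhost:9092"
  }
}
```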
 
 ### PlainSource
 
@@ -357,27 +285,73 @@ The `KafkaConsumer.CommittableSource` makes it possible to commit offset positions
 
 If you need to store offsets in anything other than Kafka, `PlainSource` should be used instead of this API.
 
-This is useful when “at-least once delivery” is desired, as each message will likely be delivered one time but in failure cases could be duplicated:
+This is useful when "at-least-once delivery" is desired, as each message will likely be delivered one time, but in failure cases could be duplicated.
+
+The recommended way to handle commits is to use the built-in `Committer` facilities, which provide proper batching and error handling:
 
```C#
363-
KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("topic1"))
364-
.SelectAsync(1, async elem =>
293+
// Recommended pattern - using Committer.Sink for safe batched commits
294+
var control = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("topic1"))
295+
.ToMaterialized(Committer.Sink(CommitterSettings.Create(system)), Keep.Both)
296+
.MapMaterializedValue(DrainingControl<Done>.Create)
297+
.Run(materializer);
298+
299+
// For more complex scenarios, you can process messages before committing
300+
var control = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("topic1"))
301+
.SelectAsync(parallelism: 10, async message =>
365302
{
366-
await elem.CommitableOffset.Commit();
367-
return Done.Instance;
303+
await ProcessMessage(message.Record); // Your message processing logic
304+
return message.CommitableOffset;
368305
})
369-
.RunWith(Sink.Ignore<Done>(), _materializer);
306+
.ToMaterialized(Committer.Sink(CommitterSettings.Create(system)), Keep.Both)
307+
.MapMaterializedValue(DrainingControl<Done>.Create)
308+
.Run(materializer);
309+
310+
// When you need to produce messages to Kafka before committing
311+
DrainingControl<Done> control = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("topic1"))
312+
.Select(msg =>
313+
ProducerMessage.Single(
314+
new ProducerRecord<Null, string>("topic2", msg.Record.Message.Key, msg.Record.Message.Value),
315+
msg.CommitableOffset))
316+
.Via(KafkaProducer.FlexiFlow<Null, string, ICommittableOffset>(producerSettings))
317+
.Select(ICommittable (result) => result.PassThrough)
318+
.ToMaterialized(Committer.Sink(CommitterSettings.Create(system)), Keep.Both)
319+
.MapMaterializedValue<DrainingControl<Done>>(tuple => DrainingControl.Create(tuple.Item1, tuple.Item2))
320+
.Run(system);
321+
```
322+
323+
The `Committer` facilities handle batching automatically based on your `CommitterSettings`. You can configure batch size, parallelism, and other parameters:
324+
325+
```C#
326+
var committerSettings = CommitterSettings.Create(system)
327+
.WithMaxBatch(100) // Maximum number of offsets in one commit
328+
.WithParallelism(5) // Number of commits that can be in progress at the same time
329+
.WithMaxInterval(TimeSpan.FromSeconds(3)); // Maximum interval between commits
370330
```
-The above example uses separate `SelectAsync` stages for processing and committing. This guarantees that for parallelism higher than 1 we will keep correct ordering of messages sent for commit.
 
-Committing the offset for each message as illustrated above is rather slow.
-It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.
+> **WARNING**: Avoid calling `CommittableOffset.Commit()` or `CommittableOffsetBatch.Commit()` directly. Always use the `Committer` facilities to ensure proper batching and error handling. Direct commits can lead to reduced performance and potential data loss in failure scenarios.
+
+When using manual partition assignment, or when you need more control over the commit process:
+
+```C#
+var subscription = Subscriptions.Assignment(new TopicPartition("topic1", 0));
+
+DrainingControl<Done> control = KafkaConsumer.CommittableSource(consumerSettings, subscription)
+    .Select(ICommittable (c) => c.CommitableOffset)
+    .ToMaterialized(
+        Committer.Sink(CommitterSettings.Create(system)
+            .WithMaxBatch(100)
+            .WithParallelism(5)),
+        Keep.Both)
+    .MapMaterializedValue<DrainingControl<Done>>(tuple => DrainingControl.Create(tuple.Item1, tuple.Item2))
+    .Run(system);
+```
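All of these examples materialize a `DrainingControl`, which pairs the consumer control with stream completion. A shutdown sketch, assuming the port mirrors Alpakka's `DrainAndShutdown` semantics (verify against the actual `DrainingControl` API):

```C#
// Assumed API: stop polling Kafka, drain in-flight messages so their offsets
// are committed, and complete once the stream has fully terminated.
await control.DrainAndShutdown();
```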
 
 ### PlainPartitionedSource
 
 The `PlainPartitionedSource` is a way to track automatic partition assignment from Kafka.
 When a topic-partition is assigned to a consumer, this source will emit tuples with the assigned topic-partition and a corresponding source of `ConsumerRecord`s.
-When a topic-partition is revoked, the corresponding source completes.
+When a topic-partition is revoked, the corresponding source completes. As of version 1.5.39, the source automatically filters out any messages from recently revoked partitions, providing better consistency during rebalancing operations.
 
 ```c#
 var control = KafkaConsumer.PlainPartitionedSource(consumerSettings, Subscriptions.Topics(topic))
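    // --- End of diff hunk; the rest of this block is a hypothetical sketch,
    // --- not part of the commit. It consumes each per-partition source as its
    // --- own inner stream; `materializer` and the value types are assumptions.
    .SelectAsync(parallelism: 4, async tuple =>
    {
        var (topicPartition, source) = tuple;
        // Each inner source completes when its partition is revoked
        await source
            .Select(record => record.Value)
            .RunWith(Sink.Ignore<string>(), materializer);
        return Done.Instance;
    })
    .RunWith(Sink.Ignore<Done>(), materializer);
```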
@@ -420,7 +394,7 @@ KafkaConsumer.CommitWithMetadataSource(settings, Subscriptions.Topics("topic"),
 
 This source emits `ConsumeResult<TKey,TValue>` together with the offset position as flow context, thus making it possible to commit offset positions to Kafka.
 This is useful when "at-least-once delivery" is desired, as each message will likely be delivered one time but in failure cases could be duplicated.
-It is intended to be used with `KafkaProducer.FlowWithContext` and/or `Committer.SinkWithOffsetContext`
+It is intended to be used with `KafkaProducer.FlowWithContext` and/or `Committer.SinkWithOffsetContext`. As of version 1.5.39, this source includes improved partition handling with automatic filtering of messages from revoked partitions.
 
 ```c#
 var control = KafkaConsumer.SourceWithOffsetContext(consumerSettings, Subscriptions.Topics("topic1"))
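    // --- End of diff hunk; the rest of this block is a hypothetical sketch,
    // --- not part of the commit. The generic signature of
    // --- Committer.SinkWithOffsetContext below is an assumption based on the
    // --- prose above; ProcessMessage is an illustrative placeholder.
    .SelectAsync(1, async record =>
    {
        await ProcessMessage(record); // your processing logic
        return Done.Instance;
    })
    .ToMaterialized(Committer.SinkWithOffsetContext<Done>(CommitterSettings.Create(system)), Keep.Both)
    .MapMaterializedValue(DrainingControl<Done>.Create)
    .Run(materializer);
```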
@@ -462,18 +436,24 @@ but allows the use of an offset store outside of Kafka, while retaining the automatic partition assignment
 When a topic-partition is assigned to a consumer, the `getOffsetsOnAssign`
 function will be called to retrieve the offset, followed by a seek to the correct spot in the partition.
 
+As of version 1.5.39, this source uses `IncrementalAssign` internally to prevent offset resets during partition reassignment, making it more reliable when you manage offsets externally: the stage now remembers any previous assignments you have made.
+
 The `onRevoke` function gives the consumer a chance to store any uncommitted offsets, and do any other cleanup
-that is required.
+that is required. The source also automatically filters out any messages from recently revoked partitions to maintain consistency during rebalancing.
 
 ```c#
 var source = KafkaConsumer.PlainPartitionedManualOffsetSource(consumerSettings, Subscriptions.Topics(topic),
     assignedPartitions =>
     {
-        // Handle assigned partitions
+        // Handle assigned partitions - retrieve offsets from your external store
+        return Task.FromResult(assignedPartitions.ToDictionary(
+            p => p,
+            p => new TopicPartitionOffset(p.Topic, p.Partition, Offset.Stored)));
     },
     revokedPartitions =>
     {
-        // Handle partitions that are revoked
+        // Handle partitions that are revoked - store current offsets externally
+        return Task.CompletedTask;
     })
     // Pass message values down to the stream
     .Select(m => m.Value);
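// --- End of diff hunk. A hypothetical way to run the resulting source;
// --- Sink.ForEach and materializer are illustrative.
source.RunWith(Sink.ForEach<string>(Console.WriteLine), materializer);
```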
@@ -685,11 +665,11 @@ When set, all logs will be written to the `logs` subfolder near your test assembly
 ```C#
 public readonly string LogPath = $"logs\\{DateTime.Now:yyyy-MM-dd_HH-mm-ss}_{Guid.NewGuid():N}.txt";
 ```
-
 ### Tests: Kafka container reuse
 
 By default, tests are configured to be CI-friendly: before the tests start, docker Kafka images are downloaded (if not already present) and containers are started, and after all tests finish a full cleanup is performed (although downloaded docker images are not removed).
 
 While this can be useful when running tests locally, sometimes you may want to save container startup/shutdown time by using a pre-existing container that is shared by all test runs and is not stopped/started each time.
 
-To achieve that, set `AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE` environment variable on your local machine to any value. This will force using existing Kafka container, listening on port `29092` . Use `docker-compose up` console command in the root of project folder to get this container up and running.
+To achieve that, set the `AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE` environment variable on your local machine to any value. This forces the tests to use an existing Kafka container listening on port `29092`. Run `docker-compose up` in the root of the project folder to get this container up and running.
+