Context Propagation does not work from a Kafka consumer when using reactive streams #12122

@Discordia

Description

Expected Behavior

We write the context with contextWrite early in the reactive chain and expect to be able to pick up the propagated context later, just before sending a Kafka message.

Actual Behaviour

We get:

java.lang.IllegalStateException: No active propagation context!
	at io.micronaut.core.propagation.PropagatedContextImpl.get(PropagatedContextImpl.java:82)
	at io.micronaut.core.propagation.PropagatedContext.get(PropagatedContext.java:79)
	at net.discordia.kafka.propagate.context.kafka.MainConsumer.sendSecondaryMessage(MainConsumer.java:41)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2570)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.request(MonoContextWriteRestoringThreadLocals.java:156)
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:54)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(MonoContextWriteRestoringThreadLocals.java:95)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals.subscribe(MonoContextWriteRestoringThreadLocals.java:44)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
	at reactor.core.publisher.Mono.block(Mono.java:1778)
	at net.discordia.kafka.propagate.context.kafka.MainConsumer.consume(MainConsumer.java:36)
	at net.discordia.kafka.propagate.context.kafka.$MainConsumer$Definition$Exec.dispatch(Unknown Source)
	at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:456)
	at io.micronaut.inject.DelegatingExecutableMethod.invoke(DelegatingExecutableMethod.java:86)
	at io.micronaut.core.bind.DefaultExecutableBinder$1.invoke(DefaultExecutableBinder.java:108)
	at io.micronaut.configuration.kafka.processor.ConsumerStateSingle.process(ConsumerStateSingle.java:118)
	at io.micronaut.configuration.kafka.processor.ConsumerStateSingle.processRecords(ConsumerStateSingle.java:91)
	at io.micronaut.configuration.kafka.processor.ConsumerState.pollAndProcessRecords(ConsumerState.java:214)
	at io.micronaut.configuration.kafka.processor.ConsumerState.refreshAssignmentsPollAndProcessRecords(ConsumerState.java:164)
	at io.micronaut.configuration.kafka.processor.ConsumerState.threadPollLoop(ConsumerState.java:154)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
		at reactor.core.publisher.Mono.block(Mono.java:1779)
		... 16 common frames omitted

Steps To Reproduce

Our use case is that we want to propagate some data from an HTTP request through a chain of microservices. The first service takes HTTP input, and there we use a request filter to populate the PropagatedContext from the MDC with MdcPropagationContext. That seems to work. This service then produces a Kafka message, and we use the propagated context to write a header with the request id onto the Kafka message. That works.

The next service consumes that Kafka message and itself produces a new Kafka message, so Kafka is both input and output. Here is where it gets tricky: we can't seem to set a PropagatedContext from the consumer.
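For context, the request filter in the first (HTTP-facing) service looks roughly like the sketch below. The class name `ContextPropagationFilter`, the `X-Request-Id` header, and the MDC key are hypothetical; `PropagatedContext` is Micronaut's, and `MdcPropagationContext` is the same class referenced elsewhere in this report (its import is omitted here).

```java
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import org.reactivestreams.Publisher;
import org.slf4j.MDC;
import io.micronaut.core.propagation.PropagatedContext;

// Sketch of the filter that seeds the propagated context from the MDC.
@Filter("/**")
public class ContextPropagationFilter implements HttpServerFilter {

    @Override
    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request,
                                                      ServerFilterChain chain) {
        // Put the request id into the MDC, then capture the MDC into the
        // propagated context so it can survive thread hops.
        String requestId = request.getHeaders().get("X-Request-Id");
        if (requestId != null) {
            MDC.put("requestId", requestId);
        }
        PropagatedContext ctx = PropagatedContext.getOrEmpty()
            .plus(new MdcPropagationContext(MDC.getCopyOfContextMap()));
        try (PropagatedContext.Scope ignore = ctx.propagate()) {
            return chain.proceed(request);
        }
    }
}
```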

We have tried a MethodInterceptor that does:

    @Override
    public Object intercept(final MethodInvocationContext<Object, Object> context) {
        // Find the ConsumerRecord argument, read the context header from it,
        // and open a propagation scope around the listener invocation.
        var maybeRecord = findArgument(context, ConsumerRecord.class);
        var headers = extractContextHeader(maybeRecord);
        var propagatedContext =
            PropagatedContext.getOrEmpty().plus(new MdcPropagationContext(headers));
        try (Scope ignore = propagatedContext.propagate()) {
            log.debug("Found record headers: {}", headers);
            return context.proceed();
        }
    }

That does not seem to work.

We have also tried this directly in the consumer:

@KafkaListener(
    groupId = "name",
    offsetReset = OffsetReset.EARLIEST,
    offsetStrategy = OffsetStrategy.SYNC_PER_RECORD
)
@Slf4j
@AllArgsConstructor
public class SomeEventConsumer {
    private final SomeService someService;

    @Blocking
    @Topic(SomeEvent.KAFKA_TOPIC)
    public void consume(
        final ConsumerRecord<UUID, SomeEvent> record
    ) {
        var headers = KafkaContextUtils.extractContextHeader(record); 
        var propagatedContext =
            PropagatedContext.getOrEmpty().plus(new MdcPropagationContext(headers));

        Mono.just(record)
            .contextWrite(ctx -> ReactorPropagation.addPropagatedContext(ctx, propagatedContext))
            .map(ConsumerRecord::value)
            .flatMap(e -> someService.handleEvent(e.getPayload()))
            .doOnError(t -> log.warn("Failed to handle event", t))
            .onErrorResume(t -> Mono.empty())
            .block();
    }
}

This also does not seem to work: further down the chain, where we try to pick the context up, there is no active PropagatedContext.
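For completeness, the downstream pickup that throws looks roughly like this (a minimal sketch; the method body and the reactive producer are hypothetical simplifications of MainConsumer.sendSecondaryMessage from the stack trace above):

```java
import io.micronaut.core.propagation.PropagatedContext;
import reactor.core.publisher.Mono;

// Sketch of the downstream pickup. PropagatedContext.get() throws
// "No active propagation context!" when nothing is in scope, which is
// exactly the IllegalStateException in the stack trace above.
private Mono<Void> sendSecondaryMessage(SomeEvent event) {
    PropagatedContext propagated = PropagatedContext.get(); // <-- throws here
    log.debug("Active propagated context: {}", propagated);
    // ... build the outgoing record, copy the request id header, send ...
    return producer.send(event); // hypothetical reactive Kafka producer
}
```

As an aside, PropagatedContext.find() returns an Optional and can be used to probe for an active context without throwing, though that only masks the symptom here rather than making the context available.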

Example project that shows the problem: https://github.com/Discordia/kafka-propagate-context

Environment Information

macOS 15.7.1
Microsoft OpenJDK 21

Example Application

https://github.com/Discordia/kafka-propagate-context

Version

4.9.4
