-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
Expected Behavior
We write the context with contextWrite and then we expect to be able to pick up propagated context just before sending a Kafka message when we wrote the context with contextWrite earlier in the reactive chain.
Actual Behaviour
We get:
java.lang.IllegalStateException: No active propagation context!
at io.micronaut.core.propagation.PropagatedContextImpl.get(PropagatedContextImpl.java:82)
at io.micronaut.core.propagation.PropagatedContext.get(PropagatedContext.java:79)
at net.discordia.kafka.propagate.context.kafka.MainConsumer.sendSecondaryMessage(MainConsumer.java:41)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2570)
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.request(MonoContextWriteRestoringThreadLocals.java:156)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:54)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(MonoContextWriteRestoringThreadLocals.java:95)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals.subscribe(MonoContextWriteRestoringThreadLocals.java:44)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.Mono.block(Mono.java:1778)
at net.discordia.kafka.propagate.context.kafka.MainConsumer.consume(MainConsumer.java:36)
at net.discordia.kafka.propagate.context.kafka.$MainConsumer$Definition$Exec.dispatch(Unknown Source)
at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invoke(AbstractExecutableMethodsDefinition.java:456)
at io.micronaut.inject.DelegatingExecutableMethod.invoke(DelegatingExecutableMethod.java:86)
at io.micronaut.core.bind.DefaultExecutableBinder$1.invoke(DefaultExecutableBinder.java:108)
at io.micronaut.configuration.kafka.processor.ConsumerStateSingle.process(ConsumerStateSingle.java:118)
at io.micronaut.configuration.kafka.processor.ConsumerStateSingle.processRecords(ConsumerStateSingle.java:91)
at io.micronaut.configuration.kafka.processor.ConsumerState.pollAndProcessRecords(ConsumerState.java:214)
at io.micronaut.configuration.kafka.processor.ConsumerState.refreshAssignmentsPollAndProcessRecords(ConsumerState.java:164)
at io.micronaut.configuration.kafka.processor.ConsumerState.threadPollLoop(ConsumerState.java:154)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
at reactor.core.publisher.Mono.block(Mono.java:1779)
... 16 common frames omitted
Steps To Reproduce
Are use case is that we want to propagate some data from a http reuqest through a chain of micro service. The first service takes HTTP and there we uase a reqeust filter to populate the ContextPropagation fromMDC with MdcPropagationContext.class. That seem to work. This then produces a kafka message and we use the propagated context to write a header on the kakfa message with the request id. That works.
The the next service takes in the Kafka message and then itself then produces a new Kafka message. So Kafka is both input and output. Here is where it gets tricky. We can't seem to set a PropagatedContext from the consumer.
We have tried a MethodInterceptor that does:
@Override
public Object intercept(final MethodInvocationContext<Object, Object> context) {
var maybeRecord = findArgument(context, ConsumerRecord.class);
var headers = extractContextHeader(record);
var propagatedContext =
PropagatedContext.getOrEmpty().plus(new MdcPropagationContext(headers));
try (Scope ignore = propagatedContext.propagate()) {
log.debug("Found record headers: {}", headers);
Object result = null;
if (nonEmptyReturns == consumerFilters.size()) {
result = context.proceed();
}
return result;
}
}
That does not seem to work.
We have also tried up in the consumer:
@KafkaListener(
groupId = "name",
offsetReset = OffsetReset.EARLIEST,
offsetStrategy = OffsetStrategy.SYNC_PER_RECORD
)
@Slf4j
@AllArgsConstructor
public class SomeEventConsumer {
private final SomeService someService;
@Blocking
@Topic(SomeEvent.KAFKA_TOPIC)
public void consume(
final ConsumerRecord<UUID, SomeEvent> record
) {
var headers = KafkaContextUtils.extractContextHeader(record);
var propagatedContext =
PropagatedContext.getOrEmpty().plus(new MdcPropagationContext(headers));
Mono.just(record)
.contextWrite(ctx -> ReactorPropagation.addPropagatedContext(ctx, propagatedContext))
.map(ConsumerRecord::value)
.flatMap(e -> someService.handleEvent(e.getPayload()))
.doOnError(t -> log.warn("Failed to handle event", t))
.onErrorResume(t -> Mono.empty())
.block();
}
}
Also does not seem to work. Further down the chain where we try to pick this up there is no active PropagatedContext.
Example project that shows the problem: https://github.com/Discordia/kafka-propagate-context
Environment Information
MacOs 15.7.1
Microsoft OpenJDK 21
Example Application
https://github.com/Discordia/kafka-propagate-context
Version
4.9.4