-
Notifications
You must be signed in to change notification settings - Fork 8
fix(doc): Transform Processor CDI Decorator into manual decorators #118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/*- | ||
* #%L | ||
* Quarkus Kafka Streams Processor | ||
* %% | ||
* Copyright (C) 2024 Amadeus s.a.s. | ||
* %% | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* #L% | ||
*/ | ||
package io.quarkiverse.kafkastreamsprocessor.api.decorator.processor; | ||
|
||
import org.apache.kafka.streams.processor.api.Processor; | ||
|
||
import lombok.Getter; | ||
import lombok.Setter; | ||
import lombok.experimental.Delegate; | ||
|
||
/** | ||
* Base class for all processor decorators. | ||
* <p> | ||
* If a decorator does not implement this abstract class, it will not be found by the | ||
* <code>KafkaClientSuppliedDecorator</code> for composition. | ||
* </p> | ||
* <p> | ||
* We remove the generic declaration from {@link Processor} because ArC complains about generics on class declaration of | ||
* a bean. | ||
* </p> | ||
* <p> | ||
* Class introduced in 2.0, for compatibility with Quarkus 3.8 random failure to start when using custom processor | ||
* decorators. | ||
* </p> | ||
* | ||
* @deprecated It will be removed in 3.0, with the integration of Quarkus 3.15 where we will be able to go back to pure | ||
* CDI decorators. | ||
*/ | ||
@Deprecated(forRemoval = true, since = "2.0") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just born and already obsolete :D |
||
public abstract class AbstractProcessorDecorator implements Processor { | ||
/** | ||
* The decorated processor, holding either the next decorator layer or the final processor. | ||
*/ | ||
@Delegate | ||
@Getter | ||
@Setter | ||
private Processor delegate; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -309,6 +309,7 @@ public class PojoProcessor extends ContextualProcessor<String, SamplePojo, Strin | |
} | ||
} | ||
---- | ||
|
||
<1> Your Processor is declared with the annotation as for a regular processor. | ||
<2> The handled value type, in this example, is a simple POJO, nothing fancy. | ||
<3> Same POJO value in the _process()_ method. | ||
|
@@ -423,6 +424,7 @@ public class PingerService { | |
} | ||
} | ||
---- | ||
|
||
<1> Define the method to retry with `org.eclipse.microprofile.faulttolerance.Retry` annotation | ||
|
||
.application.properties | ||
|
@@ -484,7 +486,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g | |
| The number of times a Punctuator's execution failed with an exception since the start of the microservice. | ||
|=== | ||
|
||
|
||
.Dead Letter Queue Metrics | ||
[options="header",cols="30%,20%,40%"] | ||
|=== | ||
|
@@ -501,7 +502,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g | |
| The number of messages sent to global DLQ. | ||
|=== | ||
|
||
|
||
== A comparison between Reactive Messaging Kafka and Kafka Streams | ||
|
||
These two technologies can be used to create streaming microservices to be used in Event-Driven architecture applications. | ||
|
@@ -536,6 +536,7 @@ The purpose of increasing concurrency is to be able to cope with streaming micro | |
return api.remoteCall(); | ||
} | ||
---- | ||
|
||
<1> `@Incoming` is declaring this method as a subscriber for the channel named `ping-events` | ||
<2> `@Outgoing` is declaring this method as a producer for the channel named `pong-events` | ||
<3> `@io.smallrye.reactive.messaging.annotations.Blocking` Indicates that this method is running out of the processing thread, inside a worker thread and the order of the messages is not important. | ||
|
@@ -640,6 +641,7 @@ public class PingProcessor extends ContextualProcessor<String, Ping, String, Pin | |
} | ||
} | ||
---- | ||
|
||
<1> Your Processor is declared with the annotation as for a regular processor. | ||
<2> The definition and initialization of your state store. | ||
|
||
|
@@ -688,9 +690,9 @@ The extension proposes some capabilities to customize more finely the behaviour | |
=== Processor decorator | ||
|
||
The following decoration layer is already extensively used in this extension's source code and allows to use composition around the main processor class you have to define. | ||
Example of a new decorator: | ||
Depending on the version of Quarkus you are using, the pattern differs: | ||
|
||
.ProcessorDecorator.java | ||
.ProcessorDecorator.java with Quarkus 3.2 or 3.11.0+ | ||
[source,java] | ||
---- | ||
@Decorator // <1> | ||
|
@@ -719,6 +721,7 @@ public class ProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn, | |
} | ||
} | ||
---- | ||
|
||
<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI | ||
<2> Force the instantiation of the decorator with the Priority annotation. | ||
Indeed, otherwise the decorator is not taken into account by Quarkus. | ||
|
@@ -759,23 +762,54 @@ The priority is to be set based on the priorities of the existing decorators whi | |
---- | ||
<3> The decorator should have the same generics declaration `<KIn, VIn, KOut, VOut>` as the `Processor<KIn, VIn, KOut, VOut>` interface that it implements | ||
<4> Delegate reference to use when decorating methods. | ||
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate | ||
passthrough decorated methods that this Decorator class won't decorate. | ||
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate passthrough decorated methods that this Decorator class won't decorate. | ||
The selection is done through a blacklist of method signatures gathered in a private `Excludes` interface declared at the end of the class. | ||
<5> Injection constructor which must have a delegate argument annotated with the `Delegate` annotation from CDI. | ||
You can also, as a regular CDI bean, inject any another CDI bean reference to be used in this decorator. | ||
<6> Example of decorated method, here the main `process` method of `Processor` API of Kafka Streams. | ||
|
||
Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations. | ||
.ProcessorDecorator.java for Quarkus 3.8 -> 3.10 | ||
[source,java] | ||
---- | ||
@Dependent // <1> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚫 [vale] reported by reviewdog 🐶 |
||
@Priority(150) // <2> | ||
public class ProcessorDecorator extends AbstractProcessorDecorator { // <3> | ||
@Override | ||
public void process(Record record) { // <4> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
// use bean before | ||
getDelegate().process(record); | ||
// use bean after | ||
} | ||
} | ||
---- | ||
|
||
<1> We have to mark the bean `Dependent` so it is instantiated at every use. | ||
Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification. | ||
<2> We add a `Priority`, with same pattern as a CDI decorator. | ||
<3> We remove the generic types from the class signature, because CDI does not like generics in beans. | ||
<4> Example of override of process method and call to underlying decorator. | ||
|
||
Such a decorator will automatically been taken into account by CDI. | ||
The priority will control at which point your decorator will be called among all other decorators. | ||
|
||
[CAUTION] | ||
==== | ||
We noticed with a new integration-test that is using a custom serde, that usage of custom CDI `Decorator` is causing microservices to randomly crash at startup. | ||
This happens for specific versions of Quarkus. | ||
Known impacted versions are 3.8.x, 3.9.x and 3.10.x. | ||
The 3.2 LTS and upcoming 3.15 LTS versions do not suffer from this symptom. | ||
The **only** solution found was to remove usage of `@Decorator` for `Processor` decorators for microservices based on Quarkus 3.8 LTS. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could mention if/until quarkusio/quarkus#43245 is backported to 3.8 LTS ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the link to quarkusio/quarkus#41258 will seems more the origin of the "fix" in quarkus 3.11 |
||
This change will be reverted in quarkus-kafka-streams-processor 3.0. | ||
This is probably the https://github.com/quarkusio/quarkus/pull/41258[PR] on Quarkus side that has fixed the issue in Quarkus 3.11. | ||
==== | ||
|
||
=== Producer interceptor | ||
|
||
Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. | ||
Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor]. | ||
But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation. | ||
It does not support CDI resolution. | ||
|
||
This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI. | ||
This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI. | ||
Example of usage: | ||
|
||
.MyProducerInterceptor.java | ||
|
@@ -796,6 +830,7 @@ public class HeaderAddingProducerInterceptor implements ProducerOnSendIntercepto | |
} | ||
} | ||
---- | ||
|
||
<1> Producer interceptors are discovered by CDI by the `ApplicationScoped` annotation | ||
<2> The interceptor class should extend `ProducerOnSendInterceptor`. | ||
`ProducerOnSendInterceptor` extends `ProducerInterceptor<byte[], byte[]>` and overrides some of its methods with default implementations to exempt their forced implementations further down the line. | ||
|
@@ -847,6 +882,7 @@ public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator | |
} | ||
} | ||
---- | ||
|
||
<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI | ||
<2> Force the instantiation of the decorator with the Priority annotation. | ||
Indeed, otherwise the decorator is not taken into account by Quarkus. | ||
|
@@ -883,4 +919,4 @@ include::includes/kafka-streams-processor-configuration-keys.adoc[] | |
|
||
== Configuration from other extension | ||
|
||
include::includes/quarkus-other-extension-configurations.adoc[] | ||
include::includes/quarkus-other-extension-configurations.adoc[] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,22 +20,24 @@ | |
package io.quarkiverse.kafkastreamsprocessor.impl; | ||
|
||
import java.lang.annotation.Annotation; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.enterprise.context.Dependent; | ||
import jakarta.enterprise.context.RequestScoped; | ||
import jakarta.enterprise.inject.Instance; | ||
import jakarta.enterprise.inject.spi.Bean; | ||
import jakarta.enterprise.inject.spi.BeanManager; | ||
import jakarta.inject.Inject; | ||
import jakarta.inject.Singleton; | ||
|
||
import org.apache.kafka.streams.processor.api.Processor; | ||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; | ||
|
||
import io.quarkiverse.kafkastreamsprocessor.api.decorator.processor.AbstractProcessorDecorator; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
|
@@ -60,6 +62,8 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor | |
*/ | ||
private final Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances; | ||
|
||
private final Instance<AbstractProcessorDecorator> processorDecorators; | ||
|
||
/** | ||
* Injection constructor. | ||
* | ||
|
@@ -76,17 +80,20 @@ public class KStreamProcessorSupplier<KIn, VIn, KOut, VOut> implements Processor | |
@Inject | ||
public KStreamProcessorSupplier(Instance<Processor<?, ?, ?, ?>> kafka3BeanInstances, | ||
Instance<org.apache.kafka.streams.processor.Processor<?, ?>> beanInstances, | ||
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager) { | ||
Instance<Kafka2ProcessorAdapter<?, ?>> adapterInstances, BeanManager beanManager, | ||
Instance<AbstractProcessorDecorator> processorDecorators) { | ||
this.kafka3BeanInstances = kafka3BeanInstances; | ||
this.beanInstances = beanInstances; | ||
this.adapterInstances = adapterInstances; | ||
this.processorDecorators = processorDecorators; | ||
|
||
List<String> processorDecoratorNames = new ArrayList<>(processorDecorators.stream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how the list of processorDecorators is now ordered? I can't find documentation on that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For decorators it is: the higher the priority the lower you get in the order of wrappers: For injecting a list of beans or the instance object (basically a stream with optional, kind of): hight priority = earlier in the list. --> for wrapping, perfect order, we just need to loop to wrap the real instance with B and then B with A |
||
.map(Object::getClass) | ||
.map(Class::getName) | ||
.collect(Collectors.toUnmodifiableList())); | ||
Collections.reverse(processorDecoratorNames); | ||
log.info("Configured Processor decorators are in order: {}", String.join(", ", processorDecoratorNames)); | ||
|
||
log.info("Configured Processor decorators are in order: {}", | ||
beanManager.resolveDecorators(Set.of(Processor.class)) | ||
.stream() | ||
.map(Bean::getBeanClass) | ||
.map(Class::getName) | ||
.collect(Collectors.joining(", "))); | ||
} | ||
|
||
/** | ||
|
@@ -131,7 +138,16 @@ public Processor<KIn, VIn, KOut, VOut> get() { | |
"Processors cannot have a scope other than @Dependant, since KafkaStreams implementation classes are not thread-safe"); | ||
} | ||
|
||
return (Processor<KIn, VIn, KOut, VOut>) processor; | ||
return wrapProcessor((Processor<KIn, VIn, KOut, VOut>) processor); | ||
} | ||
|
||
private Processor<KIn, VIn, KOut, VOut> wrapProcessor(Processor<KIn, VIn, KOut, VOut> processor) { | ||
Processor<KIn, VIn, KOut, VOut> wrappedProcessor = processor; | ||
for (AbstractProcessorDecorator decorator : processorDecorators) { | ||
decorator.setDelegate(wrappedProcessor); | ||
wrappedProcessor = decorator; | ||
} | ||
return wrappedProcessor; | ||
} | ||
|
||
private static boolean hasAnnotation(Object bean, Class<? extends Annotation> annotation) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be in the api module or it could stay in the impl ?
I see integration-tests new HeaderDecorator compiling on it, but not sure if it's a new capability or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it needs yeah, cause that type is used when discovering the beans