gaohe7091/spring-integration-kafka

Spring Integration Kafka Adapter

INTEXT KAFKA

The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds). For more information on Kafka and its design goals, see the Kafka main page.

This 2.0 version is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.

That project is currently at Release Candidate 1 and its documentation is here.

Quick Start

See the Spring Integration Kafka Sample for a simple Spring Boot application that sends and receives messages.

Checking out and building

In order to build the project:

./gradlew build

In order to install this into your local maven cache:

./gradlew install

The Spring Integration Kafka project currently supports the following components. Please keep in mind that it is at a very early stage of development and does not yet make full use of all the features that Kafka provides.

  • Outbound Channel Adapter

  • Message Driven Channel Adapter

Outbound Channel Adapter:

The outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can then publish to Kafka via Spring Integration messages, which the outbound channel adapter converts internally to Kafka messages as follows: the payload of the Spring Integration message populates the payload of the Kafka message, and (by default) the kafka_messageKey header of the Spring Integration message populates the key of the Kafka message.

The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively.

Here’s an example of sending a message with an arbitrary payload and the String "key" as the message key to the test topic.

    final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);

    channel.send(
            MessageBuilder.withPayload(payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                    .setHeader(KafkaHeaders.TOPIC, "test")
                    .build()
            );

In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions to the outbound message. To that end, it supports the mutually exclusive pairs of attributes topic/topic-expression, message-key/message-key-expression, and partition-id/partition-id-expression. These allow the topic, message-key, and partition-id to be specified as static values on the adapter or to be evaluated dynamically at runtime against the request message.

Important
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the <int-kafka:outbound-channel-adapter>, or simply change the headers upstream to the new headers from KafkaHeaders using a <header-enricher> or MessageBuilder. Alternatively, if you are using constant values, configure them on the adapter using topic and message-key.
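
The header rename described above can be modelled with a small standalone sketch. It is plain Java with a Map standing in for the message headers, not Spring Integration code; the HeaderMigration class and its methods are hypothetical names used only for illustration. In a real application the same rename would be done with a <header-enricher> or MessageBuilder, as described above.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderMigration {

    // Returns a copy of the headers in which the legacy names are replaced
    // by the kafka_-prefixed names. All other headers are copied unchanged.
    // Assumption of this sketch: an already present kafka_ header is overwritten.
    static Map<String, Object> migrate(Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>(headers);
        rename(result, "messageKey", "kafka_messageKey");
        rename(result, "topic", "kafka_topic");
        return result;
    }

    // Moves the value stored under oldName to newName, if oldName is present.
    private static void rename(Map<String, Object> headers, String oldName, String newName) {
        if (headers.containsKey(oldName)) {
            headers.put(newName, headers.remove(oldName));
        }
    }

    public static void main(String[] args) {
        Map<String, Object> legacy = new HashMap<>();
        legacy.put("messageKey", "key");
        legacy.put("topic", "test");
        // The migrated map contains kafka_messageKey and kafka_topic only.
        System.out.println(migrate(legacy));
    }
}
```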

NOTE: If the adapter is configured with a topic or message key (either as a constant or as an expression), that value is used and the corresponding header is ignored. If you want the header to override the configuration, you need to reference it in an expression, such as:

topic-expression="headers.topic != null ? headers.topic : 'myTopic'".
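
The precedence rule in the note above can be illustrated with a minimal standalone sketch. It is plain Java, not the adapter's actual implementation: a Map stands in for the message headers, and the TopicResolution class and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicResolution {

    // Models an adapter configured with a constant topic such as topic="myTopic":
    // the configured value is always used and the header is ignored.
    static String configuredTopicWins(Map<String, Object> headers, String configuredTopic) {
        return configuredTopic;
    }

    // Models topic-expression="headers.topic != null ? headers.topic : 'myTopic'":
    // the header is used when present, otherwise the fallback applies.
    static String headerOverrides(Map<String, Object> headers, String fallbackTopic) {
        Object topic = headers.get("topic");
        return topic != null ? topic.toString() : fallbackTopic;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(headerOverrides(headers, "myTopic"));      // prints "myTopic"
        headers.put("topic", "orders");
        System.out.println(headerOverrides(headers, "myTopic"));      // prints "orders"
        System.out.println(configuredTopicWins(headers, "myTopic"));  // prints "myTopic"
    }
}
```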

The adapter requires a KafkaTemplate.

Here is an example of how the Kafka outbound channel adapter is configured with XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

Message Driven Channel Adapter:

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.

An example of the XML configuration variant is shown here:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg name="topics" value="foo" />
</bean>

See the sample mentioned above for Java @Configuration.
