A modern, functional, high-performance Kafka consumer for Java 24, built on virtual threads, composable message processors, and DslJson for JSON processing. It features robust error handling, configurable retries, built-in metrics, and support for both parallel and sequential processing, making it well suited to large-scale systems.
- Virtual Threads for massive concurrency with minimal overhead
- Functional programming patterns for clean, maintainable code
- High-performance JSON processing with DslJson
- Message processors are pure functions (`Function<V, V>`) that transform data without side effects
- Build complex pipelines through function composition using `Function::andThen` or the registry
- Declarative processing lets you describe what to do, not how to do it
- Higher-order functions enable conditional processing, retry logic, and error handling
- Teams can register their own processors in a central registry via:
// Register team-specific processors
MessageProcessorRegistry.register("sanitizeData",
    JsonMessageProcessor.removeFields("password", "ssn"));

// Create pipelines from registered processors
final var pipeline = MessageProcessorRegistry.pipeline(
    "parseJson", "validateSchema", "sanitizeData", "addMetadata");

// Apply transformations with built-in error handling and retry logic
final var consumer = FunctionalConsumer.<byte[], byte[]>builder()
    .withProcessor(pipeline)
    .withRetry(3, Duration.ofSeconds(1))
    .build();

consumer.start();
// Register custom processors for your team's needs
MessageProcessorRegistry.register("extractMetadata", message -> {
    // Custom extraction logic here
    return processedMessage;
});

// Load processors from configuration
String[] configuredProcessors = config.getStringArray("message.processors");
Function<byte[], byte[]> pipeline = MessageProcessorRegistry.pipeline(configuredProcessors);

// Create a consumer with a team-specific processing pipeline
final var consumer = FunctionalConsumer.<byte[], byte[]>builder()
    .withProperties(kafkaProps)
    .withTopic("team-topic")
    .withProcessor(MessageProcessorRegistry.pipeline(
        "parseJson", "validateSchema", "extractMetadata", "addTeamIdentifier"))
    .withErrorHandler(error -> publishToErrorTopic(error))
    .withRetry(3, Duration.ofSeconds(1))
    .build();

// Use the consumer
consumer.start();
<dependency>
<groupId>io.github.eschizoid</groupId>
<artifactId>kpipe</artifactId>
<version>0.2.0</version>
</dependency>
implementation 'io.github.eschizoid:kpipe:0.2.0'
implementation("io.github.eschizoid:kpipe:0.2.0")
libraryDependencies += "io.github.eschizoid" % "kpipe" % "0.2.0"
KPipe is organized into two main modules:
The core library that provides the Kafka consumer functionality:
├── src/main/java/org/kpipe/
│   ├── consumer/                           # Core consumer components
│   │   ├── FunctionalConsumer.java         # Main functional consumer implementation
│   │   ├── OffsetManager.java              # Manages Kafka offsets for reliable processing
│   │   ├── MessageTracker.java             # Tracks message processing state
│   │   ├── RebalanceListener.java          # Handles Kafka consumer rebalancing
│   │   └── enums/                          # Enums for consumer states and commands
│   │
│   ├── processor/                          # Message processors
│   │   └── JsonMessageProcessor.java       # JSON processing with DslJson
│   │
│   ├── registry/                           # Registry components
│   │   ├── MessageProcessorRegistry.java   # Registry for processor functions
│   │   ├── MessageSinkRegistry.java        # Registry for message sinks
│   │   └── RegistryFunctions.java          # Shared utilities for registries
│   │
│   ├── sink/                               # Message sink implementations
│   │   ├── ConsoleSink.java                # Console sink implementation
│   │   └── MessageSink.java                # Message sink interface
│   │
│   ├── config/                             # Configuration components
│   │   ├── AppConfig.java                  # Application configuration
│   │   └── KafkaConsumerConfig.java        # Kafka consumer configuration
│   │
│   └── metrics/                            # Metrics components
│       ├── ConsumerMetricsReporter.java    # Reports consumer metrics
│       ├── MetricsReporter.java            # Metrics reporting interface
│       └── ProcessorMetricsReporter.java   # Reports processor metrics
A ready-to-use application that demonstrates the library:
├── src/main/java/org/kpipe/
│   └── App.java                            # Main application class
Extend the registry like this:
// Register a processor for JSON field transformations
MessageProcessorRegistry.register("uppercase", bytes ->
JsonMessageProcessor.transformField("text", value -> {
if (value instanceof String text) {
return text.toUpperCase();
}
return value;
}).apply(bytes)
);
// Register a processor that adds environment information
MessageProcessorRegistry.register("addEnvironment",
JsonMessageProcessor.addField("environment", "production"));
// Create a reusable processor pipeline
final var pipeline = MessageProcessorRegistry.pipeline(
"parseJson", "validateSchema", "addEnvironment", "uppercase", "addTimestamp"
);
// Use the pipeline with a consumer
final var consumer = FunctionalConsumer.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic("events")
.withProcessor(pipeline)
.withRetry(3, Duration.ofSeconds(1))
.build();
// Start processing messages
consumer.start();
Monitor your consumer with built-in metrics:
// Access consumer metrics
Map<String, Long> metrics = consumer.getMetrics();
System.out.println("Messages received: " + metrics.get("messagesReceived"));
System.out.println("Successfully processed: " + metrics.get("messagesProcessed"));
System.out.println("Processing errors: " + metrics.get("processingErrors"));
Configure automatic metrics reporting:
new App(config)
.withMetricsInterval(Duration.ofSeconds(30))
.start();
The consumer supports graceful shutdown with in-flight message handling:
// Initiate graceful shutdown with 5-second timeout
boolean allProcessed = app.shutdownGracefully(5000);
if (allProcessed) {
LOGGER.info("All messages processed successfully before shutdown");
} else {
LOGGER.warning("Shutdown completed with some messages still in flight");
}
// Register as JVM shutdown hook
Runtime.getRuntime().addShutdownHook(
new Thread(() -> app.shutdownGracefully(5000))
);
The JSON processors handle deserialization and transformation of JSON data:
// Add a timestamp field to messages
final var addTimestampProcessor = JsonMessageProcessor.addTimestamp("processedAt");
// Remove sensitive fields
final var sanitizeProcessor = JsonMessageProcessor.removeFields("password", "ssn", "creditCard");
// Transform specific fields
final var uppercaseSubjectProcessor = JsonMessageProcessor.transformField("subject", value -> {
if (value instanceof String text) {
return text.toUpperCase();
}
return value;
});
// Add metadata to messages
final var metadata = new HashMap<String, Object>();
metadata.put("version", "1.0");
metadata.put("environment", "production");
var addMetadataProcessor = JsonMessageProcessor.mergeWith(metadata);
// Combine processors into a pipeline
Function<byte[], byte[]> pipeline = message -> addMetadataProcessor.apply(
uppercaseSubjectProcessor.apply(
sanitizeProcessor.apply(
addTimestampProcessor.apply(message)
)
)
);
// Or use the registry to build pipelines
final var registryPipeline = MessageProcessorRegistry.pipeline(
"sanitize", "addTimestamp", "uppercaseSubject", "addMetadata"
);
Here's a concise example of a KPipe application:
public class KPipeApp implements AutoCloseable {
private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;
public static void main(final String[] args) {
// Load configuration from environment variables
final var config = AppConfig.fromEnv();
try (final var app = new KPipeApp(config)) {
app.start();
app.awaitShutdown();
} catch (final Exception e) {
System.getLogger(KPipeApp.class.getName())
.log(System.Logger.Level.ERROR, "Fatal error in application", e);
System.exit(1);
}
}
public KPipeApp(final AppConfig config) {
// Create processor and sink registries
final var processorRegistry = new MessageProcessorRegistry(config.appName());
final var sinkRegistry = new MessageSinkRegistry();
// Create the functional consumer
final var functionalConsumer = FunctionalConsumer.<byte[], byte[]>builder()
.withProperties(KafkaConsumerConfig.createConsumerConfig(
config.bootstrapServers(), config.consumerGroup()))
.withTopic(config.topic())
.withProcessor(processorRegistry.pipeline(
"parseJson", "addSource", "markProcessed", "addTimestamp"))
.withMessageSink(sinkRegistry.<byte[], byte[]>pipeline("logging"))
.withOffsetManagerProvider(consumer ->
OffsetManager.builder(consumer)
.withCommitInterval(Duration.ofSeconds(30))
.build())
.withMetrics(true)
.build();
// Set up the consumer runner with metrics and shutdown hooks
runner = ConsumerRunner.builder(functionalConsumer)
.withMetricsInterval(config.metricsInterval().toMillis())
.withShutdownTimeout(config.shutdownTimeout().toMillis())
.withShutdownHook(true)
.build();
}
public void start() { runner.start(); }
public boolean awaitShutdown() { return runner.awaitShutdown(); }
public void close() { runner.close(); }
}
Key Components:
- Configuration from environment variables
- Processor and sink registries for message handling
- Processing pipeline with error handling
- Metrics reporting and graceful shutdown
To Run:
# Set configuration
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CONSUMER_GROUP=my-group
export KAFKA_TOPIC=json-events
# Run the application
./gradlew run
# Test with a sample message
echo '{"message":"Hello from KPipe!"}' | kcat -P -b localhost:9092 -t json-events
- Java 24+
- Gradle (for building the project)
- kcat (for testing)
- Docker (for local Kafka setup)
Configure via environment variables:
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CONSUMER_GROUP=my-consumer-group
export KAFKA_TOPIC=json-events
export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
export METRICS_INTERVAL_MS=30000
export SHUTDOWN_TIMEOUT_MS=5000
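At startup these variables are read into an AppConfig and wired into the consumer builder. The sketch below mirrors the application example above and uses the same accessors; the processor names are only placeholders:

// Minimal sketch: wire the environment-backed configuration into a consumer
final var config = AppConfig.fromEnv();

final var consumer = FunctionalConsumer.<byte[], byte[]>builder()
    .withProperties(KafkaConsumerConfig.createConsumerConfig(
        config.bootstrapServers(), config.consumerGroup()))
    .withTopic(config.topic())
    .withProcessor(MessageProcessorRegistry.pipeline("parseJson", "addTimestamp"))
    .build();

consumer.start();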
Follow these steps to test the KPipe Kafka Consumer:
# Format code and build the library module
./gradlew clean :lib:spotlessApply :lib:build
# Format code and build the applications module
./gradlew :app:clean :app:spotlessApply :app:build
# Build the consumer app container and start all services
docker compose build --no-cache --build-arg MESSAGE_FORMAT=<json|avro|protobuf>
docker compose down -v
docker compose up -d
# Publish a simple JSON message to the json-topic
echo '{"message":"Hello world"}' | kcat -P -b kafka:9092 -t json-topic
# For complex JSON messages, use a file
cat test-message.json | kcat -P -b kafka:9092 -t json-topic
# Publish multiple test messages
for i in {1..10}; do echo "{\"id\":$i,\"message\":\"Test message $i\"}" | \
kcat -P -b kafka:9092 -t json-topic; done
If you want to use Avro with a schema registry, follow these steps:
# Register an Avro schema
curl -X POST \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "{\"schema\": $(cat lib/src/test/resources/avro/customer.avsc | jq tostring)}" \
http://localhost:8081/subjects/com.kpipe.customer/versions
# Read registered schema
curl -s http://localhost:8081/subjects/com.kpipe.customer/versions/latest | jq -r '.schema' | jq --indent 2 '.'
# Produce an Avro message using kafka-avro-console-producer
echo '{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}' \
| docker run -i --rm --network=host confluentinc/cp-schema-registry:latest \
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic avro-topic \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=1
The Kafka consumer will:
- Connect to `localhost:9092`
- Subscribe to `avro-topic`, `json-topic`, or `protobuf-topic`
- Compose the processing pipeline from the configured processors
- Process each message concurrently on virtual threads (see the sketch below)
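For intuition, here is a simplified sketch of per-record dispatch on virtual threads. It is illustrative only, not KPipe's internal implementation; `kafkaConsumer`, `pipeline`, and `running` are assumed to be in scope:

// Illustrative only: one virtual thread per polled record.
// Uses java.util.concurrent.Executors and the standard Kafka consumer API.
try (final var executor = Executors.newVirtualThreadPerTaskExecutor()) {
  while (running) {
    final var records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (final ConsumerRecord<byte[], byte[]> record : records) {
      executor.submit(() -> pipeline.apply(record.value()));
    }
  }
}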
For maintainable pipelines, group related processors:
// Create focused processor groups
final var securityProcessors = MessageProcessorRegistry.pipeline(
"sanitizeData", "encryptSensitiveFields", "addAuditTrail");
final var enrichmentProcessors = MessageProcessorRegistry.pipeline(
"addMetadata", "addTimestamp", "addEnvironment");
// Compose them into a master pipeline
final var fullPipeline = securityProcessors.andThen(enrichmentProcessors);
// Or register the composed pipeline
MessageProcessorRegistry.register("standardProcessing", fullPipeline);
The library provides a built-in `when()` method for conditional processing:
// Create a predicate that checks message type
Predicate<byte[]> isOrderMessage = message -> {
try {
Map<String, Object> parsed = JsonMessageProcessor.parseJson().apply(message);
return "ORDER".equals(parsed.get("type"));
} catch (Exception e) {
return false;
}
};
// Use the built-in conditional processor
Function<byte[], byte[]> conditionalProcessor = MessageProcessorRegistry.when(
isOrderMessage,
MessageProcessorRegistry.get("orderEnrichment"),
MessageProcessorRegistry.get("defaultEnrichment")
);
// Register the conditional pipeline
MessageProcessorRegistry.register("smartProcessing", conditionalProcessor);
- Message processors should be stateless and thread-safe
- Avoid shared mutable state between processors
- Use immutable data structures where possible
- For processors with side effects (like database calls), consider using thread-local variables (see the sketch after this list)
- Register frequently used processor combinations as single processors
- For very large messages, consider streaming JSON processors
- Profile your processor pipeline to identify bottlenecks
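To make these guidelines concrete, here is a sketch; `AuditClient` is a hypothetical, non-thread-safe class that stands in for any side-effecting dependency:

// Stateless, thread-safe processor: safe to share across all virtual threads
MessageProcessorRegistry.register("trimMessage",
    JsonMessageProcessor.transformField("message",
        value -> value instanceof String text ? text.strip() : value));

// Keep a non-thread-safe resource in a ThreadLocal so each thread gets its own instance
// (AuditClient is hypothetical; shown only to illustrate isolating a side effect)
final var auditClients = ThreadLocal.withInitial(AuditClient::new);

MessageProcessorRegistry.register("auditMessage", message -> {
  auditClients.get().record(message); // side effect isolated to the current thread
  return message;                     // the payload passes through unchanged
});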
- Virtual threads map 1:1 to Kafka records, scaling to 100k+ messages/sec
- Zero-GC JSON processing with DslJson (see the sketch below)
- Safe and efficient memory model using modern Java features
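As an illustration of the DslJson approach, a standalone byte[]-to-byte[] processor written directly against the upstream com.dslplatform.json API might look like the sketch below. KPipe's JsonMessageProcessor wraps this kind of logic for you; the class and field names here exist only for the example:

// Sketch: a byte[] -> byte[] processor built directly on DslJson
import com.dslplatform.json.DslJson;
import com.dslplatform.json.runtime.Settings;

import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.function.Function;

public final class DslJsonSketch {

  private static final DslJson<Object> DSL_JSON = new DslJson<>(Settings.basicSetup());

  // Parses the payload into a Map, adds a flag, and serializes it back to bytes.
  public static final Function<byte[], byte[]> ADD_PROCESSED_FLAG = bytes -> {
    try {
      @SuppressWarnings("unchecked")
      final Map<String, Object> json = DSL_JSON.deserialize(Map.class, bytes, bytes.length);
      if (json == null) {
        return bytes;
      }
      json.put("processed", Boolean.TRUE);
      final var out = new ByteArrayOutputStream();
      DSL_JSON.serialize(json, out);
      return out.toByteArray();
    } catch (final Exception e) {
      return bytes; // on parse failure, pass the payload through unchanged
    }
  };
}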
This library is inspired by established best practices for building Kafka consumers.
If you're a team using this library, feel free to:
- Register custom processors
- Add metrics/observability hooks
- Share improvements or retry strategies
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
This Kafka consumer is:
- Functional
- Extensible
- Future-proof
Use it to modernize your Kafka stack with Java 24 elegance and performance.