This project explores using virtual threads for message processing with a Key_Shared subscription in Apache Pulsar. It targets scenarios where processing a single message may take hundreds of milliseconds and involve IO, while message ordering must be preserved.
In Pulsar, the `MessageListener` interface provides a push-based mechanism for consuming messages. When a message arrives, the `MessageListener` is triggered and receives the message. Here's an example of how to use it:
```java
Consumer<byte[]> consumer = client.newConsumer()
        .topic("topic")
        .subscriptionName("subscription")
        .messageListener((c, msg) -> {
            // process the message
        })
        .subscribe();
```
Pulsar supports four subscription types. This experiment focuses on the `Key_Shared` subscription, where multiple consumers can attach to the subscription and messages with the same ordering key are delivered to the same consumer.
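For illustration, Key_Shared routing is driven by the key a producer attaches to each message. A minimal producer-side sketch (the topic name and key are placeholders, and `client` is a `PulsarClient` as in the example above):

```java
import org.apache.pulsar.client.api.Producer;

// Minimal sketch (topic and key are placeholders): all messages carrying the
// same key are delivered to the same consumer of the Key_Shared subscription.
Producer<byte[]> producer = client.newProducer()
        .topic("topic")
        .create();
producer.newMessage()
        .key("user-42") // routing/ordering key
        .value("payload".getBytes())
        .send();
```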
When using a `Key_Shared` subscription with a `MessageListener`, messages are processed by multiple executor threads. The message order is preserved because messages with the same ordering key are handled by the same thread.
By default, the `MessageListener` runs on an internal thread pool of the Pulsar client. Since PIP-359, Pulsar supports custom message listener executors.
When using a single-threaded executor for the message listener, only one message can be processed at a time. While this ensures messages are processed in order, the throughput is limited by the time it takes to process a single message.
In the example above, assume all messages arrive at the consumer at nearly the same time. If all messages have the same ordering key, they must be processed sequentially, which prevents any speedup. If they have different ordering keys, messages 2 and 3 unnecessarily need to wait.
Using a multi-threaded executor allows multiple messages to be processed concurrently. By ensuring that messages with the same ordering key are processed by the same thread, we can still guarantee message ordering. Throughput increases compared to a single-threaded executor, as messages with different ordering keys can be processed concurrently.
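To illustrate the hashing scheme, here is a sketch of such an executor. It assumes the `MessageListenerExecutor` interface introduced by PIP-359 exposes an `execute(Message<?>, Runnable)` method; the class name and thread count are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListenerExecutor;

// Sketch of a key-hashing listener executor (illustrative, assuming the
// PIP-359 interface): the ordering key is hashed to pick one of a fixed set
// of single-threaded executors, so per-key ordering is preserved.
class KeyHashingListenerExecutor implements MessageListenerExecutor {
    private final ExecutorService[] workers;

    KeyHashingListenerExecutor(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    @Override
    public void execute(Message<?> message, Runnable runnable) {
        // Same key -> same index -> same thread -> messages stay in order.
        int index = Math.floorMod(message.getKey().hashCode(), workers.length);
        workers[index].execute(runnable);
    }
}
```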
In the example above, three threads are used. Messages with the same ordering key are shown in the same color, and the message number indicates the order in which the messages were received by the consumer. For example, message 3 may unnecessarily wait for message 2 to be processed even though they have different ordering keys. Also, message 6 is processed before message 5, even though message 5 was received earlier. Finally, threads 2 and 3 are idle at some points.
Let's assume we need to subscribe to hundreds or thousands of topics and the message processing time varies between topics: while messages of one topic are processed within milliseconds, messages of another topic may take up to a few seconds. We then have hundreds or thousands of consumers. We could use one large thread pool and share it among the consumers, but this could mean that messages of one topic are blocked or delayed by messages of another topic.
Let's assume `topic A` and `topic B` receive messages and their ordering keys are mapped to the same thread. Then the message of one topic needs to wait until the message of the other topic has been processed.
If we do not use a shared thread pool, but individual thread pools for each consumer, then we could run out of threads.
The experiment aims to improve fairness between topics by leveraging virtual threads. A message should not need to wait for a message of a different topic to be processed and should only need to wait for another message of the same topic if they have the same ordering key.
The `MessageListener` interface is used with a custom message listener executor. One executor instance is created for each consumer. It maps each ordering key to one virtual thread that processes all messages with this ordering key: when a message with a so far unseen ordering key is received, the executor creates a new virtual thread that processes the message. If another message with the same ordering key is received, the executor submits it to the same virtual thread. If a message with a different ordering key is received, the executor creates a new virtual thread. Once all messages assigned to a virtual thread have been processed, the virtual thread terminates to free its resources.
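A minimal sketch of this dispatch logic (illustrative, not the library's actual implementation):

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: one queue and one virtual thread per active ordering
// key; the virtual thread terminates once its queue runs empty.
class KeyedVirtualThreadDispatcher {
    private final Map<String, Queue<Runnable>> queues = new ConcurrentHashMap<>();

    void dispatch(String orderingKey, Runnable task) {
        queues.compute(orderingKey, (key, queue) -> {
            if (queue == null) {
                // No virtual thread is active for this key: start one.
                queue = new ArrayDeque<>();
                Thread.startVirtualThread(() -> drain(key));
            }
            queue.add(task); // the key's virtual thread processes tasks in order
            return queue;
        });
    }

    private void drain(String key) {
        while (true) {
            Runnable[] next = new Runnable[1];
            // compute operations are atomic per key, so polling and removing
            // the mapping cannot race with a concurrent dispatch for this key.
            queues.computeIfPresent(key, (k, queue) -> {
                next[0] = queue.poll();
                return next[0] == null ? null : queue; // remove mapping when drained
            });
            if (next[0] == null) {
                return; // queue empty: let this virtual thread terminate
            }
            next[0].run();
        }
    }
}
```

The mapping is only removed after the last task of a key has finished, so a message arriving in the meantime is queued behind it rather than processed concurrently.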
The example above shows the same messages as the previous example. Message 1 is received and a new virtual thread is created for the green ordering key. While message 1 is being processed, messages 2 and 3 are received. Since message 2 has a different ordering key, a new virtual thread is created; this virtual thread can be scheduled on a different platform thread. Message 3 has the same ordering key as message 1, so it is submitted to virtual thread 1 and processed after message 1. In this way, ordering is guaranteed.
In the case of the yellow ordering key, message 7 arrives after message 4 has been processed. By then, virtual thread 3 has already terminated, so a new virtual thread is created for the yellow ordering key and message 7.
Compared to the previous example, all messages except messages 6 and 8 are processed as early as possible. Messages 6 and 8 show that the number of platform threads is still a limiting factor: if all platform threads are busy, messages may need to wait.
If message processing involves IO operations, the virtual thread can even be suspended (unmounted from its carrier thread) so that another virtual thread can run on the same platform thread. This may further increase throughput.
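For illustration (the endpoint URL is a placeholder), a blocking HTTP call made from a virtual thread blocks only that virtual thread:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustration (URL is a placeholder): while this call waits for the
// response, the virtual thread is unmounted and its carrier platform thread
// can run other virtual threads.
HttpClient httpClient = HttpClient.newHttpClient();
Thread.startVirtualThread(() -> {
    HttpRequest request = HttpRequest
            .newBuilder(URI.create("http://localhost:8080/process"))
            .build();
    try {
        httpClient.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
```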
Add the GitHub Packages repository to your `pom.xml`:

```xml
<repository>
    <id>github</id>
    <url>https://maven.pkg.github.com/pdolif/pulsar-virtual-threads-message-listener</url>
</repository>
```
Note: GitHub Packages requires authentication.
Add the following dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>com.github.pdolif</groupId>
    <artifactId>pulsar-virtual-threads-message-listener</artifactId>
    <version>1.0.0</version>
</dependency>
```
Create a `KeySharedExecutor`:

```java
var messageListenerExecutor = new VirtualThreadKeySharedExecutor("name");
```
In this example, a `VirtualThreadKeySharedExecutor` is used, which executes the message listener on virtual threads.
Use the message listener executor when creating a consumer:

```java
pulsarClient.newConsumer()
        .subscriptionType(SubscriptionType.Key_Shared)
        .messageListener((consumer, msg) -> {
            // process and acknowledge the message
        })
        .messageListenerExecutor(messageListenerExecutor)
        ...
        .subscribe();
```
To collect metrics using Micrometer, provide a `MeterRegistry` to the `VirtualThreadKeySharedExecutor`:

```java
var messageListenerExecutor = new VirtualThreadKeySharedExecutor("name", Metrics.with(meterRegistry));
```
To evaluate the approach, a producer application, a consumer application, and an HTTP endpoint were used. The consumer application consumes messages and forwards them to the HTTP endpoint. The endpoint applies a delay before it responds. The applications were implemented using Java 21 and Spring Boot 3.3.5.
The producer used a fixed number of random UUIDs as ordering keys. Messages were produced at a fixed rate, and for each message an ordering key was randomly selected from the fixed set of UUIDs. Each message contained its creation timestamp and a 10 KB payload.
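For illustration, the producer loop might look like the following sketch (the key count, rate, timestamp handling, and `producer` setup are assumptions, not the exact benchmark code):

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical sketch of the benchmark producer: a fixed set of random UUID
// ordering keys, a fixed send rate, a creation timestamp, and a 10 KB
// payload per message. The key count and rate below are illustrative.
List<String> keys = Stream.generate(() -> UUID.randomUUID().toString())
        .limit(100)
        .toList();
byte[] payload = new byte[10 * 1024];
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    String key = keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
    producer.newMessage()
            .key(key) // randomly selected from the fixed set of UUIDs
            .eventTime(System.currentTimeMillis()) // creation timestamp
            .value(payload)
            .sendAsync();
}, 0, 100, TimeUnit.MILLISECONDS); // 10 messages/s per topic
```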
The end-to-end time from message creation until the message was received by the HTTP endpoint (including the endpoint delay) was measured. Each test ran for 5 minutes with a single instance of the consumer application. The consumer application used one Pulsar consumer per topic.
The applications were deployed on a MicroK8s instance (v1.32.3). The host system was equipped with an Intel Core Ultra 7 265K CPU, 64GB of DDR5 RAM, and a Samsung 990 EVO Plus SSD. The Pulsar cluster (version 4.0.4) was running on the same MicroK8s instance.
The Pulsar Helm chart was used with the default configuration, except that anti-affinity and journaling were disabled. The consumer receiver queue size was set to the default of 1000, and a Key_Shared subscription was used.
| Producer message rate \ Endpoint delay | 10ms | 100ms | 500ms |
|---|---|---|---|
| 3 messages/s per topic | 12-13/13-14/13-15ms | 102-103/103/103-104ms | 502-503/503-504/504-670ms |
| 5 messages/s per topic | 12-13/12-13/13-14ms | 102-103/102-104/102-104ms | 502-503/502-503/602-803ms |
| 10 messages/s per topic | 11-13/12-13/12-14ms | 101-103/102-104/102-104ms | 501-503/502-504/802-903ms |
| 20 messages/s per topic | 11-12/12-13/13-14ms | 101-103/102-103/152-157ms | 501-503/505-552/952-953ms |
| Producer message rate \ Endpoint delay | 10ms | 100ms | 500ms |
|---|---|---|---|
| 3 messages/s per topic | 12/12-13/13-14ms | 102/102-103/103-137ms | 502/502-503/534-670ms |
| 5 messages/s per topic | 11-12/11-13/12-15ms | 101-102/101-103/129-154ms | 501-503/502-503/516-803ms |