Skip to content

Fix message might lost when use listener #406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 2, 2025
Merged

Conversation

shibd
Copy link
Member

@shibd shibd commented Feb 28, 2025

Motivation

When using a listener to create a consumer with multiple topics, messages might be lost.

After #219, when a message reaches a listener, if the consumer is null, it will ignore this msg, and then msg will be lost.

// `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
// need to handle this rare case in future.
if (consumer) {
Napi::Value ret;
try {

Question: Why is the consumer null here?

In CPP implementation, after apache/pulsar-client-cpp#447, we will make sure all sub-consumers create success and then resumeListenerDispacther.

And it will call create-callback first
https://github.com/apache/pulsar-client-cpp/blob/54e529aaf82bddac063c847d4c11d3fba3acf0f3/lib/MultiTopicsConsumerImpl.cc#L151-L156

            multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
            // Now all child topics are successfully subscribed, start messageListeners
            if (messageListener_ && !conf_.isStartPaused()) {
                LOG_INFO("Start messageListeners");
                resumeMessageListener();
            }

And, when callback reach to Node.js lib. will execute code: this code will set consumer object.

deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) {
Napi::Object obj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(obj);
consumer->SetCConsumer(cConsumer);
consumer->SetListenerCallback(listener);
if (listener) {
// resume to enable MessageListener function callback
resume_message_listener(cConsumer.get());
}
return obj;
});

However, since the consumer is set in deferred->Resolve, which is asynchronous, it might not be fully set up when the listener starts receiving messages. Therefore, in the code above, the consumer might be null, and the message could ultimately be ignored.

Modifications

  • Use consumer future to make sure get consumer point after setting it.

Verifying this change

This issue to hard use unit test to cover it.

There is reproduction step:

  1. Create a topic with 5 partition.
  2. Create subscription
  3. Change pulsar source code liks below. (Slow down the subscription to one of the topics to delay the overall completion of the consumer subscription.)
    protected void handleSubscribe(final CommandSubscribe subscribe) {
        checkArgument(state == State.Connected);
        final long requestId = subscribe.getRequestId();
        final long consumerId = subscribe.getConsumerId();
        TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
+        if (!subscribe.getTopic().equals("persistent://public/default/test-listener2-partition-0")) {
+            try {
+              log.info("test sleep 5000ms: " + subscribe.getTopic());
+               Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
  1. Send 2 msg to partition-0
  2. Create a consumer with a listener.
  // Create a consumer with listener
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/test-listener2',
    subscription: 'sub1',
    subscriptionType: 'Shared',
    ackTimeoutMs: 100000000,
    listener: (msg, msgConsumer) => {
      console.log("Receive-msg: " + msg.getData().toString());
    },
  });

When running repeatedly, you'll encounter receiving only one message or no messages approximately every 2 to 3 times.

!!Note: After this PR, running it 100 times repeatedly no longer results in any issues.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@shibd shibd added this to the 1.13.0 milestone Feb 28, 2025
@shibd shibd self-assigned this Feb 28, 2025
@shibd shibd merged commit b17a2e1 into apache:master Mar 2, 2025
12 checks passed
shibd added a commit that referenced this pull request Mar 2, 2025
* Fix message might lost when use listener

* Remove todo

* code format

* Remove consumer if null judge

(cherry picked from commit b17a2e1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants