From 32379a3a35a6831ebcc3b6f0989c2ff1ba02d47e Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 28 Feb 2025 21:56:17 +0800 Subject: [PATCH 1/4] Fix message might lost when use listener --- src/Consumer.cc | 14 ++------------ src/MessageListener.h | 8 +++++--- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index e56f8ba6..c564ec6b 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -111,7 +111,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag std::shared_ptr cMessage(rawMessage, pulsar_message_free); MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx; - Consumer *consumer = (Consumer *)listenerCallback->consumer; + Consumer *consumer = static_cast(listenerCallback->consumerFuture.get()); if (listenerCallback->callback.Acquire() != napi_ok) { return; @@ -135,7 +135,7 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) { } if (listener != nullptr) { - listener->consumer = this; + listener->consumerPromise.set_value(this); // If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS // code. this->Ref(); @@ -168,11 +168,6 @@ struct ConsumerNewInstanceContext { auto cConsumer = std::shared_ptr(rawConsumer, pulsar_consumer_free); auto listener = consumerConfig->GetListenerCallback(); - if (listener) { - // pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer - pulsar_consumer_pause_message_listener(cConsumer.get()); - } - deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) { Napi::Object obj = Consumer::constructor.New({}); Consumer *consumer = Consumer::Unwrap(obj); @@ -180,11 +175,6 @@ struct ConsumerNewInstanceContext { consumer->SetCConsumer(cConsumer); consumer->SetListenerCallback(listener); - if (listener) { - // resume to enable MessageListener function callback - resume_message_listener(cConsumer.get()); - } - return obj; }); } diff --git a/src/MessageListener.h b/src/MessageListener.h index ff4efeea..704c8ec5 100644 --- a/src/MessageListener.h +++ b/src/MessageListener.h @@ -21,14 +21,16 @@ #define MESSAGELISTENER_H #include +#include struct MessageListenerCallback { Napi::ThreadSafeFunction callback; - // Using consumer as void* since the ListenerCallback is shared between Config and Consumer. - void *consumer; + // Use future store consumer point, because need ensure sync. + std::promise consumerPromise; + std::shared_future consumerFuture; - MessageListenerCallback() : consumer(nullptr) {} + MessageListenerCallback() : consumerPromise(), consumerFuture(consumerPromise.get_future()) {} }; #endif From 3163c315cd2b864581ca352153da1fbbe38c8d20 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 28 Feb 2025 21:58:46 +0800 Subject: [PATCH 2/4] Remove todo --- src/Consumer.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index c564ec6b..24f02295 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -77,8 +77,6 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe Napi::Object msg = Message::NewInstance({}, data->cMessage); Consumer *consumer = data->consumer; - // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We - // need to handle this rare case in future. if (consumer) { Napi::Value ret; try { From 4fd4e6b434fdf6bef4eae20a85decfa2a8fb809a Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 28 Feb 2025 22:24:12 +0800 Subject: [PATCH 3/4] code format --- src/Consumer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index 24f02295..a56f8047 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -109,7 +109,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag std::shared_ptr cMessage(rawMessage, pulsar_message_free); MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx; - Consumer *consumer = static_cast(listenerCallback->consumerFuture.get()); + Consumer *consumer = static_cast(listenerCallback->consumerFuture.get()); if (listenerCallback->callback.Acquire() != napi_ok) { return; From f7e8958dfa259601a36dfd529fb43e32853b6101 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 28 Feb 2025 22:30:09 +0800 Subject: [PATCH 4/4] Remove consumer if null judge --- src/Consumer.cc | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index a56f8047..0b824364 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -77,30 +77,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe Napi::Object msg = Message::NewInstance({}, data->cMessage); Consumer *consumer = data->consumer; - if (consumer) { - Napi::Value ret; - try { - ret = jsCallback.Call({msg, consumer->Value()}); - } catch (std::exception &exception) { - logMessageListenerError(consumer, exception.what()); - } + Napi::Value ret; + try { + ret = jsCallback.Call({msg, consumer->Value()}); + } catch (std::exception &exception) { + logMessageListenerError(consumer, exception.what()); + } - if (ret.IsPromise()) { - Napi::Promise promise = ret.As(); - Napi::Function catchFunc = promise.Get("catch").As(); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As(); + Napi::Function catchFunc = promise.Get("catch").As(); - ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { - Napi::Error error = info[0].As(); - logMessageListenerError(consumer, error.what()); - })}); + ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { + Napi::Error error = info[0].As(); + logMessageListenerError(consumer, error.what()); + })}); - promise = ret.As(); - Napi::Function finallyFunc = promise.Get("finally").As(); + promise = ret.As(); + Napi::Function finallyFunc = promise.Get("finally").As(); - finallyFunc.Call( - promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); - return; - } + finallyFunc.Call( + promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); + return; } data->callback(); }