-
Notifications
You must be signed in to change notification settings - Fork 65
refactor: handle retain messages with dynamic subscriptions #3710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
refactor: handle retain messages with dynamic subscriptions #3710
Conversation
Codecov ReportAttention: Patch coverage is 📢 Thoughts on this report? Let us know! 🚀 New features to boost your workflow:
|
Robot Results
Failed Tests
|
@@ -345,6 +365,30 @@ impl FromPeers { | |||
client.unsubscribe_many(diff.unsubscribe).await.unwrap(); | |||
} | |||
}); | |||
if !tf.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if this would meet the dynamic subscription requirements of the dynamic mapper, where in most cases when new pipelines are dynamically added, they are most likely going to subscribe to already subscribed topics like te/+/+/+/+/m/temp/meta
and would be expecting the retained messages from those topics. But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded, the newly added pipelines won't get the existing retained messages as far as I understand. This won't meet our requirement, right? I'm just wondering if blindly establishing a new connection per peer/client (pipeline) would be better here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded
No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.
I'm just wondering if blindly establishing a new connection would be better here.
If we did this blindly for all topics, all that would achieve is retain messages sent to newly subscribed topics would be received twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.
Not sure what I was doing wrong then. When I tested the same by tweaking the dynamic_subscribers_receive_retain_messages
test with the following code at the end:
client_0
.send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
diff: SubscriptionDiff {
subscribe: ["a/b".into()].into(),
unsubscribe: [].into(),
},
client_id: client_id_1,
}))
.await
.unwrap();
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);
the assertion validating if client0
gets the same retained message again, timed out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the intention. A new client subscribing shouldn't redeliver the retain messages to an existing subscription?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's where I see a problem. For all the pipelines, the generic mapper is the only peer of the mqtt actor, right? While it is connected to the mqtt actor while being built, it would have declared a set of subscriptions based on all the pipelines that existed at that time. And when a new pipeline is dynamically registered, but with some topics that the previous pipelines had already subscribed, it should still get the retained messages on those previously registered topics as well, right? Since the new pipeline is not registering as a new peer/client, but issuing the subscription request via the same gen mapper peer which has already received the retained messages for the same topics, it wouldn't get those retained messages even though they are "new" to the new pipeline, right? That's my concern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As that is not so easy to fix, an option is to let the tedge_gen_mapper
maintains a connection to the MQTT actor per pipeline.
let mqtt_config = self.mqtt_config.with_subscriptions(topic_filter); | ||
let mqtt_config = self.mqtt_config.clone().with_subscriptions(topic_filter); | ||
MqttActor::new( | ||
mqtt_config, | ||
self.mqtt_config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The base_config
used on the side to read retained messages cannot be a simple clone of the main MQTT connection config. Using the same session name would trigger session clashes. Also better to use a clean session, as only retained messages matter.
let base_config = mqtt_config.clone().with_no_session();
.await | ||
.unwrap(); | ||
while let Ok(msg) = | ||
tokio::time::timeout(Duration::from_secs(10), conn.received.next()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 seconds seems like too much. All the retained messages will be published in a row on connect. That's okay though, as adding a new subscription is a rare event.
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…scriptions Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
5664833
to
6d510fc
Compare
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Proposed changes
Handle retain messages when dynamic subscribers subscribe to a topic with an existing subscription. With the original implementation, new subscribers to topics that are already subscribed to wouldn't receive retain messages, e.g.
retain message exists on a/b
client a connections with subscription to a/b
client b connects with no subscriptions
mqtt actor connects -> retain message is received and forwarded to client a
client b subscribes to a/b
the message won't be resent to the mqtt actor for forwarding since an existing subscription exists
With these changes, the subscription from client b will cause a connection to be created dynamically to retrieve the relevant retain messages (i.e. the ones not covered by a new subscription).
In the case of an existing subscription to
a/b
and a new subscription toa/+
, both the existing client and the newly subscribed client will receive the retain message. I don't believe this is a huge issue, as spontaneous redelivery of retain messages can occur regardless in the case of connection flakiness, for example.Still TODO:
Types of changes
Paste Link to the issue
Checklist
just prepare-dev
once)just format
as mentioned in CODING_GUIDELINESjust check
as mentioned in CODING_GUIDELINESFurther comments