refactor: handle retain messages with dynamic subscriptions #3710


Draft · wants to merge 6 commits into base: main
Conversation

jarhodes314 (Contributor)

Proposed changes

Handle retain messages when dynamic subscribers subscribe to a topic with an existing subscription. With the original implementation, new subscribers to topics that are already subscribed to wouldn't receive retain messages, e.g.:

1. A retain message exists on a/b
2. Client a connects with a subscription to a/b
3. Client b connects with no subscriptions
4. The mqtt actor connects -> the retain message is received and forwarded to client a
5. Client b subscribes to a/b
6. The message won't be resent to the mqtt actor for forwarding, since an existing subscription already exists

With these changes, the subscription from client b will cause a connection to be created dynamically to retrieve the relevant retain messages (i.e. the ones not covered by a new subscription).

In the case of an existing subscription to a/b and a new subscription to a/+, both the existing client and the newly subscribed client will receive the retain message. I don't believe this is a huge issue, as spontaneous redelivery of retain messages can occur regardless in the case of connection flakiness, for example.
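The core decision described above can be sketched in plain Rust. This is an illustrative sketch, not the PR's actual code: `split_request` and its types are hypothetical names, and it uses exact filter matching, whereas real MQTT filters also overlap via wildcards (e.g. a/+ covers a/b) as discussed in the previous paragraph.

```rust
use std::collections::HashSet;

// Illustrative sketch (not thin-edge.io code): split a subscription
// request into filters that are already subscribed (the broker will NOT
// redeliver retained messages for these, so a short-lived dynamic
// connection must fetch them) and genuinely new filters (retained
// messages arrive on the main connection's fresh SUBSCRIBE).
fn split_request(
    existing: &HashSet<String>,
    requested: &[&str],
) -> (Vec<String>, Vec<String>) {
    let (covered, fresh): (Vec<_>, Vec<_>) = requested
        .iter()
        .map(|f| f.to_string())
        .partition(|f| existing.contains(f));
    (covered, fresh)
}

fn main() {
    let existing: HashSet<String> = ["a/b".to_string()].into();
    let (covered, fresh) = split_request(&existing, &["a/b", "c/d"]);
    // "a/b" needs a dynamic retain-fetch connection; "c/d" goes through
    // the normal subscribe path on the main connection.
    println!("dynamic fetch: {covered:?}, normal subscribe: {fresh:?}");
}
```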

Still TODO:

  • Integrate these changes with the entity store retain message clearing

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request June 26, 2025 10:11 — with GitHub Actions Inactive
@jarhodes314 jarhodes314 added the refactoring (Developer value) and theme:mqtt (mqtt and mosquitto related topics) labels Jun 26, 2025

codecov bot commented Jun 26, 2025

Codecov Report

Attention: Patch coverage is 93.05019% with 18 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| crates/extensions/tedge_mqtt_ext/src/lib.rs | 90.65% | 4 Missing and 6 partials ⚠️ |
| crates/common/mqtt_channel/src/tests.rs | 90.90% | 0 Missing and 4 partials ⚠️ |
| crates/extensions/tedge_mqtt_ext/src/tests.rs | 96.19% | 0 Missing and 4 partials ⚠️ |


github-actions bot commented Jun 26, 2025

Robot Results

| ✅ Passed | ❌ Failed | ⏭️ Skipped | Total | Pass % | ⏱️ Duration |
| --- | --- | --- | --- | --- | --- |
| 650 | 1 | 3 | 651 | 99.85 | 1h46m46.99778s |

Failed Tests

| Name | Message | ⏱️ Duration | Suite |
| --- | --- | --- | --- |
| Run shell custom operation for main device and publish the status | Matching messages on topic 'c8y/s/us' is greater than maximum. wanted: 2 got: 4 messages: ['504,19703158', '506,19703158,"helloworld1\n"', '504,19703158', '506,19703158,"helloworld1\n"'] | 35.069 s | Custom Operation Command |

```diff
@@ -345,6 +365,30 @@ impl FromPeers {
             client.unsubscribe_many(diff.unsubscribe).await.unwrap();
         }
     });
+    if !tf.is_empty() {
```
@albinsuresh (Contributor) commented Jun 27, 2025

I'm wondering if this would meet the dynamic subscription requirements of the dynamic mapper. In most cases, when new pipelines are dynamically added, they are most likely going to subscribe to already-subscribed topics like te/+/+/+/+/m/temp/meta and would expect the retained messages from those topics. But since dynamic connections are established only when a new subscription is added or an existing one is upgraded, the newly added pipelines won't get the existing retained messages, as far as I understand. This won't meet our requirement, right? I'm just wondering if blindly establishing a new connection per peer/client (pipeline) would be better here.

@jarhodes314 (Contributor, Author) replied

> But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded

No, the exact opposite: we establish a dynamic connection when we receive a subscription for a topic that already has an existing subscription.

> I'm just wondering if blindly establishing a new connection would be better here.

If we did this blindly for all topics, all that would achieve is that retain messages sent to newly subscribed topics would be received twice.

@albinsuresh (Contributor) replied

> No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.

Not sure what I was doing wrong then. I tested this by tweaking the dynamic_subscribers_receive_retain_messages test with the following code at the end:

```rust
client_0
    .send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
        diff: SubscriptionDiff {
            subscribe: ["a/b".into()].into(),
            unsubscribe: [].into(),
        },
        client_id: client_id_1,
    }))
    .await
    .unwrap();
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);
```

The assertion validating whether client_0 gets the same retained message again timed out.

@jarhodes314 (Contributor, Author) replied

That is the intention: a new client subscribing shouldn't cause the retain messages to be redelivered to an existing subscription.

@albinsuresh (Contributor) replied

That's where I see a problem. For all the pipelines, the generic mapper is the only peer of the mqtt actor, right? When it connected to the mqtt actor while being built, it declared a set of subscriptions based on all the pipelines that existed at that time. When a new pipeline is dynamically registered with some topics that the previous pipelines had already subscribed to, it should still get the retained messages on those previously registered topics as well, right? But since the new pipeline is not registering as a new peer/client, and instead issues the subscription request via the same gen mapper peer, which has already received the retained messages for those topics, it wouldn't get those retained messages even though they are "new" to the new pipeline. That's my concern.

A contributor replied

As that is not so easy to fix, an option is to let tedge_gen_mapper maintain a connection to the MQTT actor per pipeline.
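The per-pipeline suggestion above could be sketched roughly as follows. `GenMapper` and `ClientHandle` are hypothetical stand-ins, not actual thin-edge.io types: the point is only that each pipeline registers as its own peer, so the MQTT actor sees a brand-new subscriber and delivers retained messages even on topics other pipelines already cover.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a per-pipeline connection to the MQTT actor;
// not an actual thin-edge.io type.
#[derive(Debug, PartialEq)]
struct ClientHandle {
    client_id: usize,
    subscriptions: Vec<String>,
}

struct GenMapper {
    next_id: usize,
    // One peer connection per pipeline instead of a single shared one.
    clients: HashMap<String, ClientHandle>,
}

impl GenMapper {
    fn new() -> Self {
        GenMapper { next_id: 0, clients: HashMap::new() }
    }

    // Registering a pipeline creates a dedicated peer, so the MQTT actor
    // sees a new subscriber and delivers retained messages for `topics`
    // even if another pipeline already subscribed to them.
    fn add_pipeline(&mut self, name: &str, topics: Vec<String>) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.clients.insert(
            name.to_string(),
            ClientHandle { client_id: id, subscriptions: topics },
        );
        id
    }
}

fn main() {
    let mut mapper = GenMapper::new();
    let p1 = mapper.add_pipeline("temp-meta", vec!["te/+/+/+/+/m/temp/meta".into()]);
    let p2 = mapper.add_pipeline("temp-alerts", vec!["te/+/+/+/+/m/temp/meta".into()]);
    // Distinct client ids: the second pipeline is a new peer, so it would
    // receive the retained messages despite the overlapping subscription.
    println!("p1={p1}, p2={p2}");
}
```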

Comment on lines -142 to +232

```diff
-let mqtt_config = self.mqtt_config.with_subscriptions(topic_filter);
+let mqtt_config = self.mqtt_config.clone().with_subscriptions(topic_filter);
 MqttActor::new(
     mqtt_config,
+    self.mqtt_config,
```
@didier-wenzek (Contributor) commented Jun 27, 2025

The base_config used on the side to read retained messages cannot be a simple clone of the main MQTT connection config. Using the same session name would trigger session clashes. Also better to use a clean session, as only retained messages matter.

```rust
let base_config = mqtt_config.clone().with_no_session();
```

```rust
.await
.unwrap();
while let Ok(msg) =
    tokio::time::timeout(Duration::from_secs(10), conn.received.next())
```
A contributor commented

10 seconds seems like too much. All the retained messages will be published in a row on connect. That's okay though, as adding a new subscription is a rare event.

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…scriptions

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
@jarhodes314 jarhodes314 force-pushed the refactor/dynamic-subscription-retain branch from 5664833 to 6d510fc Compare July 14, 2025 15:37
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
@reubenmiller reubenmiller marked this pull request as draft July 15, 2025 09:06
Labels
refactoring (Developer value) · theme:mqtt (mqtt and mosquitto related topics)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants