feat: tedge connect to c8y mqtt service endpoint #3707


Open: albinsuresh wants to merge 12 commits into main from feat/c8y-mqtt-service-connect

Conversation

Contributor

@albinsuresh albinsuresh commented Jun 25, 2025

Proposed changes

  • Specification (with multiple approaches) for connecting thin-edge to the new Cumulocity MQTT service endpoint.
  • Implementation of the finalised approach (connecting to the MQTT service alongside the core MQTT endpoint)
  • mosquitto bridge support
  • built-in bridge support

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments


codecov bot commented Jun 25, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅


@albinsuresh albinsuresh force-pushed the feat/c8y-mqtt-service-connect branch from a332515 to 6e25aff Compare June 26, 2025 06:34
@albinsuresh albinsuresh marked this pull request as ready for review June 26, 2025 06:34
Comment on lines +83 to +84
The tenant id which must be passed in the username is retrieved using the `https://<c8y.url>/tenant/currentTenant` REST API.
An alternative is to take this as an additional `c8y.tenant_id` configuration.
Contributor

Requesting the tenant id using the C8Y REST API requires being already connected to C8Y. Using an additional c8y.tenant_id setting imposes fewer constraints. Also, can the tenant id be derived from the C8Y url? At least in the usual cases.

Contributor Author

Requesting the tenant id using the C8Y REST API requires being already connected to C8Y

By "connected to C8Y" if you meant the core mqtt bridge connection, then no, as the proposal was to use the direct REST API here, as we do for device creation in the existing tedge connect c8y. But, if you meant "network connection to C8Y", then yes, that's the primary limitation of this choice. We won't be able to do an "offline connect", using this mechanism.

Also, can the tenant id be derived from the C8Y url?

I don't think we can do this reliably, especially with tenant aliases. For example, when I connect to my c8y tenant, I use the aliased url albin.latest.c8y.io while the real tenant id is something like t9836373. And the mqtt service expects the real tenant id in the username, not the alias.

Contributor

By "connected to C8Y" if you meant the core mqtt bridge connection, then no, as the proposal was to use the direct REST API here, as we do for device creation in the existing tedge connect c8y.

My point is related to this use of a direct REST API call. One needs to be authenticated.

  • For password-based authentication, things are okay as a password is given along with the device id (see the sketch below).
  • But for cert-based authentication, one needs to fetch a JWT token using the regular C8Y MQTT connection.
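
A minimal sketch of that password-authenticated lookup, assuming the reqwest (blocking + json features) and serde_json crates and the documented currentTenant response shape where the tenant id is reported in the name field; the helper name and credentials are illustrative, not part of this PR:

```rust
use std::error::Error;

// Illustrative only: fetch the tenant id with the currentTenant REST call discussed above,
// using basic auth. With certificate-based auth this direct call is not possible without
// first obtaining a token over the regular C8Y MQTT connection, which is the constraint here.
fn current_tenant_id(c8y_url: &str, user: &str, password: &str) -> Result<String, Box<dyn Error>> {
    let url = format!("https://{c8y_url}/tenant/currentTenant");
    let body: serde_json::Value = reqwest::blocking::Client::new()
        .get(url)
        .basic_auth(user, Some(password))
        .send()?
        .error_for_status()?
        .json()?;
    // The real tenant id (e.g. "t9836373") is reported in the `name` field of the response.
    Ok(body["name"]
        .as_str()
        .ok_or("no tenant name in currentTenant response")?
        .to_string())
}

fn main() -> Result<(), Box<dyn Error>> {
    // Hypothetical host and credentials, purely for illustration.
    let tenant = current_tenant_id("example.cumulocity.com", "demo-user", "secret")?;
    println!("tenant id: {tenant}");
    Ok(())
}
```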

Contributor

The /tenant/loginOptions endpoint doesn't require authentication; however, you'd have to parse a bit more information from it to potentially get the tenant id.

Contributor Author

@albinsuresh albinsuresh Jul 3, 2025

Introduced the tedge config setting c8y.tenant_id for this in 9979965, instead of the REST call.

Contributor

Could c8y.tenant_id be computed when not set?

Having an explicit setting for c8y.tenant_id is simpler. However, as a user this leads to something a bit surprising.

Simply enabling the mqtt_service breaks tedge connect c8y --test because c8y.tenant_id is not set.

$ tedge connect c8y --test          
test connection to Cumulocity cloud.:
        device id: demo-1.5
        cloud profile: <none>
        cloud host: didier.latest.stage.c8y.io:8883
        auth type: Certificate
        certificate file: /etc/tedge/device-certs/tedge-certificate.pem
        cryptoki: off
        bridge: mosquitto
        service manager: systemd
        proxy: Not configured
Verifying device is connected to cloud... ✓
Checking Cumulocity is connected to intended tenant... ✓
Connection check to c8y cloud is successful.

$ tedge config set c8y.mqtt_service.enabled true

$ tedge connect c8y --test                      
Error: failed to test connection to Cumulocity cloud.

Caused by:
    A value for 'c8y.tenant_id' is missing.
        A value can be set with `tedge config set c8y.tenant_id <value>`

Contributor Author

Hmm... I'm not aware of any mechanism to derive it statically from other config fields, other than the dynamic methods mentioned above, like fetching it via REST. We could fall back to those methods when this value is not set.

That being said, in the context of the issue that you highlighted about c8y --test, we need to define what that --test even means for the mqtt service endpoint and how/what we can test.

Contributor

reubenmiller commented Jun 26, 2025

Given that there is a clear vision that the Cumulocity mqtt-service will replace the MQTT core functionality, approach 1 is probably the best option for delivering the immediate "mqtt-service support" feature. It would also allow a smooth transition to using only the mqtt-service without changing much on the device side, e.g. tedge connect c8y would still offer the same UX but would just do something different behind the scenes.

Though I still think we should explore the idea of supporting the connection to any MQTT broker by allowing option 2, we could do that specific work in a different feature implementation.

A generic feature could look very similar to what is being proposed, but you can also specify a "type":

[bridge.custom]     # where "custom" is the unique id of the mapper
type = "generic | c8y (mqtt-core) | c8y-mqtt-service | aws | az"        # type control default mapping rules and any type specific connection logic
device.key_path = ""
device.cert_path = ""
bridge.username = "${cert.Issuer.CN}"   # reference meta info from the device's certificate
bridge.topic_prefix = "${bridge.id}"    # reference its own id, custom

# control which subscriptions the bridge will subscribe to
remote_subscriptions = [
    "3"
]

# type specific settings (e.g. for the c8y proxy)
c8y.proxy.bind.port = 8001

# future properties
pipelines_dir = "/etc/tedge/pipelines/${bridge.id}"      # specify which mapping/filtering rules should be used (once we have a generic mapper)

@albinsuresh albinsuresh force-pushed the feat/c8y-mqtt-service-connect branch from 6e25aff to 9979965 Compare July 3, 2025 08:31
@albinsuresh albinsuresh changed the title spec: c8y mqtt service connection spec feat: tedge connect to c8y mqtt service endpoint Jul 3, 2025
@albinsuresh albinsuresh requested a review from a team as a code owner July 4, 2025 13:36
Execute Command tedge config set c8y.tenant_id t37070943
Execute Command tedge config set c8y.mqtt_service.enabled true
Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
Execute Command tedge connect c8y
Contributor

How can I check that the connection to the MQTT service is actually established?

  • Neither tedge connect c8y nor tedge connect c8y --test says a word about the MQTT service and its status.
  • I tried on purpose to break the config; no errors are returned by tedge connect.

Good. The bridge status published on te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health is working and reports broken as well as correct settings.

=> I would add a test assertion that a status message of 1 is received on this topic.

Execute Command tedge config set mqtt.bridge.built_in true
Execute Command tedge config set c8y.tenant_id t37070943
Execute Command tedge config set c8y.mqtt_service.enabled true
Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic'
Contributor Author

There's something really weird with multiple subscriptions on the MQTT service. The MQTT service is rejecting subscription requests with multiple topics from the built-in bridge, which is why this test case was failing. Here is the same behaviour reproduced with the mosquitto_sub client:

$ mosquitto_sub -v -h albin.latest.stage.c8y.io -p 9883 -i albin-123 --key device.key --cert device.crt --cafile c8y-root.crt -u t37070943 -t sub/topic -t demo/topic
All subscription requests were denied.

Once you limit the subscription to a single topic, it works; this test also passes when there is only one subscription topic.

But to my surprise, the mosquitto bridge with multiple subscriptions works just fine (as done in the first test case).

}

// Templates
tc.forward_from_local("#", local_prefix.clone(), "")?;
Contributor Author

Unfortunately this bridge rule triggers infinite loops. The forward_from_local rule bridges anything received on c8y-mqtt/# to the cloud, and the above forward_from_remote rule bridges anything received on the subscribed topics to local topics with the c8y-mqtt/ prefix. This creates an infinite loop when a message is received from the cloud.

Again, the mosquitto bridge somehow detects and handles these cycles, but the built-in bridge doesn't. So either we should improve the built-in bridge to handle the same, or enforce one of the following rules for the mqtt service bridge:

  1. Use different prefixes to bridge outbound and inbound messages
  2. Do not bridge all published messages on c8y-mqtt/#, but make the user explicitly specify the publish topics to be bridged as well.

Contributor

There is some deduplication logic we already have for the AWS shadow topics. Unfortunately there isn't a nice way to naturally avoid loops with MQTT v3 since there is no way to opt out of receiving messages you published (assuming you are subscribed to the relevant topic). So to detect and avoid the loops, the bridge needs to know the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally. To test that this works against mosquitto, forwarding the cloud topics bidirectionally (using tc.forward_bidirectionally) and subscribing separately to # should be fine, since mosquitto will essentially ignore the double subscription (but this would be problematic with nanomq).

In short, provided we don't have complex overlapping rules, it should be quite possible to make this just work.

Contributor Author

@albinsuresh albinsuresh Jul 10, 2025

the bridge needs to know the topic is bidirectional, then cache the relevant publish packet until we receive the message back from the broker, at which point we can immediately acknowledge the message and not forward it to the cloud.

That sounds fine. But to be on the safer side with that caching (especially with half-open connections), it'd be better if the cached entries have a time-to-live as well, after which they are cleared whether or not the duplicate is received from the cloud. Sure, this might lead to a late duplicate message getting resent to the cloud once more, but that should settle after a few loops once the network stabilises.

We can probably fix things in this (relatively) simple case by telling the bridge to treat the cloud topics as bidirectional but not (directly) subscribe to them locally

I didn't understand that proposal. How does a bidirectional subscription to the same topic (e.g. demo/topic) on both the cloud and the local broker help? Especially since the expectation on the local broker is to bridge messages on c8y-mqtt/demo/topic (included in c8y-mqtt/#) and not demo/topic directly. Could you explain where/how the loop is broken when a message is received first from the cloud on demo/topic?

Contributor

If we do a bidirectional forward, this tells the bridge to break loops on those messages. It can have different prefixes on local and cloud topics, which is exactly how the AWS shadow topic works at the moment.

Because the bridge knows in advance the topic will cause a loop, it can cache the published message when it forwards to local/cloud and then use that cached message to ignore the same message when the bridge receives the message from the target broker.

The only difference from the AWS shadow topics is that we have a wildcard subscription on local and not the cloud so if we add bidirectional rules to perform de-duplication for the cloud topics, we would then subscribe to c8y-mqtt/# on the local broker and duplicate all of those subscriptions. This isn't going to cause a problem with mosquitto, so should be a valid way to verify that the existing logic works, but we will need to ensure we don't have duplicate subscriptions in the final version as that will misbehave with NanoMQ.
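
A minimal stand-alone sketch of that cache-and-drop idea, including the time-to-live suggested above. LoopGuard is a toy type built only on the standard library, not the built-in bridge's actual implementation:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Toy illustration of the loop-breaking idea: remember what was forwarded on a
/// bidirectional topic and, when the same message echoes back from the other broker,
/// acknowledge it instead of forwarding it again. Entries expire after a TTL so that a
/// half-open connection cannot pin the cache forever.
struct LoopGuard {
    ttl: Duration,
    in_flight: HashMap<(String, Vec<u8>), Instant>,
}

impl LoopGuard {
    fn new(ttl: Duration) -> Self {
        LoopGuard { ttl, in_flight: HashMap::new() }
    }

    /// Record a message just forwarded on a bidirectional topic.
    fn note_forwarded(&mut self, topic: &str, payload: &[u8]) {
        self.in_flight.insert((topic.to_string(), payload.to_vec()), Instant::now());
    }

    /// Returns true if an incoming message is the echo of something recently forwarded
    /// and should therefore be acknowledged and dropped rather than bridged again.
    fn is_echo(&mut self, topic: &str, payload: &[u8]) -> bool {
        let ttl = self.ttl;
        // Purge expired entries first: a duplicate arriving after the TTL is forwarded
        // once more rather than silently dropped, as discussed above.
        self.in_flight.retain(|_, seen| seen.elapsed() < ttl);
        self.in_flight.remove(&(topic.to_string(), payload.to_vec())).is_some()
    }
}

fn main() {
    let mut guard = LoopGuard::new(Duration::from_secs(30));
    guard.note_forwarded("demo/topic", b"hello");
    // The broker echoes the message back: it is recognised and not forwarded again.
    assert!(guard.is_echo("demo/topic", b"hello"));
    // A genuinely new message on the same topic is forwarded as usual.
    assert!(!guard.is_echo("demo/topic", b"hello again"));
}
```

Purging expired entries on every lookup keeps the cache bounded even when some echoes never arrive over a half-open connection.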

Contributor

  1. Use different prefixes to bridge outbound and inbound messages

I see this as the simplest and most robust route.

Contributor

@reubenmiller reubenmiller Jul 15, 2025

But still use a common prefix to group the c8y-mqtt messages together, just adding two nested topics (as sketched below):

  • c8y-mqtt/in
  • c8y-mqtt/out
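
A rough sketch of what that in/out split could look like as bridge rules. TopicRules is a stand-in type: the (topic, local_prefix, remote_prefix) signature is only assumed from the forward_from_local call quoted earlier in this thread, and the subscription topics are the ones used in the tests above:

```rust
/// Stand-in for the bridge topic converter, only to show the shape of the rules;
/// the real `tc` in this PR is thin-edge's own type and its exact API is assumed here.
struct TopicRules {
    rules: Vec<String>,
}

impl TopicRules {
    fn forward_from_local(&mut self, topic: &str, local_prefix: &str, remote_prefix: &str) {
        self.rules
            .push(format!("local {local_prefix}{topic} -> cloud {remote_prefix}{topic}"));
    }

    fn forward_from_remote(&mut self, topic: &str, local_prefix: &str, remote_prefix: &str) {
        self.rules
            .push(format!("cloud {remote_prefix}{topic} -> local {local_prefix}{topic}"));
    }
}

fn main() {
    let mut tc = TopicRules { rules: Vec::new() };

    // Outbound: only messages published under c8y-mqtt/out/ are forwarded to the cloud.
    tc.forward_from_local("#", "c8y-mqtt/out/", "");

    // Inbound: each configured MQTT service topic is delivered locally under c8y-mqtt/in/.
    for topic in ["sub/topic", "demo/topic"] {
        tc.forward_from_remote(topic, "c8y-mqtt/in/", "");
    }

    // A message received from the cloud lands on c8y-mqtt/in/<topic>, which never matches
    // the outbound c8y-mqtt/out/# rule, so it cannot be bridged back and loop.
    for rule in &tc.rules {
        println!("{rule}");
    }
}
```

With this split, the wildcard outbound rule and the inbound rules can never feed each other, which is the point of option 1 above.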


/// MQTT service endpoint for the Cumulocity tenant, with optional port.
#[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")]
#[tedge_config(default(from_optional_key = "c8y.url"))]
Contributor

Is this not better derived from c8y.mqtt, which will likely already have the correct domain for MQTT connections (I'm assuming this is the same issue as for standard MQTT connections)?


@reubenmiller reubenmiller added the theme:c8y Theme: Cumulocity related topics label Jul 15, 2025
Contributor

Robot Results

| ✅ Passed | ❌ Failed | ⏭️ Skipped | Total | Pass % | ⏱️ Duration |
|-----------|-----------|------------|-------|--------|--------------|
| 654 | 2 | 3 | 656 | 99.70 | 1h47m20.425781s |

Failed Tests

| Name | Message | ⏱️ Duration | Suite |
|------|---------|--------------|-------|
| Run shell custom operation for main device and publish the status | Matching messages on topic 'c8y/s/us' is greater than maximum. wanted: 2 got: 4 messages: ['504,20096862', '506,20096862,"helloworld1\n"', '504,20096862', '506,20096862,"helloworld1\n"'] | 34.781 s | Custom Operation Command |
| Connect to Cumulocity MQTT Service endpoint basic auth | tedge connect c8y returned an unexpected exit code stdout: stderr: connect to Cumulocity cloud.: device id: TST_merge_violent_raft cloud profile: <none> cloud host: qaenvironment.eu-latest.cumulocity.com:8883 auth type: Basic credentials path: /etc/tedge/credentials.toml bridge: mosquitto service manager: systemd mosquitto version: 2.0.11 proxy: Not configured Creating device in Cumulocity cloud... ✗ error: Connection error while creating device in Cumulocity: Connection refused, return code: NotAuthorized | 39.711 s | Tedge Connet Mqtt Service |

Comment on lines +40 to +46
1. Configure Cumulocity tenant URL:

<UserContext>

```sh
sudo tedge config set c8y.url $C8Y_URL
```
Contributor

I would use c8y.mqtt_service.url here and add a note about c8y.mqtt and c8y.url.

Labels
theme:c8y Theme: Cumulocity related topics

4 participants