|
| 1 | +--- |
| 2 | +title: Generic Mapper |
| 3 | +tags: [Reference, Mappers, Cloud] |
| 4 | +sidebar_position: 2 |
| 5 | +draft: true |
| 6 | +--- |
| 7 | + |
| 8 | +import ProposalBanner from '@site/src/components/ProposalBanner' |
| 9 | + |
| 10 | +<ProposalBanner/> |
| 11 | + |
| 12 | +:::note |
| 13 | +This section is actually a design document. |
| 14 | +It includes a reference guide for the POC, but also proposes a plan toward a generic mapper. |
| 15 | +::: |
| 16 | + |
| 17 | +## Motivation |
| 18 | + |
| 19 | +In theory, %%te%% users can implement customized mappers to transform data published on the MQTT bus |
| 20 | +or to interact with the cloud. In practice, they don't. |
| 21 | +Implementing a mapper is costly while what is provided out-the-box by %%te%% already meets most requirements. |
| 22 | +The need is not to write new mappers but to adapt existing ones. |
| 23 | + |
| 24 | +The aim of the generic mapper it to let users extend and adapt the mappers with their own filtering and mapping rules, |
| 25 | +leveraging the core mapping rules and mapper mechanisms (bridge connections, HTTP proxies, operations). |
| 26 | + |
| 27 | +## Vision |
| 28 | + |
| 29 | +The %%te%% mappers for Cumulocity, Azure, AWS and Collectd are implemented on top of a so-called generic mapper |
| 30 | +which is used to drive all MQTT message transformations. |
| 31 | +- Transformations are implemented as pipelines consuming MQTT messages, feeding a chain of filters and producing MQTT messages. |
| 32 | + - `MQTT sub| filter-1 | filter-2 | ... | filter-n | MQTT pub` |
| 33 | +- A pipeline can combine builtin and user-provided filters. |
| 34 | +- The user can configure all the transformations used by a mapper, |
| 35 | + editing MQTT sources, pipelines, filters and MQTT sinks. |
| 36 | +- By contrast with the current implementation, where the translation of measurements from %%te%% JSON to Cumulocity JSON |
| 37 | + is fully hard-coded, with the generic mapper a user can re-use the core of this transformation while adding customized steps: |
| 38 | + - consuming measurement from a non-standard topic |
| 39 | + - filtering out part of the measurements |
| 40 | + - normalizing units |
| 41 | + - adding units read from some config |
| 42 | + - producing transformed measurements on a non-standard topic. |
| 43 | + |
| 44 | +## POC reference |
| 45 | + |
| 46 | +- The generic mapper loads pipeline and filters stored in `/etc/tedge/gen-mapper/`. |
| 47 | +- A pipeline is defined by a TOML file with `.toml` extension. |
| 48 | +- A filter is defined by a Javascript file with `.js` extension. |
| 49 | +- The definition of pipeline must provide a list of MQTT topics to subscribe to. |
| 50 | + - The pipeline will be feed with all the messages received on these topics. |
| 51 | +- A pipeline definition also provides a list of stages. |
| 52 | + - Each stage is built from a javascript and is possibly given a config (arbitrary json that will be passed to the script) |
| 53 | + - Each stage can also subscribe to a list of MQTT topics (which messages will be passed to the script to update its config) |
| 54 | + |
| 55 | +```toml |
| 56 | +input_topics = ["te/+/+/+/+/m/+"] |
| 57 | + |
| 58 | +stages = [ |
| 59 | + { filter = "add_timestamp.js" }, |
| 60 | + { filter = "drop_stragglers.js", config = { max_delay = 60 } }, |
| 61 | + { filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] } |
| 62 | +] |
| 63 | +``` |
| 64 | + |
| 65 | +- A filter has to export at least one `process` function. |
| 66 | + - This function is called for each message to be transformed |
| 67 | + - The arguments passed to the function are: |
| 68 | + - The current time as `{ seconds: u64, nanoseconds: u32 }` |
| 69 | + - The message `{ topic: string, payload: string }` |
| 70 | + - The config as read from the pipeline config or updated by the script |
| 71 | + - The function is expected to return zero, one or many transformed messages `[{ topic: string, payload: string }]` |
| 72 | + - An exception can be thrown if the input message cannot be transformed. |
| 73 | +- A filter can also export an `update_config` function |
| 74 | + - This function is called on each message received on the `meta_topics` as defined in the config. |
| 75 | + - The arguments are: |
| 76 | + - The message to be interpreted as a config update `{ topic: string, payload: string }` |
| 77 | + - The current config |
| 78 | + - The returned value (an arbitrary JSON value) is then used as the new config for the filter. |
| 79 | +- A filter can also export a `tick` function |
| 80 | + - This function is called at a regular pace with the current time and config. |
| 81 | + - The filter can then return zero, one or many transformed messages |
| 82 | + - By sharing an internal state between the `process` and `tick` functions, |
| 83 | + the filter can implement aggregations over a time window. |
| 84 | + |
| 85 | +## Ideas and alternatives |
| 86 | + |
| 87 | +### Combine builtin and user-provided filters |
| 88 | + |
| 89 | +### Several kinds of filters |
| 90 | + |
| 91 | +The POC expects the filter to implement a bunch of functions. This gives a quite expressive interface |
| 92 | +(filtering, mapping, splitting, dynamic configuration, aggregation over time windows), but at the cost of some complexity. |
| 93 | + |
| 94 | +- `process(t: Timestamp, msg: Message, config: Json) -> Vec<Message>` |
| 95 | +- `tick(t: Timestamp) -> Vec<Message>` |
| 96 | +- `update_config(msg: Message, config: Json) -> Json` |
| 97 | + |
| 98 | +An alternative is to let the user implement more specific functions with simpler type signatures: |
| 99 | + |
| 100 | +- `filter(msg: Message, config: Json) -> bool` |
| 101 | +- `map(msg: Message, config: Json) -> Message` |
| 102 | +- `filter_map(msg: Message, config: Json) -> Option<Message>` |
| 103 | +- `flat_map(msg: Message, config: Json) -> Vec<Message>` |
| 104 | + |
| 105 | +### Inline definition |
| 106 | + |
| 107 | +### Use JSON for all parameters |
| 108 | + |
| 109 | +### Feed filters with message excerpts as done for the workflows |
| 110 | + |
| 111 | +### Test tools |
| 112 | + |
| 113 | +```shell |
| 114 | +$ tedge mapping test [pipeline.toml | filter.js] topic message |
| 115 | +``` |
| 116 | + |
| 117 | +One should be able to pipe `tedge mqtt sub` and `tedge mapping test` |
| 118 | + |
| 119 | +```shell |
| 120 | +$ tedge mqtt sub 'te/+/+/+/+/m/+' | tedge mapping test te-to-c8y.js |
| 121 | +``` |
0 commit comments