@@ -63,6 +63,7 @@ stages = [
63
63
```
64
64
65
65
- A filter has to export at least one ` process ` function.
66
+ - ` process(t: Timestamp, msg: Message, config: Json) -> Vec<Message> `
66
67
- This function is called for each message to be transformed
67
68
- The arguments passed to the function are:
68
69
- The current time as ` { seconds: u64, nanoseconds: u32 } `
@@ -81,12 +82,34 @@ stages = [
81
82
- The filter can then return zero, one or many transformed messages
82
83
- By sharing an internal state between the ` process ` and ` tick ` functions,
83
84
the filter can implement aggregations over a time window.
85
+ When messages are received they are pushed by the ` process ` function into that state
86
+ and the final outcome is extracted by the ` tick ` function at the end of the time window.
84
87
85
- ## Ideas and alternatives
88
+ ## First release
86
89
87
- ### Combine builtin and user-provided filters
90
+ While the POC provides a generic mapper that is fully independent of the legacy mappers,
91
+ the plan is not to abandon the latter in favor of the former
92
+ but to revisit the legacy mappers to include the ability for users to add their own mapping rules.
88
93
89
- ### Several kinds of filters
94
+ To be lovable, the first release of an extensible mapper should at least:
95
+
96
+ - be a drop-in replacement of the current mapper (for c8y, aws, az or collect)
97
+ - feature the ability to customize MEA processing by combining builtin filters with user-provided functions written in JavaScript
98
+ - provide tools to create, test, monitor and debug filters and pipelines
99
+ - be stable enough that user-defined filters will still work without changes with future releases.
100
+
101
+ To keep things simple for the first release, the following questions are deferred:
102
+
103
+ - Could a generic mapper let users define bridge rules as well as message transformation pipelines?
104
+ - Does it make sense to run such a mapper on child-devices?
105
+ - Could a pipeline send HTTP messages? Or could a filter tell the runtime to send messages over HTTP?
106
+ - How to handle binary payloads on the MQTT bus?
107
+ - Could operations be managed is a similar way with user-provided functions to transform commands?
108
+ - To handle operations, would the plugins be expanded to do more complex things like HTTP calls, file-system interactions, etc.?
109
+ - What are the pros and cons to persist filter states?
110
+ - Split a pipeline, forwarding transformed messages to different pipelines for further processing
111
+
112
+ ### API
90
113
91
114
The POC expects the filter to implement a bunch of functions. This gives a quite expressive interface
92
115
(filtering, mapping, splitting, dynamic configuration, aggregation over time windows), but at the cost of some complexity.
@@ -102,20 +125,44 @@ An alternative is to let the user implement more specific functions with simpler
102
125
- ` filter_map(msg: Message, config: Json) -> Option<Message> `
103
126
- ` flat_map(msg: Message, config: Json) -> Vec<Message> `
104
127
105
- ### Inline definition
106
-
107
- ### Use JSON for all parameters
108
-
109
- ### Feed filters with message excerpts as done for the workflows
110
-
111
- ### Test tools
112
-
113
- ``` shell
114
- $ tedge mapping test [pipeline.toml | filter.js] topic message
115
- ```
116
-
117
- One should be able to pipe ` tedge mqtt sub ` and ` tedge mapping test `
118
-
119
- ``` shell
120
- $ tedge mqtt sub ' te/+/+/+/+/m/+' | tedge mapping test te-to-c8y.js
121
- ```
128
+ One can also rearrange the argument order for these functions,
129
+ making life easier when a transformation does need a config or the current time
130
+ leveraging that one can pass more arguments than declared to a javascript function:
131
+
132
+ - ` process(msg: Message, config: Json, t: Timestamp) -> Vec<Message> `
133
+ - ` process(msg: Message, config: Json) -> Vec<Message> `
134
+ - ` process(msg: Message) -> Vec<Message> `
135
+
136
+ One can even use a bit further the flexibility of javascript, to let the process function freely return:
137
+ - An array of message objects
138
+ - A single message object
139
+ - A null value interpreted as no messages
140
+ - A boolean
141
+
142
+ Other ideas to explore to make the API more flexible:
143
+
144
+ - Interaction with the entity store and tedge config.
145
+ - Allow a pipeline to subscribe to topics related to the device/entity it is running on
146
+ - Feed filters with message excerpts as done for the workflows
147
+
148
+ ### Devops tools
149
+
150
+ The flexibility to customize MQTT message processing with user-provided functions comes with risks:
151
+ - a filter might not behave as expected,
152
+ - pipelines might be overlapping or conflicting, possibly sending duplicate messages or creating infinite loops
153
+ - builtin pipelines might be accidentally disconnected or broken
154
+ - a filter might introduce a performance bottleneck.
155
+
156
+ To help mitigating these risks, the ` tedge mapping ` sub-commands provide the tools to test, monitor and debug filters and pipelines.
157
+
158
+ - ` tedge mapping flow [topic] ` displays pipelines and filters messages received on this topic will flow through
159
+ - can be used with a set of pipelines not configured yet for a mapper
160
+ - ` tedge mapping test [filter] ` feeds a filter or pipeline with input messages and produces the transformed output messages
161
+ - allow users to run an assertion based on the input/output of a filter
162
+ - ability to pipe ` tedge mqtt sub ` and ` tedge mapping test `
163
+ - control of the timestamps
164
+ - test aggregation over ticks
165
+ - can be used with a set of pipelines not configured yet for a mapper
166
+ - ` tedge mapping stats [pipeline] ` returns statistics on the messages processed by a pipeline
167
+ - count message in, message out
168
+ - processing time min, median, max per filter
0 commit comments