Skip to content

Commit 6a8e17d

Browse files
authored
Timeout implementation (#72)
Add timeout implementation
1 parent 24bf657 commit 6a8e17d

File tree

6 files changed

+326
-4548
lines changed

6 files changed

+326
-4548
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.3.0 (October 1, 2021)
2+
* Update `Re-assembled message` action: Implemented option to select processing behavior of incoming messages.
3+
14
## 1.2.1 (July 23, 2021)
25
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)
36

README.md

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,14 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
9090
```Split Property``` - use this field to choose a separator.
9191

9292
### Re-assemble Messages
93-
**(Beta)**
9493

95-
Inverse of the split action: Given a stream of incoming messages that which have
96-
been split apart by a split action (or similar), produce one message once all
97-
message parts have arrived.
94+
Inverse of the split action: Given a stream of incoming messages a sum message is generated.
95+
96+
#### List of Expected Config fields
97+
```Behavior``` - Has 3 different behaviour variants(options):
98+
* Produce Groups of Fixed Size (Don't Emit Partial Groups): A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group will not be emitted.
99+
* Group All Incoming Messages: All incomming messages will be gathered until there are no more incoming messages at which point messages will be emitted for each group.
100+
* Produce Groups of Fixed Size (Emit Partial Groups): Specify both group size and delay timer. Once a group is complete, that group will be emitted. Once there are no more incoming messages, then partially completed groups will also be emitted.
98101

99102
Supported:
100103
* Messages can be re-ordered in the flow
@@ -103,28 +106,28 @@ Supported:
103106
(e.g. Two overlapping webhook calls arrive and the flow has components where parallel processing > 1.)
104107

105108
Limitations:
106-
* All groups must have one or more messages. (i.e. No groups of size 0).
107-
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported)
108-
* If all messages in a group fail to arrive at the re-assemble action (because one message suffered an error earlier in the flow)
109-
then this component will silently discard the group.
110-
* All messages must arrive within the same container lifetime.
111-
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
109+
* All groups must have one or more messages. (i.e. No groups of size 0).
110+
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
111+
If all the messages in the group do not arrive, then the group will not be emitted.
112112
* The group is dropped if there are any unexpected restarts to the container.
113-
* Size of the group must be known by all group members.
114-
* Messages are only emitter when all parts arrive. Emitting a message only when the first part arrives isn't supported.
113+
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
114+
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
115+
* The delay timer can not exceed 40,000 milliseconds. If more than this maximum is given, then this maximum will be used instead.
115116

116117
#### List of Expected Config fields
117-
```groupSize``` - Number of messages in the group
118-
119118
```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.
120119

121-
```messageId``` - Id for a message to distinguish it from other messages in the group.
120+
```messageId``` - Id for a message to distinguish it from other messages in the group.
122121
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.
123122

124-
```messageData``` - object for providing some data derived from the steps between splitting and re-assembling
123+
```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group.
124+
125+
```groupSize``` - Number of messages in the group.
126+
127+
```Delay timer (in ms)``` - Time the process waits when no incoming messages before emiting (Max 40,000 milliseconds)
125128

126129
## Known limitations (common for the component)
127-
No.
130+
None.
128131

129132
## Documentation links
130133
More information and some examples can be found here: [Splitter documentation](https://www.elastic.io/connectors/splitter-integration/).

component.json

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"title": "Splitter",
3-
"version": "1.2.1",
3+
"version": "1.3.0",
44
"description": "Splits a message into multiple messages.",
55
"buildType":"docker",
66
"actions": {
@@ -42,34 +42,14 @@
4242
"link": "/components/splitter/index.html#re-assemble-messages"
4343
},
4444
"main": "./lib/actions/reassemble.js",
45-
"fields": {},
46-
"metadata": {
47-
"in": {
48-
"type": "object",
45+
"dynamicMetadata": true,
46+
"fields": {
47+
"mode": {
48+
"viewClass": "SelectView",
49+
"label": "Behavior",
4950
"required": true,
50-
"properties": {
51-
"groupSize": {
52-
"type": "number",
53-
"required": true,
54-
"title": "Number of messages produced by splitter"
55-
},
56-
"groupId": {
57-
"type": "string",
58-
"required": true,
59-
"title": "Unique ID to describe the group"
60-
},
61-
"messageId": {
62-
"type": "string",
63-
"required": true,
64-
"title": "Unique ID to describe this message"
65-
},
66-
"messageData": {
67-
"title": "Message Data",
68-
"required": false,
69-
"type": "object",
70-
"properties": {}
71-
}
72-
}
51+
"model": {"groupSize": "Produce Groups of Fixed Size (Don't Emit Partial Groups)", "timeout": "Group All Incoming Messages", "groupSize&timeout": "Produce Groups of Fixed Size (Emit Partial Groups)"},
52+
"prompt": "Select behavior"
7353
}
7454
}
7555
}

lib/actions/reassemble.js

Lines changed: 221 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,33 @@
22
const { messages } = require('elasticio-node');
33
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');
44

5-
async function processAction(msg) {
5+
let timeHandle;
6+
let groupList = [];
7+
let delay;
8+
9+
async function timer(this_) {
10+
for (let i = 0; i < groupList.length; i += 1) {
11+
const storage = new ObjectStorageWrapperExtended(this_);
12+
// eslint-disable-next-line no-await-in-loop
13+
const results = await storage.lookupParsedObjectById(groupList[i]);
14+
const incomingData = {};
15+
results.messages.forEach((message) => {
16+
incomingData[message.messageId] = message.messageData;
17+
});
18+
19+
// eslint-disable-next-line no-await-in-loop
20+
await this_.emit('data', messages.newMessageWithBody({
21+
groupSize: results.messages.length,
22+
groupId: results.messages[0].groupId,
23+
messageData: incomingData,
24+
}));
25+
// eslint-disable-next-line no-await-in-loop
26+
await storage.deleteObjectById(groupList[i]);
27+
}
28+
}
29+
30+
async function processAction(msg, cfg) {
31+
const { mode = 'groupSize' } = cfg;
632
const storage = new ObjectStorageWrapperExtended(this);
733
const {
834
groupSize,
@@ -17,13 +43,19 @@ async function processAction(msg) {
1743
messageData,
1844
};
1945

20-
if (groupSize <= 0) {
21-
throw new Error('Size must be a positive integer.');
22-
}
2346
if (!messageData) {
2447
incomingData[messageId] = undefined;
2548
}
26-
49+
if (mode === 'groupSize') {
50+
if (groupSize <= 0) {
51+
throw new Error('Size must be a positive integer.');
52+
}
53+
}
54+
if (mode === 'timeout') {
55+
if (msg.body.timersec <= 0) {
56+
throw new Error('Delay timer must be a positive integer.');
57+
}
58+
}
2759
const {
2860
messageGroup,
2961
messageGroupId,
@@ -54,19 +86,191 @@ async function processAction(msg) {
5486
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
5587
);
5688

57-
if (messagesNumberSeen >= messageGroupSize) {
58-
parsedMessageGroup.messages.forEach((message) => {
59-
incomingData[message.messageId] = message.messageData;
60-
});
89+
// when grouSized option is selected
90+
if (mode === 'groupSize') {
91+
if (messagesNumberSeen >= messageGroupSize) {
92+
parsedMessageGroup.messages.forEach((message) => {
93+
incomingData[message.messageId] = message.messageData;
94+
});
95+
await this.emit('data', messages.newMessageWithBody({
96+
groupSize,
97+
groupId,
98+
messageData: incomingData,
99+
}));
100+
await storage.deleteObjectById(messageGroupId);
101+
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
102+
}
103+
}
61104

62-
await this.emit('data', messages.newMessageWithBody({
63-
groupSize,
64-
groupId,
65-
messageData: incomingData,
66-
}));
67-
await storage.deleteObjectById(messageGroupId);
68-
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
105+
// When delay timer option is selected
106+
if (mode === 'timeout') {
107+
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
108+
clearTimeout(timeHandle);
109+
timeHandle = setTimeout(timer, delay, this);
110+
if (!groupList.includes(messageGroupId)) {
111+
groupList.push(messageGroupId);
112+
}
113+
}
114+
115+
// When both groupSize and delay timer option is selected
116+
if (mode === 'groupSize&timeout') {
117+
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
118+
clearTimeout(timeHandle);
119+
timeHandle = setTimeout(timer, delay, this);
120+
121+
if (!groupList.includes(messageGroupId)) {
122+
groupList.push(messageGroupId);
123+
}
124+
125+
if (messagesNumberSeen >= messageGroupSize) {
126+
parsedMessageGroup.messages.forEach((message) => {
127+
incomingData[message.messageId] = message.messageData;
128+
});
129+
130+
await this.emit('data', messages.newMessageWithBody({
131+
groupSize,
132+
groupId,
133+
messageData: incomingData,
134+
}));
135+
await storage.deleteObjectById(messageGroupId);
136+
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
137+
groupList = groupList.filter((def) => def !== messageGroupId);
138+
}
69139
}
70140
}
71141

72-
exports.process = processAction;
142+
//-------------------------------------------------------------------------------------
143+
// Dynamic drop-down logic starts hier
144+
145+
async function getMetaModel(cfg) {
146+
if (cfg.mode === 'groupSize') {
147+
return ({
148+
in: {
149+
type: 'object',
150+
required: true,
151+
properties: {
152+
groupId: {
153+
type: 'string',
154+
required: true,
155+
title: 'Unique ID to describe the group',
156+
order: 5,
157+
},
158+
messageId: {
159+
type: 'string',
160+
required: true,
161+
title: 'Unique ID to describe this message',
162+
order: 4,
163+
},
164+
groupSize: {
165+
type: 'number',
166+
required: true,
167+
title: 'Number of messages produced by splitter',
168+
order: 3,
169+
},
170+
messageData: {
171+
title: 'Message Data',
172+
required: false,
173+
type: 'object',
174+
properties: {},
175+
order: 2,
176+
},
177+
},
178+
},
179+
out: {
180+
type: 'object',
181+
},
182+
});
183+
}
184+
if (cfg.mode === 'timeout') {
185+
return ({
186+
in: {
187+
type: 'object',
188+
required: true,
189+
properties: {
190+
groupId: {
191+
type: 'string',
192+
required: true,
193+
title: 'Unique ID to describe the group',
194+
order: 5,
195+
},
196+
messageId: {
197+
type: 'string',
198+
required: true,
199+
title: 'Unique ID to describe this message',
200+
order: 4,
201+
},
202+
timersec: {
203+
type: 'number',
204+
required: false,
205+
help: {
206+
description: 'Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)',
207+
},
208+
title: 'Delay timer(in ms)',
209+
order: 3,
210+
},
211+
messageData: {
212+
title: 'Message Data',
213+
required: false,
214+
type: 'object',
215+
properties: {},
216+
order: 2,
217+
},
218+
},
219+
},
220+
out: {
221+
type: 'object',
222+
},
223+
});
224+
}
225+
if (cfg.mode === 'groupSize&timeout') {
226+
return ({
227+
in: {
228+
type: 'object',
229+
required: true,
230+
properties: {
231+
groupId: {
232+
type: 'string',
233+
required: true,
234+
title: 'Unique ID to describe the group',
235+
order: 5,
236+
},
237+
messageId: {
238+
type: 'string',
239+
required: true,
240+
title: 'Unique ID to describe this message',
241+
order: 4,
242+
},
243+
groupSize: {
244+
type: 'number',
245+
title: 'Number of messages produced by splitter',
246+
order: 3,
247+
},
248+
timersec: {
249+
type: 'number',
250+
required: false,
251+
help: {
252+
description: 'Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)',
253+
},
254+
title: 'Delay timer(in ms)',
255+
order: 2,
256+
},
257+
messageData: {
258+
title: 'Message Data',
259+
required: false,
260+
type: 'object',
261+
properties: {},
262+
order: 1,
263+
},
264+
},
265+
},
266+
out: {
267+
type: 'object',
268+
},
269+
});
270+
}
271+
return false;
272+
}
273+
module.exports = {
274+
process: processAction,
275+
getMetaModel,
276+
};

0 commit comments

Comments
 (0)