Skip to content

Timeout implementation #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Sep 29, 2021
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 1.2.1 (July 23, 2021)
## 1.3.0 (July 23, 2021)
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)

## 1.2.0 (July 9, 2021)
Expand Down
36 changes: 19 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
```Split Property``` - use this field to choose a separator.

### Re-assemble Messages
**(Beta)**

Inverse of the split action: Given a stream of incoming messages that which have
been split apart by a split action (or similar), produce one message once all
message parts have arrived.
Inverse of the split action: Given a stream of incoming messages a sum message is generated.
Has 3 different behaviour variants(options):
* Use Group Size: A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group is silently discarded.
* Use Timeout: All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
* Use Group Size and Timeout: Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.

Supported:
* Messages can be re-ordered in the flow
Expand All @@ -103,28 +104,29 @@ Supported:
(e.g. Two overlapping webhook calls arrive and the flow has components where parallel processing > 1.)

Limitations:
* All groups must have one or more messages. (i.e. No groups of size 0).
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported)
* If all messages in a group fail to arrive at the re-assemble action (because one message suffered an error earlier in the flow)
then this component will silently discard the group.
* All messages must arrive within the same container lifetime.
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
* All groups must have one or more messages. (i.e. No groups of size 0).
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
* All messages must arrive within the same container lifetime.
If all the messages in the group do not arrive, then the group will be silently discarded.
* The group is dropped if there are any unexpected restarts to the container.
* Size of the group must be known by all group members.
* Messages are only emitter when all parts arrive. Emitting a message only when the first part arrives isn't supported.
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
* The delay timer can not exceed 40,000 milliseconds. If more than this maximum is given, then this maximum will be used instead.

#### List of Expected Config fields
```groupSize``` - Number of messages in the group

```groupId``` - Globally unique id for the group to distinguish it from other groups. This value needs to be the same for all messages in a group.

```messageId``` - Id for a message to distinguish it from other messages in the group.
```messageId``` - Id for a message to distinguish it from other messages in the group.
Must be unique per group but does not have to be globally unique. This value needs to be different for all messages in a group.

```messageData``` - object for providing some data derived from the steps between splitting and re-assembling
```messageData``` - Data from individual messages can be inserted here in form of an object. This object is then inserted into an array which is available in the message emitted for this group.

```groupSize``` - Number of messages in the group.

```Delay timer (in ms)``` - Time the process waits when no incoming messages before emiting (Max 40,000 milliseconds)

## Known limitations (common for the component)
No.
None.

## Documentation links
More information and some examples can be found here: [Splitter documentation](https://www.elastic.io/connectors/splitter-integration/).
36 changes: 8 additions & 28 deletions component.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"title": "Splitter",
"version": "1.2.1",
"version": "1.3.0",
"description": "Splits a message into multiple messages.",
"buildType":"docker",
"actions": {
Expand Down Expand Up @@ -42,34 +42,14 @@
"link": "/components/splitter/index.html#re-assemble-messages"
},
"main": "./lib/actions/reassemble.js",
"fields": {},
"metadata": {
"in": {
"type": "object",
"dynamicMetadata": true,
"fields": {
"mode": {
"viewClass": "SelectView",
"label": "Select option",
"required": true,
"properties": {
"groupSize": {
"type": "number",
"required": true,
"title": "Number of messages produced by splitter"
},
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group"
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message"
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {}
}
}
"model": {"groupSize": "Use Group Size", "timeout": "Use Timeout", "groupSize&timeout": "Use Group Size and Timeout"},
"prompt": "Select type"
}
}
}
Expand Down
255 changes: 238 additions & 17 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@
const { messages } = require('elasticio-node');
const ObjectStorageWrapperExtended = require('./utils-wrapper/ObjectStorageWrapperExtended');

async function processAction(msg) {
let timeHandle;
let groupList = [];
let groupElements = [{ groupSize: undefined, groupId: undefined, messageData: undefined }];
let delay;

async function timer(this_) {
for (let i = 1; i < groupElements.length; i += 1) {
this_.emit('data', messages.newMessageWithBody(groupElements[i]));
}
}

async function processAction(msg, cfg) {
const { mode } = cfg;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const { mode } = cfg;
const { mode = 'groupSize'} = cfg;

One last change: We should default to having the mode be set to group size in order to ensure backwards compatibility.

const storage = new ObjectStorageWrapperExtended(this);
const {
groupSize,
Expand All @@ -17,13 +29,19 @@ async function processAction(msg) {
messageData,
};

if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
if (!messageData) {
incomingData[messageId] = undefined;
}

if (mode === 'groupSize') {
if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
}
if (mode === 'timeout') {
if (msg.body.timersec <= 0) {
throw new Error('Delay timer must be a positive integer.');
}
}
const {
messageGroup,
messageGroupId,
Expand Down Expand Up @@ -54,19 +72,222 @@ async function processAction(msg) {
Currently the group has ${messagesNumberSeen} of ${messageGroupSize} message(s).`,
);

if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
// when group sized is defined || when both group size and delay timer are defined
if (mode === 'groupSize') {
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
}
}

// When delay timer option is selected
if (mode === 'timeout') {
// delay timer
delay = msg.body.timersec;
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
if (groupList.includes(groupId)) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
// update incoming messageData in current group
const currentGroup = groupElements.filter((x) => x.groupId === groupId);
currentGroup[0].groupSize = messagesNumberSeen;
currentGroup[0].messageData = incomingData;
groupElements = groupElements.filter((x) => x.groupId !== groupId);
groupElements.push(currentGroup[0]);
} else {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId);
groupElements.push({ groupSize: messagesNumberSeen, groupId, messageData: incomingData });
}
}

// When both groupSize and delay timer option is selected
if (mode === 'groupSize&timeout') {
delay = msg.body.timersec;
delay = (msg.body.timersec >= 40000) ? 40000 : msg.body.timersec;
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);

if (groupList.includes(groupId)) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
// update incoming messageData in current group
const currentGroup = groupElements.filter((x) => x.groupId === groupId);
currentGroup[0].groupSize = messagesNumberSeen;
currentGroup[0].messageData = incomingData;
groupElements = groupElements.filter((x) => x.groupId !== groupId);
groupElements.push(currentGroup[0]);
} else {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId);
groupElements.push({ groupSize: messagesNumberSeen, groupId, messageData: incomingData });
}
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter((def) => def !== groupId);
groupElements = groupElements.filter((def) => def.groupId !== groupId);
}
}
}

exports.process = processAction;
//-------------------------------------------------------------------------------------
// Dynamic drop-down logic starts hier

async function getMetaModel(cfg) {
if (cfg.mode === 'groupSize') {
return ({
in: {
type: 'object',
required: true,
properties: {
groupId: {
type: 'string',
required: true,
title: 'Unique ID to describe the group',
order: 5,
},
messageId: {
type: 'string',
required: true,
title: 'Unique ID to describe this message',
order: 4,
},
groupSize: {
type: 'number',
required: true,
title: 'Number of messages produced by splitter',
order: 3,
},
messageData: {
title: 'Message Data',
required: false,
type: 'object',
properties: {},
order: 2,
},
},
},
out: {
type: 'object',
},
});
}
if (cfg.mode === 'timeout') {
return ({
in: {
type: 'object',
required: true,
properties: {
groupId: {
type: 'string',
required: true,
title: 'Unique ID to describe the group',
order: 5,
},
messageId: {
type: 'string',
required: true,
title: 'Unique ID to describe this message',
order: 4,
},
timersec: {
type: 'number',
required: true,
help: {
description: 'Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)',
},
title: 'Delay timer(in ms)',
order: 3,
},
messageData: {
title: 'Message Data',
required: false,
type: 'object',
properties: {},
order: 2,
},
},
},
out: {
type: 'object',
},
});
}
if (cfg.mode === 'groupSize&timeout') {
return ({
in: {
type: 'object',
required: true,
properties: {
groupId: {
type: 'string',
required: true,
title: 'Unique ID to describe the group',
order: 5,
},
messageId: {
type: 'string',
required: true,
title: 'Unique ID to describe this message',
order: 4,
},
groupSize: {
type: 'number',
title: 'Number of messages produced by splitter',
order: 3,
},
timersec: {
type: 'number',
help: {
description: 'Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)',
},
title: 'Delay timer(in ms)',
order: 2,
},
messageData: {
title: 'Message Data',
required: false,
type: 'object',
properties: {},
order: 1,
},
},
},
out: {
type: 'object',
},
});
}
return true;
}
module.exports = {
process: processAction,
getMetaModel,
};
Loading