Skip to content

Timeout implementation #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Sep 29, 2021
5 changes: 1 addition & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
## 1.2.2 (August 25, 2021)
* Added delay timer property in re-assemble action.

## 1.2.1 (July 23, 2021)
## 1.3.1 (July 23, 2021)
* Implemented support of maester storage in `Re-assembled message` action (maester-client library 3.3.0)

## 1.2.0 (July 9, 2021)
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ and the JSONata expression `Phone.{type: number}`, an object constructor, the ac
### Re-assemble Messages

Inverse of the split action: Given a stream of incoming messages a sum message is generated.
Has 3 different behaviour variants:
* Only specify group size and no delay timer. A message is emitted once the group size is reached for the given group. Should less message then the given group size arrive, then the group is silently discarded.
* Only specify delay timer and no group size. All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
* Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.
Has 3 different behaviour variants(options):
* Use Group Size: A message is emitted once the group size is reached for the given group. If arriving messages for a particular group are less than the defined group size then the group is silently discarded.
* Use Timeout: All incomming messages count towards the delay timer. Once no more message is received in this time frame there will be a emitted message for each group.
* Use Group Size and Timeout: Specify both group size and delay timer. Groups that have reached their limit are emitted directly. Beyond that the action behaves as specifed in the line before.

Supported:
* Messages can be re-ordered in the flow
Expand All @@ -107,7 +107,7 @@ Limitations:
* All groups must have one or more messages. (i.e. No groups of size 0).
Can't do re-grouping when a split is done on an empty array. (i.e. No empty for each pattern supported).
* All messages must arrive within the same container lifetime.
If at any point there is more than a 15 second gap in messages, then the group will be silently discarded.
If all the messages in the group do not arrive, then the group will be silently discarded.
* The group is dropped if there are any unexpected restarts to the container.
* In case only a groupSize is given and no delay timer is specified. The size of the group must be known by all group members.
* In case of using the delay timer. Messages are only emitted when all parts arrive. Emitting a message only when the first part arrives isn't supported.
Expand Down
248 changes: 210 additions & 38 deletions lib/actions/reassemble.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ async function timer(this_) {
}
}

async function processAction(msg) {
async function processAction(msg,cfg) {
const mode = cfg.mode
const storage = new ObjectStorageWrapperExtended(this);
const {
groupSize,
Expand All @@ -30,9 +31,16 @@ async function processAction(msg) {
if (!messageData) {
incomingData[messageId] = undefined;
}
if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
if(mode == 'groupSize'){
if (groupSize <= 0) {
throw new Error('Size must be a positive integer.');
}
};
if(mode == 'timeout'){
if (msg.body.timersec <= 0) {
throw new Error('Delay timer must be a positive integer.');
}
};
const {
messageGroup,
messageGroupId,
Expand Down Expand Up @@ -64,45 +72,25 @@ async function processAction(msg) {
);

// when group sized is defined || when both group size and delay timer are defined
if(messageGroupSize !== '' && messageGroupSize !== undefined){
if(groupList.includes(groupId)){
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
for(var key in groupElements){
if(groupElements[key].groupId === groupId){
groupElements[key].groupSize == messagesNumberSeen
groupElements[key].messageData == incomingData
}
}
}
else{
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId)
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
if(mode == 'groupSize'){
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter(def => def != groupId);
groupElements = groupElements.filter(def => def.groupId != groupId);
}
}
// When delay timer is defined and no group size defined
if(messageGroupSize === undefined && msg.body.timersec !== undefined){

// in case no delay was defined
// When delay timer option is selected
if(mode == 'timeout'){

// delay timer
var delay = msg.body.timersec
if(delay >= 40000){delay == 40000}
clearTimeout(timeHandle);
Expand All @@ -112,12 +100,11 @@ async function processAction(msg) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
for(var key in groupElements){
if(groupElements[key].groupId === groupId){
groupElements[key].groupSize == messagesNumberSeen
groupElements[key].messageData == incomingData
}
}
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
currentGroup[0].groupSize = messagesNumberSeen
currentGroup[0].messageData = incomingData
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
groupElements.push(currentGroup[0]);
}
else{
parsedMessageGroup.messages.forEach((message) => {
Expand All @@ -127,9 +114,194 @@ async function processAction(msg) {
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
}
if(messageGroupSize === undefined && msg.body.timersec === undefined){
throw new Error('Either group size or delay timer need to be defined');

// When both groupSize and delay timer option is selected
if(mode == 'groupSize&timeout'){

var delay = msg.body.timersec
if(delay >= 40000){delay == 40000}
clearTimeout(timeHandle);
timeHandle = setTimeout(timer, delay, this);

if(groupList.includes(groupId)){
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
var currentGroup = groupElements.filter(function(x) { return x.groupId == groupId; }); // update incoming messageData in current group
currentGroup[0].groupSize = messagesNumberSeen
currentGroup[0].messageData = incomingData
groupElements = groupElements.filter(function(x) { return x.groupId != groupId; });
groupElements.push(currentGroup[0]);
}
else{
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});
groupList.push(groupId)
groupElements.push({groupSize: messagesNumberSeen, groupId: groupId, messageData:incomingData})
}
if (messagesNumberSeen >= messageGroupSize) {
parsedMessageGroup.messages.forEach((message) => {
incomingData[message.messageId] = message.messageData;
});

await this.emit('data', messages.newMessageWithBody({
groupSize,
groupId,
messageData: incomingData,
}));
await storage.deleteObjectById(messageGroupId);
this.logger.info(`Message group with id ${messageGroupId} has been deleted`);
groupList = groupList.filter(def => def != groupId);
groupElements = groupElements.filter(def => def.groupId != groupId);
}
}
}

exports.process = processAction;
//----------------------------------------------------------------------------------------------------------------------------
//Dynamic drop-down logic starts hier

async function dynList(cfg) {
let list = {
"groupSize": "Use Group Size",
"timeout": "Use Timeout",
"groupSize&timeout": "Use Group Size and Timeout"
};
return list;
}

async function getMetaModel(cfg) {
let meta = '';
if(cfg.mode === "groupSize"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"groupSize": {
"type": "number",
"required": true,
"title": "Number of messages produced by splitter",
"order": 3
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 2
}
}
},
"out": {
"type": "object"
}
};
}
else if(cfg.mode === "timeout"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"timersec": {
"type": "number",
"required": true,
"help":{
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
},
"title": "Delay timer(in ms)",
"order": 3
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 2
}
}
},
"out": {
"type": "object"
}
};
}
else if(cfg.mode === "groupSize&timeout"){
meta = {
"in": {
"type": "object",
"required": true,
"properties": {
"groupId": {
"type": "string",
"required": true,
"title": "Unique ID to describe the group",
"order": 5
},
"messageId": {
"type": "string",
"required": true,
"title": "Unique ID to describe this message",
"order": 4
},
"groupSize": {
"type": "number",
"required": false,
"title": "Number of messages produced by splitter",
"order": 3
},
"timersec": {
"type": "number",
"required": false,
"help":{
"description": "Time the process waits when no incoming messages before emiting(Default 20000 miliseconds)"
},
"title": "Delay timer(in ms)",
"order": 2
},
"messageData": {
"title": "Message Data",
"required": false,
"type": "object",
"properties": {},
"order": 1
}
}
},
"out": {
"type": "object"
}
};
}
return meta;
}
module.exports = {
process: processAction,
getMetaModel: getMetaModel,
dynList: dynList
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
"devDependencies": {
"@elastic.io/component-logger": "0.0.1",
"chai": "^4.2.0",
"chai": "4.2.0",
"chai-as-promised": "7.1.1",
"eslint": "7.6.0",
"eslint-config-airbnb-base": "14.2.0",
Expand Down
10 changes: 5 additions & 5 deletions spec/reassemble.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('Split on JSONata ', () => {
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'groupSize'});
// eslint-disable-next-line no-unused-expressions
expect(self.emit.calledOnce).to.be.true;
expect(self.emit.lastCall.args[1].body).to.deep.equal({
Expand Down Expand Up @@ -88,7 +88,7 @@ describe('Split on JSONata ', () => {
},
};

await expect(reassemble.process.call(self, msg, {})).to.eventually.be.rejectedWith('Size must be a positive integer.');
await expect(reassemble.process.call(self, msg, {mode: 'groupSize'})).to.eventually.be.rejectedWith('Size must be a positive integer.');
});

it('Interleaved Case with duplicate deliveries', async () => {
Expand Down Expand Up @@ -152,7 +152,7 @@ describe('Split on JSONata ', () => {
nock('https://ma.estr').delete('/objects/2').reply(200, {});

// eslint-disable-next-line no-await-in-loop
await reassemble.process.call(self, { body: msgBodies[i] }, {});
await reassemble.process.call(self, { body: msgBodies[i] }, {mode: 'groupSize'});
// eslint-disable-next-line default-case
switch (i) {
case i <= 3:
Expand Down Expand Up @@ -214,7 +214,7 @@ describe('Split on JSONata ', () => {
const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});
const deleteMessageGroup = nock('https://ma.estr').delete('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'groupSize'});
// eslint-disable-next-line no-unused-expressions
expect(self.emit.calledOnce).to.be.true;
expect(self.emit.lastCall.args[1].body).to.deep.equal({
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('Split on JSONata ', () => {

const putMessageGroup1 = nock('https://ma.estr').put('/objects/group123').reply(200, {});

await reassemble.process.call(self, msg, {});
await reassemble.process.call(self, msg, {mode: 'timeout'});

// timersec + 0,5 second
await sleep(1500);
Expand Down