Skip to content

Commit 206d0d1

Browse files
authored
Merge pull request #800 from telefonicaid/task/set_default_notification_handler
allow receive command notifications from CB
2 parents 5034c81 + 605ff04 commit 206d0d1

File tree

7 files changed

+264
-141
lines changed

7 files changed

+264
-141
lines changed

CHANGES_NEXT_RELEASE

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
+ Add: config.mqtt.subscribeBatchSize option (IOTA_MQTT_SUBSCRIBE_BATCH_SIZE) to batch topic subscriptions into multiple SUBSCRIBE packets (#875)
1+
- Add: allow receive command notifications from CB (iotagent-node-lib#1455)
2+
- Add: config.mqtt.subscribeBatchSize option (IOTA_MQTT_SUBSCRIBE_BATCH_SIZE) to batch topic subscriptions into multiple SUBSCRIBE packets (#875)
23
- Fix: bad json measures are reported as 500 Error instead of 400 (#863)
34
- Fix: Subscription fails on AWS IoT Core broker (#875)
45
- Upgrade mqtt from 4.3.7 to 5.13.0

lib/bindings/HTTPBinding.js

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -792,28 +792,70 @@ function stop(callback) {
792792
}
793793
}
794794

795-
function sendPushNotifications(device, values, callback) {
796-
const executions = _.flatten(values.map(commandHandler.generateCommandExecution.bind(null, null, device)));
795+
function sendPushNotifications(device, group, values, callback) {
796+
const executions = _.flatten(
797+
values.map(commandHandler.generateCommandExecution.bind(null, group.apikey, device, group))
798+
);
797799

798800
async.series(executions, function (error) {
799801
callback(error);
800802
});
801803
}
802804

803-
function storePollNotifications(device, values, callback) {
805+
function storePollNotifications(device, group, values, callback) {
804806
function addPollNotification(item, innerCallback) {
805807
iotAgentLib.addCommand(device.service, device.subservice, device.id, item, innerCallback);
806808
}
807-
808809
async.map(values, addPollNotification, callback);
809810
}
810811

811812
function notificationHandler(device, values, callback) {
812-
if (device.endpoint) {
813-
sendPushNotifications(device, values, callback);
814-
} else {
815-
storePollNotifications(device, values, callback);
813+
config.getLogger().debug(context, 'values for command %j and device %j', values, device);
814+
815+
function invokeWithConfiguration(apiKey, callback) {
816+
let group = {};
817+
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
818+
error,
819+
foundGroup
820+
) {
821+
if (!error) {
822+
group = foundGroup;
823+
}
824+
var cmdValue = { type: 'command' };
825+
for (let val of values) {
826+
if (val.name === 'cmd') {
827+
cmdValue.name = val.value;
828+
} else if (val.name === 'params') {
829+
cmdValue.value = val.value;
830+
} else {
831+
// other fields like status, info, onDelivered, OnError
832+
cmdValue[val.name] = val.value;
833+
}
834+
}
835+
var cmdValues = [cmdValue];
836+
config.getLogger().debug(context, 'cmdValues %j', cmdValues);
837+
iotAgentLib.executeUpdateSideEffects(
838+
device,
839+
device.id,
840+
device.type,
841+
device.service,
842+
device.subservice,
843+
cmdValues,
844+
function () {
845+
if (device.endpoint || group.endpoint) {
846+
sendPushNotifications(device, group, cmdValues, callback);
847+
} else {
848+
storePollNotifications(device, group, cmdValues, callback);
849+
}
850+
}
851+
);
852+
});
816853
}
854+
855+
async.waterfall(
856+
[apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
857+
callback
858+
);
817859
}
818860

819861
exports.start = start;

lib/iotagent-json.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const config = require('./configService');
4141
4242
*/
4343
function configurationNotificationHandler(device, updates, callback) {
44+
config.getLogger().debug(context, 'configurationNotificationHandler command %j and device %j', updates, device);
4445
function invokeConfiguration(apiKey, callback) {
4546
let group = {};
4647
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
@@ -81,6 +82,41 @@ function configurationHandler(configuration, callback) {
8182
}
8283
}
8384

85+
/**
86+
* Calls all the command execution handlers for each transport protocol binding whenever a new notification request
87+
* arrives from the Context Broker.
88+
*
89+
* @param {Object} device Device data object containing all stored information about the device.
90+
* @param {Array} values Values recieved in the notification.
91+
*/
92+
function notificationHandler(device, values, callback) {
93+
config.getLogger().debug(context, 'notificationHandler command %j and device %j', values, device);
94+
function invokeWithConfiguration(apiKey, callback) {
95+
let group = {};
96+
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
97+
error,
98+
foundGroup
99+
) {
100+
if (!error) {
101+
group = foundGroup;
102+
}
103+
transportSelector.applyFunctionFromBinding(
104+
[device, values],
105+
'notificationHandler',
106+
device.transport ||
107+
(group && group.transport ? group.transport : undefined) ||
108+
config.getConfig().defaultTransport,
109+
callback
110+
);
111+
});
112+
}
113+
114+
async.waterfall(
115+
[apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
116+
callback
117+
);
118+
}
119+
84120
/**
85121
* Handles incoming updateContext requests related with lazy attributes. This handler is still just registered,
86122
* but empty.
@@ -154,8 +190,10 @@ function start(newConfig, callback) {
154190
iotAgentLib.setUpdatingHandler(deviceUpdatingHandler);
155191
iotAgentLib.setDataUpdateHandler(updateHandler);
156192

157-
if (config.getConfig().configRetrieval) {
193+
if (config.getConfig().configRetrieval === true) {
158194
iotAgentLib.setNotificationHandler(configurationNotificationHandler);
195+
} else {
196+
iotAgentLib.setNotificationHandler(notificationHandler);
159197
}
160198

161199
transportSelector.startTransportBindings(newConfig, callback);

test/unit/ngsiv2/HTTP_commands_test.js

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -594,8 +594,6 @@ describe('HTTP: Commands with extra headers from groups', function () {
594594
// device provisioning functionality. Appropriate verification is done in tests under
595595
// provisioning folder of iotagent-node-lib
596596
beforeEach(function (done) {
597-
//contextBrokerUnprovMock = nock('http://192.168.1.1:1026');
598-
599597
contextBrokerMock
600598
.matchHeader('fiware-service', 'smartgondor')
601599
.matchHeader('fiware-servicepath', '/gardens')
@@ -687,3 +685,112 @@ describe('HTTP: Commands with extra headers from groups', function () {
687685
});
688686
});
689687
});
688+
689+
describe('HTTP: Commands from CB notifications', function () {
690+
beforeEach(function (done) {
691+
config.logLevel = 'INFO';
692+
693+
nock.cleanAll();
694+
695+
contextBrokerMock = nock('http://192.168.1.1:1026')
696+
.matchHeader('fiware-service', 'smartgondor')
697+
.matchHeader('fiware-servicepath', '/gardens')
698+
.post('/v2/registrations')
699+
.reply(201, null, { Location: '/v2/registrations/6319a7f5254b05844116584d' });
700+
701+
iotagentMqtt.start(config, function () {
702+
done();
703+
});
704+
});
705+
706+
afterEach(function (done) {
707+
nock.cleanAll();
708+
async.series([iotAgentLib.clearAll, iotagentMqtt.stop], done);
709+
});
710+
describe('When a POST measure arrives for an unprovisioned device in a command group', function () {
711+
const optionsMeasure = {
712+
url: 'http://localhost:' + config.http.port + '/iot/json',
713+
method: 'POST',
714+
json: {
715+
h: '33'
716+
},
717+
headers: {
718+
'fiware-service': 'smartgondor',
719+
'fiware-servicepath': '/gardens'
720+
},
721+
qs: {
722+
i: 'JSON_UNPROVISIONED',
723+
k: 'KL223HHV8732SFL1'
724+
}
725+
};
726+
// This mock does not check the payload since the aim of the test is not to verify
727+
// device provisioning functionality. Appropriate verification is done in tests under
728+
// provisioning folder of iotagent-node-lib
729+
beforeEach(function (done) {
730+
contextBrokerMock
731+
.matchHeader('fiware-service', 'smartgondor')
732+
.matchHeader('fiware-servicepath', '/gardens')
733+
.post(
734+
'/v2/entities?options=upsert',
735+
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unprovisionedDevice3.json')
736+
)
737+
.reply(204);
738+
739+
request(groupCreation, function (error, response, body) {
740+
done();
741+
});
742+
});
743+
744+
it('should send its value to the Context Broker', function (done) {
745+
request(optionsMeasure, function (error, result, body) {
746+
contextBrokerMock.done();
747+
done();
748+
});
749+
});
750+
751+
describe('When a CB notification with a command arrive to the Agent for a device with the HTTP protocol', function () {
752+
const commandOptions = {
753+
url: 'http://localhost:' + config.iota.server.port + '/notify',
754+
method: 'POST',
755+
json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/notifyCommand.json'),
756+
headers: {
757+
'fiware-service': 'smartgondor',
758+
'fiware-servicepath': '/gardens'
759+
}
760+
};
761+
beforeEach(function () {
762+
contextBrokerMock
763+
.matchHeader('fiware-service', 'smartgondor')
764+
.matchHeader('fiware-servicepath', '/gardens')
765+
.post(
766+
'/v2/entities?options=upsert',
767+
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus9.json')
768+
)
769+
.reply(204);
770+
contextBrokerMock
771+
.matchHeader('fiware-service', 'smartgondor')
772+
.matchHeader('fiware-servicepath', '/gardens')
773+
.post(
774+
'/v2/entities?options=upsert',
775+
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus10.json')
776+
)
777+
.reply(204);
778+
mockedClientServer = nock('http://localhost:9876')
779+
.post('/command', function (body) {
780+
return body.cmd1 || body.cmd1.data || body.cmd1.data === 22;
781+
})
782+
.reply(200, '{"cmd1":{"data":"22"}}');
783+
});
784+
it('should return a 200 OK without errors', function (done) {
785+
request(optionsMeasure, function (error, result, body) {
786+
request(commandOptions, function (error, response, body) {
787+
should.not.exist(error);
788+
response.statusCode.should.equal(204);
789+
contextBrokerMock.done();
790+
done();
791+
});
792+
});
793+
});
794+
});
795+
});
796+
});

test/unit/ngsiv2/HTTP_get-configuration_test.js

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -142,71 +142,4 @@ describe('HTTP: Get configuration from the devices', function () {
142142
});
143143
});
144144
});
145-
describe('When a subscription request is received in the IoT Agent', function () {
146-
const configurationRequest = {
147-
url: 'http://localhost:' + config.http.port + '/iot/json/configuration',
148-
method: 'POST',
149-
json: {
150-
type: 'subscription',
151-
fields: ['sleepTime', 'warningLevel']
152-
},
153-
headers: {
154-
'fiware-service': 'smartgondor',
155-
'fiware-servicepath': '/gardens'
156-
},
157-
qs: {
158-
i: 'MQTT_2',
159-
k: '1234'
160-
}
161-
};
162-
163-
beforeEach(function () {
164-
contextBrokerMock
165-
.matchHeader('fiware-service', 'smartgondor')
166-
.matchHeader('fiware-servicepath', '/gardens')
167-
.post('/v2/subscriptions')
168-
.reply(201, null, { Location: '/v2/subscriptions/51c0ac9ed714fb3b37d7d5a8' });
169-
170-
mockedClientServer = nock('http://localhost:9876')
171-
.post('/command/configuration', function (result) {
172-
return (
173-
result.sleepTime &&
174-
result.sleepTime === '200' &&
175-
result.warningLevel &&
176-
result.warningLevel === 'ERROR' &&
177-
result.dt
178-
);
179-
})
180-
.reply(200, '');
181-
});
182-
183-
it('should create a subscription in the ContextBroker', function (done) {
184-
request(configurationRequest, function (error, response, body) {
185-
contextBrokerMock.done();
186-
done();
187-
});
188-
});
189-
it('should update the values in the MQTT topic when a notification is received', function (done) {
190-
const optionsNotify = {
191-
url: 'http://localhost:' + config.iota.server.port + '/notify',
192-
method: 'POST',
193-
json: utils.readExampleFile('./test/subscriptions/notification.json'),
194-
headers: {
195-
'fiware-service': 'smartgondor',
196-
'fiware-servicepath': '/gardens'
197-
}
198-
};
199-
200-
request(configurationRequest, function (error, response, body) {
201-
setTimeout(function () {
202-
request(optionsNotify, function () {
203-
setTimeout(function () {
204-
mockedClientServer.done();
205-
done();
206-
}, 100);
207-
});
208-
}, 100);
209-
});
210-
});
211-
});
212145
});

0 commit comments

Comments
 (0)