diff --git a/.eslintrc.js b/.eslintrc.js index cfcc4e26..f1088d05 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -145,6 +145,7 @@ module.exports = { } ], 'mocha/no-skipped-tests': ERROR, - 'mocha/no-exclusive-tests': ERROR + 'mocha/no-exclusive-tests': ERROR, + 'new-parens': ERROR } }; \ No newline at end of file diff --git a/lib/amqp.js b/lib/amqp.js index 798e8907..fa78f69b 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -1,6 +1,7 @@ const log = require('./logging.js'); const amqplib = require('amqplib'); const encryptor = require('./encryptor.js'); +const ObjectStorage = require('./objectStorage.js'); const co = require('co'); const _ = require('lodash'); const eventToPromise = require('event-to-promise'); @@ -11,6 +12,7 @@ const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; class Amqp { constructor(settings) { this.settings = settings; + this.objectStorage = new ObjectStorage(settings); } connect(uri) { @@ -53,13 +55,38 @@ class Amqp { return Promise.resolve(); } + async decryptMessage(message) { + const objectId = message.properties.headers.objectId; + if (!objectId) { + return encryptor.decryptMessageContent(message.content); + } + + return await this.objectStorage.getObject(objectId); + } + + async encryptMessage(data, properties) { + if (!this.settings.OBJECT_STORAGE_ENABLED) { + return encryptor.encryptMessageContent(data); + } + + let message = ''; + try { + properties.headers.objectId = await this.objectStorage.addObject(data); + } catch (e) { + log.error('Failed to add message to object storage: %s', e); + message = encryptor.encryptMessageContent(data); + } + + return message; + } + listenQueue(queueName, callback) { //eslint-disable-next-line consistent-this const self = this; this.subscribeChannel.prefetch(this.settings.RABBITMQ_PREFETCH_SAILOR); - return this.subscribeChannel.consume(queueName, function decryptMessage(message) { + return this.subscribeChannel.consume(queueName, async function onMessage(message) { log.trace('Message received: %j', message); if (message === null) { @@ -69,10 +96,10 @@ class Amqp { let decryptedContent; try { - decryptedContent = encryptor.decryptMessageContent(message.content, message.properties.headers); + decryptedContent = await self.decryptMessage(message); } catch (err) { console.error( - 'Error occurred while parsing message #%j payload (%s)', + 'Error occurred while getting message #%j payload (%s)', message.fields.deliveryTag, err.message ); @@ -149,8 +176,7 @@ class Amqp { async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); - const encryptedData = encryptor.encryptMessageContent(data); - + const encryptedData = await this.encryptMessage(data, properties); return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } diff --git a/lib/cipher.js b/lib/cipher.js index 5ddcd54a..46bbbb4f 100644 --- a/lib/cipher.js +++ b/lib/cipher.js @@ -1,6 +1,7 @@ var _ = require('lodash'); var crypto = require('crypto'); var debug = require('debug')('sailor:cipher'); +var PassThrough = require('stream').PassThrough; var ALGORYTHM = 'aes-256-cbc'; var PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD; @@ -9,6 +10,8 @@ var VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV; exports.id = 1; exports.encrypt = encryptIV; exports.decrypt = decryptIV; +exports.decryptStream = decryptStreamIV; +exports.encryptStream = encryptStreamIV; function encryptIV(rawData) { debug('About to encrypt:', rawData); @@ -30,11 +33,9 @@ function encryptIV(rawData) { return cipher.update(rawData, 'utf-8', 'base64') + cipher.final('base64'); } -function decryptIV(encData, options) { +function decryptIV(encData) { debug('About to decrypt:', encData); - options = options || {}; - if (!_.isString(encData)) { throw new Error('RabbitMQ message cipher.decryptIV() accepts only string as parameter.'); } @@ -54,3 +55,33 @@ function decryptIV(encData, options) { return result; } + +function encryptStreamIV() { + debug('Creating encryption stream'); + + if (!PASSWORD) { + return new PassThrough(); + } + + if (!VECTOR) { + throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set'); + } + + var encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest(); + return crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR); +} + +function decryptStreamIV() { + debug('Creating decryption stream'); + + if (!PASSWORD) { + return new PassThrough(); + } + + if (!VECTOR) { + throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set'); + } + + var decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest(); + return crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR); +} diff --git a/lib/encryptor.js b/lib/encryptor.js index d8b3bdeb..1926925e 100644 --- a/lib/encryptor.js +++ b/lib/encryptor.js @@ -1,20 +1,41 @@ var cipher = require('./cipher.js'); +var Readable = require('stream').Readable; +var zlib = require('zlib'); +var getStream = require('get-stream'); exports.encryptMessageContent = encryptMessageContent; exports.decryptMessageContent = decryptMessageContent; +exports.encryptMessageContentStream = encryptMessageContentStream; +exports.decryptMessageContentStream = decryptMessageContentStream; function encryptMessageContent(messagePayload) { return cipher.encrypt(JSON.stringify(messagePayload)); } -function decryptMessageContent(messagePayload, messageHeaders) { +function decryptMessageContent(messagePayload) { if (!messagePayload || messagePayload.toString().length === 0) { return null; } try { - return JSON.parse(cipher.decrypt(messagePayload.toString(), messageHeaders)); + return JSON.parse(cipher.decrypt(messagePayload.toString())); } catch (err) { console.error(err.stack); throw Error('Failed to decrypt message: ' + err.message); } } + +function encryptMessageContentStream(data) { + const dataStream = new Readable(); + dataStream.push(JSON.stringify(data)); + dataStream.push(null); + return dataStream + .pipe(zlib.createGzip()) + .pipe(cipher.encryptStream()); +} + +async function decryptMessageContentStream(stream) { + const s = stream.pipe(cipher.decryptStream()).pipe(zlib.createGunzip()); + const content = await getStream(s); + + return JSON.parse(content); +} diff --git a/lib/objectStorage.js b/lib/objectStorage.js new file mode 100644 index 00000000..f6482c0f --- /dev/null +++ b/lib/objectStorage.js @@ -0,0 +1,81 @@ +const log = require('./logging.js'); +const encryptor = require('./encryptor.js'); +const uuid = require('uuid'); +const axios = require('axios'); +const http = require('http'); +const https = require('https'); + +class ObjectStorage { + constructor(settings) { + this.api = axios.create({ + baseURL: `${settings.OBJECT_STORAGE_URI}/`, + httpAgent: new http.Agent({ keepAlive: true }), + httpsAgent: new https.Agent({ keepAlive: true }), + headers: { Authorization: `Bearer ${settings.OBJECT_STORAGE_TOKEN}` }, + validateStatus: null + }); + } + + async requestRetry({ maxAttempts, delay, request, onResponse }) { + let attempts = 0; + let res; + let err; + while (attempts < maxAttempts) { + err = null; + res = null; + attempts++; + try { + res = await request(); + } catch (e) { + err = e; + } + if (onResponse && onResponse(err, res)) { + continue; + } + if (err || res.status >= 400) { + log.warn('Error during object get: %s', err ? err : `${res.status} (${res.statusText})`); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + break; + } + if (err || res.status >= 400) { + throw err || new Error(`HTTP error during object get: ${res.status} (${res.statusText})`); + } + return res; + } + + async getObject(objectId) { + const res = await this.requestRetry({ + maxAttempts: 3, + delay: 100, + request: () => this.api.get(`/objects/${objectId}`, { responseType: 'stream' }) + }); + + return await encryptor.decryptMessageContentStream(res.data); + } + + async addObject(data) { + let objectId = uuid.v4(); + await this.requestRetry({ + maxAttempts: 3, + delay: 100, + request: () => this.api.put( + `/objects/${objectId}`, + encryptor.encryptMessageContentStream(data), + { headers: { 'content-type': 'application/octet-stream' } } + ), + onResponse: (err, res) => { + if (!err && res.status === 409) { + log.warn('Generated already existing UUID'); + objectId = uuid.v4(); + return true; + } + } + }); + + return objectId; + } +} + +module.exports = ObjectStorage; diff --git a/lib/settings.js b/lib/settings.js index 87fd2ebe..ec30012c 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -48,7 +48,10 @@ function readFrom(envVars) { RATE_INTERVAL: 100, // 100ms PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms - AMQP_PUBLISH_RETRY_ATTEMPTS: 10 + AMQP_PUBLISH_RETRY_ATTEMPTS: 10, + OBJECT_STORAGE_URI: '', + OBJECT_STORAGE_TOKEN: '', + OBJECT_STORAGE_ENABLED: false }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 1c4b6fbf..402d1d7d 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -5,6 +5,8 @@ const amqplib = require('amqplib'); const { EventEmitter } = require('events'); const PREFIX = 'sailor_nodejs_integration_test'; const nock = require('nock'); +const getStream = require('get-stream'); +const encryptor = require('../lib/encryptor.js'); const env = process.env; @@ -131,11 +133,12 @@ class AmqpHelper extends EventEmitter { this.publishChannel.ack(message); - const emittedMessage = JSON.parse(message.content.toString()); + const content = message.content.toString(); + const emittedMessage = content ? JSON.parse(content) : content; const data = { properties: message.properties, - body: emittedMessage.body, + body: emittedMessage ? emittedMessage.body : null, emittedMessage }; this.dataMessages.push(data); @@ -178,7 +181,9 @@ function prepareEnv() { env.DEBUG = 'sailor:debug'; - + env.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter'; + env.ELASTICIO_OBJECT_STORAGE_ENABLED = ''; + env.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; } function mockApiTaskStepResponse(response) { @@ -197,6 +202,11 @@ function mockApiTaskStepResponse(response) { .reply(200, Object.assign(defaultResponse, response)); } +async function encryptForObjectStorage(input) { + const stream = encryptor.encryptMessageContentStream(input); + return await getStream.buffer(stream); +} + exports.PREFIX = PREFIX; exports.amqp = function amqp() { @@ -205,4 +215,4 @@ exports.amqp = function amqp() { exports.prepareEnv = prepareEnv; exports.mockApiTaskStepResponse = mockApiTaskStepResponse; - +exports.encryptForObjectStorage = encryptForObjectStorage; diff --git a/mocha_spec/objectStorage.spec.js b/mocha_spec/objectStorage.spec.js new file mode 100644 index 00000000..e9a2376b --- /dev/null +++ b/mocha_spec/objectStorage.spec.js @@ -0,0 +1,136 @@ +const nock = require('nock'); +const sinonjs = require('sinon'); +const expect = require('chai').expect; +const getStream = require('get-stream'); +const logging = require('../lib/logging.js'); + +describe('ObjectStorage', () => { + + process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword'; + process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols'; + + const envVars = {}; + envVars.ELASTICIO_AMQP_URI = 'amqp://test2/test2'; + envVars.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; + envVars.ELASTICIO_STEP_ID = 'step_1'; + envVars.ELASTICIO_EXEC_ID = 'some-exec-id'; + + envVars.ELASTICIO_USER_ID = '5559edd38968ec0736000002'; + envVars.ELASTICIO_COMP_ID = '5559edd38968ec0736000456'; + envVars.ELASTICIO_FUNCTION = 'list'; + + envVars.ELASTICIO_HOOK_SHUTDOWN = true; + + envVars.ELASTICIO_API_URI = 'http://apihost.com'; + envVars.ELASTICIO_API_USERNAME = 'test@test.com'; + envVars.ELASTICIO_API_KEY = '5559edd'; + + envVars.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter'; + envVars.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; + envVars.ELASTICIO_OBJECT_FLOW_ID = 1; + + const ObjectStorage = require('../lib/objectStorage.js'); + const encryptor = require('../lib/encryptor.js'); + const settings = require('../lib/settings.js').readFrom(envVars); + + let sinon; + beforeEach(() => { + sinon = sinonjs.sandbox.create(); + }); + afterEach(() => { + sinon.restore(); + }); + + it('should fail after 3 retries', async () => { + const log = sinon.stub(logging, 'warn'); + const objectStorage = new ObjectStorage(settings); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .get('/objects/1') + .replyWithError({ code: 'ETIMEDOUT' }) + .get('/objects/1') + .reply(404) + .get('/objects/1') + .replyWithError({ code: 'ENOTFOUND' }); + + let err; + try { + await objectStorage.getObject(1); + } catch (e) { + err = e; + } + + expect(objectStorageCalls.isDone()).to.be.true; + expect(err.code).to.be.equal('ENOTFOUND'); + expect(log.getCall(1).args[1].toString()).to.include('404'); + expect(log.callCount).to.be.equal(3); + }); + + it('should retry get request 3 times on errors', async () => { + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .get('/objects/1') + .reply(500) + .get('/objects/1') + .replyWithError({ code: 'ECONNRESET' }) + .get('/objects/1') + .reply(200, await getStream.buffer(encryptor.encryptMessageContentStream(data))); + + const out = await objectStorage.getObject(1); + + expect(objectStorageCalls.isDone()).to.be.true; + expect(out).to.be.deep.equal(data); + }); + + it('should retry put request 3 times on errors', async () => { + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + const put = await getStream.buffer(encryptor.encryptMessageContentStream(data)); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .put(/^\/objects\/[0-9a-z-]+$/, put) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(200); + + await objectStorage.addObject(data); + + expect(objectStorageCalls.isDone()).to.be.true; + }); + + it('should retry put request on 409 error with different objectId', async () => { + const log = sinon.stub(logging, 'warn'); + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + const put = await getStream.buffer(encryptor.encryptMessageContentStream(data)); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .matchHeader('content-type', 'application/octet-stream') + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(503) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(409) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(200); + const urls = []; + objectStorageCalls.on('request', req => { + urls.push(req.path); + }); + + await objectStorage.addObject(data); + + expect(objectStorageCalls.isDone()).to.be.true; + expect(urls[0]).to.be.equal(urls[1]); + expect(urls[1]).to.not.equal(urls[2]); + expect(log.getCall(1).args[0]).to.include('UUID'); + expect(log.callCount).to.be.equal(2); + }); +}); diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 43c8213a..6fb6e131 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -59,6 +59,7 @@ describe('Integration Test', () => { const parentMessageId = 'parent_message_1234567890'; const traceId = helpers.PREFIX + '_trace_id_123456'; const messageId = 'f45be600-f770-11e6-b42d-b187bfbf19fd'; + const objectId = '47d3e978-8099-11e9-bc42-526af7764f64'; let amqpHelper = helpers.amqp(); beforeEach(() => amqpHelper.prepare()); @@ -143,6 +144,244 @@ describe('Integration Test', () => { }); }); + it('should get object storage message', async () => { + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) + .put(/^\/objects\/[0-9a-z-]+$/, await helpers.encryptForObjectStorage({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + })) + .reply(200); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.be.a('string'); + expect(body).to.be.null; + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + + it('should get object storage message if error repeat succeed', async () => { + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const log = sinon.stub(logging, 'warn'); + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const putData = await helpers.encryptForObjectStorage({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(200); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.be.a('string'); + expect(body).to.be.null; + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; + expect(log.getCall(1).args[1].toString()).to.include('400'); + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + + it('should get object storage message, but publish directly on 3 errors', async () => { + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const log = sinon.stub(logging, 'error'); + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const putData = await helpers.encryptForObjectStorage({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(503); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.not.exist; + expect(body).to.deep.equal({ + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }); + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; + expect(log.getCall(0).args[1].toString()).to.include('503'); + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + + it('should get object storage message, but publish directly', async () => { + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.not.exist; + expect(body).to.deep.equal({ + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }); + expect(objectStorageCalls.isDone()).to.be.true; + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + it('should augment passthrough property with data', done => { process.env.ELASTICIO_STEP_ID = 'step_2'; process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; diff --git a/package-lock.json b/package-lock.json index 20f4491b..8b29c09d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "elasticio-sailor-nodejs", - "version": "2.2.4", + "version": "2.4.0-dev.29", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -259,6 +259,15 @@ "integrity": "sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ==", "dev": true }, + "axios": { + "version": "0.18.0", + "resolved": "https://registry.npmjs.org/axios/-/axios-0.18.0.tgz", + "integrity": "sha1-MtU+SFHv3AoRmTts0AB4nXDAUQI=", + "requires": { + "follow-redirects": "^1.3.0", + "is-buffer": "^1.1.5" + } + }, "babel-code-frame": { "version": "6.26.0", "resolved": "https://registry.npmjs.org/babel-code-frame/-/babel-code-frame-6.26.0.tgz", @@ -1753,6 +1762,29 @@ } } }, + "follow-redirects": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.7.0.tgz", + "integrity": "sha512-m/pZQy4Gj287eNy94nivy5wchN3Kp+Q5WgUPNy5lJSZ3sgkVKSYV/ZChMAQVIgx1SqfZ2zBZtPA2YlXIWxxJOQ==", + "requires": { + "debug": "^3.2.6" + }, + "dependencies": { + "debug": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz", + "integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==", + "requires": { + "ms": "^2.1.1" + } + }, + "ms": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.1.tgz", + "integrity": "sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg==" + } + } + }, "for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -1848,6 +1880,14 @@ "integrity": "sha1-uWjGsKBDhDJJAui/Gl3zJXmkUP4=", "dev": true }, + "get-stream": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-5.1.0.tgz", + "integrity": "sha512-EXr1FOzrzTfGeL0gQdeFEvOMm2mzMOglyiOXSTpPC+iAjAKftbr3jpCMWynogwYnM+eSj9sHGc6wjIcDvYiygw==", + "requires": { + "pump": "^3.0.0" + } + }, "get-value": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/get-value/-/get-value-2.0.6.tgz", @@ -2737,8 +2777,7 @@ "is-buffer": { "version": "1.1.6", "resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.6.tgz", - "integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==", - "dev": true + "integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==" }, "is-builtin-module": { "version": "1.0.0", @@ -4765,6 +4804,25 @@ "resolved": "https://registry.npmjs.org/psl/-/psl-1.1.31.tgz", "integrity": "sha512-/6pt4+C+T+wZUieKR620OpzN/LlnNKuWjy1iFLQ/UG35JqHlR/89MP1d96dUfkf6Dne3TuLQzOYEYshJ+Hx8mw==" }, + "pump": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", + "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", + "requires": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + }, + "dependencies": { + "end-of-stream": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.1.tgz", + "integrity": "sha512-1MkrZNvWTKCaigbn+W15elq2BB/L22nqrSY5DKlo3X6+vclJm8Bb5djXJBmEX6fS3+zCh/F4VBK5Z2KxJt4s2Q==", + "requires": { + "once": "^1.4.0" + } + } + } + }, "punycode": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", diff --git a/package.json b/package.json index 314f7f9d..f74b83a1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.2.5", + "version": "2.4.2-dev.0", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", @@ -16,11 +16,13 @@ }, "dependencies": { "amqplib": "0.5.1", + "axios": "0.18.0", "bunyan": "^1.8.10", "co": "4.6.0", "debug": "3.1.0", "elasticio-rest-node": "1.2.3", "event-to-promise": "^0.8.0", + "get-stream": "5.1.0", "lodash": "4.17.4", "p-throttle": "^2.1.0", "q": "1.4.1", diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index 28fc42e1..634681c6 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -65,9 +65,11 @@ describe('AMQP', () => { spyOn(encryptor, 'decryptMessageContent').andCallThrough(); }); - it('Should send message to outgoing channel when process data', () => { + it('Should send message to outgoing channel when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const props = { contentType: 'application/json', @@ -79,31 +81,35 @@ describe('AMQP', () => { } }; - amqp.sendData({ - headers: { - 'some-other-header': 'headerValue' - }, - body: 'Message content' - }, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + async function test() { + await amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, props); + } + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.DATA_ROUTING_KEY, - jasmine.any(Object), - props - ]); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props + ]); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: { - 'some-other-header': 'headerValue' - }, - body: 'Message content' - }); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }, done); }); it('Should send message async to outgoing channel when process data', done => { @@ -164,10 +170,12 @@ describe('AMQP', () => { }, done); }); - it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => { + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const msg = { statusCode: 200, @@ -187,19 +195,24 @@ describe('AMQP', () => { reply_to: 'my-special-routing-key' } }; - amqp.sendHttpReply(msg, props); - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + async function test() { + await amqp.sendHttpReply(msg, props); + } + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); - expect(publishParameters[3]).toEqual(props); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); + expect(publishParameters[3]).toEqual(props); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(msg); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual(msg); + done(); + }, done); }); it('Should throw error in sendHttpReply if reply_to header not found', done => { @@ -232,9 +245,11 @@ describe('AMQP', () => { }); - it('Should send message to outgoing channel using routing key from headers when process data', () => { + it('Should send message to outgoing channel using routing key from headers when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const msg = { headers: { @@ -255,26 +270,31 @@ describe('AMQP', () => { } }; - amqp.sendData(msg, props); + async function test() { + await amqp.sendData(msg, props); + } - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - 'my-special-routing-key', - jasmine.any(Object), - props - ]); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + 'my-special-routing-key', + jasmine.any(Object), + props + ]); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: {}, - body: { - content: 'Message content' - } - }); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: {}, + body: { + content: 'Message content' + } + }); + done(); + }, done); }); it('Should send message to errors when process error', () => { @@ -321,7 +341,7 @@ describe('AMQP', () => { }); }); - it('Should send message to errors using routing key from headers when process error', async () => { + it('Should send message to errors using routing key from headers when process error', (done) => { const expectedErrorPayload = { error: { @@ -335,7 +355,9 @@ describe('AMQP', () => { }; const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const props = { contentType: 'application/json', @@ -348,47 +370,52 @@ describe('AMQP', () => { } }; - await amqp.sendError(new Error('Test error'), props, message.content); + async function test() { + await amqp.sendError(new Error('Test error'), props, message.content); + } - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(2); + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(2); - let publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); - expect(publishParameters[3]).toEqual(props); + let publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters.length).toEqual(4); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); + expect(publishParameters[3]).toEqual(props); - let payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); - payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); + let payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); - expect(payload).toEqual(expectedErrorPayload); + expect(payload).toEqual(expectedErrorPayload); - publishParameters = amqp.publishChannel.publish.calls[1].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[3]).toEqual({ - contentType: 'application/json', - contentEncoding: 'utf8', - mandatory: true, - headers: { - 'taskId': 'task1234567890', - 'stepId': 'step_456', - 'reply_to': 'my-special-routing-key', - 'x-eio-error-response': true - } - }); + publishParameters = amqp.publishChannel.publish.calls[1].args; + expect(publishParameters.length).toEqual(4); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[3]).toEqual({ + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + 'taskId': 'task1234567890', + 'stepId': 'step_456', + 'reply_to': 'my-special-routing-key', + 'x-eio-error-response': true + } + }); - console.log('AND BACK'); - //eslint-disable-next-line max-len - console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg')); + console.log('AND BACK'); + //eslint-disable-next-line max-len + console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg')); - payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(expectedErrorPayload.error); + expect(payload).toEqual(expectedErrorPayload.error); + done(); + }, done); }); it('Should not provide errorInput if errorInput was empty', () => { @@ -620,7 +647,7 @@ describe('AMQP', () => { it('Should ack message when confirmed', () => { - const amqp = new Amqp(); + const amqp = new Amqp(settings); amqp.subscribeChannel = jasmine.createSpyObj('subscribeChannel', ['ack']); amqp.ack(message); @@ -672,7 +699,7 @@ describe('AMQP', () => { //eslint-disable-next-line max-len expect(clientFunction.calls[0].args[1].content).toEqual(encryptor.encryptMessageContent({ content: 'Message content' })); - expect(encryptor.decryptMessageContent).toHaveBeenCalledWith(message.content, message.properties.headers); + expect(encryptor.decryptMessageContent).toHaveBeenCalledWith(message.content); }); });