From 79a7151a5f757bdc1e164b455d2e7aadc1c1fce1 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 27 May 2019 15:29:30 +0300 Subject: [PATCH 01/37] elasticio#1456 removed unused argument --- lib/amqp.js | 2 +- lib/cipher.js | 4 +--- lib/encryptor.js | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 798e8907..ecb5d67a 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -69,7 +69,7 @@ class Amqp { let decryptedContent; try { - decryptedContent = encryptor.decryptMessageContent(message.content, message.properties.headers); + decryptedContent = encryptor.decryptMessageContent(message.content); } catch (err) { console.error( 'Error occurred while parsing message #%j payload (%s)', diff --git a/lib/cipher.js b/lib/cipher.js index 5ddcd54a..d7ec763b 100644 --- a/lib/cipher.js +++ b/lib/cipher.js @@ -30,11 +30,9 @@ function encryptIV(rawData) { return cipher.update(rawData, 'utf-8', 'base64') + cipher.final('base64'); } -function decryptIV(encData, options) { +function decryptIV(encData) { debug('About to decrypt:', encData); - options = options || {}; - if (!_.isString(encData)) { throw new Error('RabbitMQ message cipher.decryptIV() accepts only string as parameter.'); } diff --git a/lib/encryptor.js b/lib/encryptor.js index d8b3bdeb..1750d4d8 100644 --- a/lib/encryptor.js +++ b/lib/encryptor.js @@ -7,12 +7,12 @@ function encryptMessageContent(messagePayload) { return cipher.encrypt(JSON.stringify(messagePayload)); } -function decryptMessageContent(messagePayload, messageHeaders) { +function decryptMessageContent(messagePayload) { if (!messagePayload || messagePayload.toString().length === 0) { return null; } try { - return JSON.parse(cipher.decrypt(messagePayload.toString(), messageHeaders)); + return JSON.parse(cipher.decrypt(messagePayload.toString())); } catch (err) { console.error(err.stack); throw Error('Failed to decrypt message: ' + err.message); From ddad1d59f5d13ad87ae4e5a6352520b3c0ef83be Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 27 May 2019 17:37:43 +0300 Subject: [PATCH 02/37] elasticio#1456 1st draft --- lib/amqp.js | 47 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index ecb5d67a..b9481a21 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -4,6 +4,8 @@ const encryptor = require('./encryptor.js'); const co = require('co'); const _ = require('lodash'); const eventToPromise = require('event-to-promise'); +const request = require('requestretry'); +const Readable = require('stream').Readable const HEADER_ROUTING_KEY = 'x-eio-routing-key'; const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; @@ -53,13 +55,43 @@ class Amqp { return Promise.resolve(); } + async maesterGet(messageId) { + const options = { + auth: { + bearer: this.settings.MAESTER_JWT + }, + // TODO: check agent works as expected + forever: true, + // TODO: retry options + }; + return await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${messageId}`, options).body; + } + + async maesterPut(messageId, data) { + const options = { + auth: { + bearer: this.settings.MAESTER_JWT + }, + headers: { + 'Content-Type': 'application/octet-stream' + }, + // TODO: check agent works as expected + forever: true, + // TODO: retry options + }; + const s = new Readable; + s.push(data); + s.push(null); + return await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${messageId}`, options)); + } + listenQueue(queueName, callback) { //eslint-disable-next-line consistent-this const self = this; this.subscribeChannel.prefetch(this.settings.RABBITMQ_PREFETCH_SAILOR); - return this.subscribeChannel.consume(queueName, function decryptMessage(message) { + return this.subscribeChannel.consume(queueName, async function decryptMessage(message) { log.trace('Message received: %j', message); if (message === null) { @@ -67,9 +99,13 @@ class Amqp { return; } + // TODO: handle errors + // TODO: check is_maester flag is not copied to outgoing message + const content = message.properties.headers.is_maester ? await maesterGet(message.properties.headers.messageId) : message.content; + let decryptedContent; try { - decryptedContent = encryptor.decryptMessageContent(message.content); + decryptedContent = encryptor.decryptMessageContent(content); } catch (err) { console.error( 'Error occurred while parsing message #%j payload (%s)', @@ -150,8 +186,13 @@ class Amqp { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); const encryptedData = encryptor.encryptMessageContent(data); + if (this.settings.MAESTER_IS_STORE) { + // TODO: handle errors + await maesterPut(data.headers.messageId, encryptedData); + properties.headers.is_maester = true; + } - return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); + return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, this.settings.MAESTER_IS_STORE ? '' : encryptedData, properties, throttle); } async sendData(data, properties, throttle) { From e374344488cfea68010709d7b08ba093f8f38e84 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 27 May 2019 18:52:51 +0300 Subject: [PATCH 03/37] elasticio#1456 generate uuid for maester, we can't use messageId because it can be not uuid --- lib/amqp.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index b9481a21..03c996cc 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -6,6 +6,7 @@ const _ = require('lodash'); const eventToPromise = require('event-to-promise'); const request = require('requestretry'); const Readable = require('stream').Readable +const uuid = require('uuid'); const HEADER_ROUTING_KEY = 'x-eio-routing-key'; const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; @@ -100,8 +101,8 @@ class Amqp { } // TODO: handle errors - // TODO: check is_maester flag is not copied to outgoing message - const content = message.properties.headers.is_maester ? await maesterGet(message.properties.headers.messageId) : message.content; + // TODO: check maesterId flag is not copied to outgoing message + const content = message.properties.headers.maesterId ? await maesterGet(message.properties.headers.maesterId) : message.content; let decryptedContent; try { @@ -187,9 +188,10 @@ class Amqp { data.headers = filterMessageHeaders(data.headers); const encryptedData = encryptor.encryptMessageContent(data); if (this.settings.MAESTER_IS_STORE) { + const maesterId = uuid.v4(); // TODO: handle errors - await maesterPut(data.headers.messageId, encryptedData); - properties.headers.is_maester = true; + await maesterPut(maesterId, encryptedData); + properties.headers.maesterId = maesterId; } return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, this.settings.MAESTER_IS_STORE ? '' : encryptedData, properties, throttle); From b5900303aa24acd639a181d0c9bf433f51bd5bf1 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 27 May 2019 19:09:01 +0300 Subject: [PATCH 04/37] elasticio#1456 fixed missing this --- lib/amqp.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 03c996cc..a733c07c 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -102,8 +102,7 @@ class Amqp { // TODO: handle errors // TODO: check maesterId flag is not copied to outgoing message - const content = message.properties.headers.maesterId ? await maesterGet(message.properties.headers.maesterId) : message.content; - + const content = message.properties.headers.maesterId ? await self.maesterGet(message.properties.headers.maesterId) : message.content; let decryptedContent; try { decryptedContent = encryptor.decryptMessageContent(content); @@ -190,7 +189,7 @@ class Amqp { if (this.settings.MAESTER_IS_STORE) { const maesterId = uuid.v4(); // TODO: handle errors - await maesterPut(maesterId, encryptedData); + await this.maesterPut(maesterId, encryptedData); properties.headers.maesterId = maesterId; } From 7c020ab8d0c3e17805325eeaa3195508404db883 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 27 May 2019 19:09:27 +0300 Subject: [PATCH 05/37] elasticio#1456 writing integration test --- mocha_spec/run.spec.js | 81 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 43c8213a..d51a7770 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -143,6 +143,87 @@ describe('Integration Test', () => { }); }); + it('should run trigger with data from maester successfully', (done) => { + process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword'; + process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols'; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + amqpHelper.on('data', ({ properties, body }, queueName) => { + + expect(queueName).to.eql(amqpHelper.nextStepQueue); + + delete properties.headers.start; + delete properties.headers.end; + delete properties.headers.cid; + + expect(properties.headers).to.deep.equal({ + 'execId': env.ELASTICIO_EXEC_ID, + 'taskId': env.ELASTICIO_FLOW_ID, + 'userId': env.ELASTICIO_USER_ID, + 'stepId': env.ELASTICIO_STEP_ID, + 'compId': env.ELASTICIO_COMP_ID, + 'function': env.ELASTICIO_FUNCTION, + 'x-eio-meta-trace-id': traceId, + 'parentMessageId': parentMessageId, + messageId + }); + + expect(properties).to.deep.equal({ + contentType: 'application/json', + contentEncoding: 'utf8', + headers: { + 'execId': env.ELASTICIO_EXEC_ID, + 'taskId': env.ELASTICIO_FLOW_ID, + 'userId': env.ELASTICIO_USER_ID, + 'stepId': env.ELASTICIO_STEP_ID, + 'compId': env.ELASTICIO_COMP_ID, + 'function': env.ELASTICIO_FUNCTION, + 'x-eio-meta-trace-id': traceId, + 'parentMessageId': parentMessageId, + messageId + }, + deliveryMode: undefined, + priority: undefined, + correlationId: undefined, + replyTo: undefined, + expiration: undefined, + messageId: undefined, + timestamp: undefined, + type: undefined, + userId: undefined, + appId: undefined, + clusterId: undefined + }); + expect(body).to.deep.equal({ + originalMsg: '', + customers: customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }); + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { maesterId: '47d3e978-8099-11e9-bc42-526af7764f64' }); + }); + it('should augment passthrough property with data', done => { process.env.ELASTICIO_STEP_ID = 'step_2'; process.env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; From cd2542b40153f3d5b1a4eb8d559eaf79fd3330a6 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 13:52:08 +0300 Subject: [PATCH 06/37] elasticio#1456 small code fixes, made test to work --- lib/amqp.js | 8 +- lib/settings.js | 5 +- mocha_spec/integration_helpers.js | 11 ++- mocha_spec/run.spec.js | 135 +++++++++++++++++------------- 4 files changed, 90 insertions(+), 69 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index a733c07c..dc9bf2b7 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -56,7 +56,7 @@ class Amqp { return Promise.resolve(); } - async maesterGet(messageId) { + async maesterGet(maesterId) { const options = { auth: { bearer: this.settings.MAESTER_JWT @@ -65,10 +65,10 @@ class Amqp { forever: true, // TODO: retry options }; - return await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${messageId}`, options).body; + return (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)).body; } - async maesterPut(messageId, data) { + async maesterPut(maesterId, data) { const options = { auth: { bearer: this.settings.MAESTER_JWT @@ -83,7 +83,7 @@ class Amqp { const s = new Readable; s.push(data); s.push(null); - return await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${messageId}`, options)); + return await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); } listenQueue(queueName, callback) { diff --git a/lib/settings.js b/lib/settings.js index 87fd2ebe..2f9b2a58 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -48,7 +48,10 @@ function readFrom(envVars) { RATE_INTERVAL: 100, // 100ms PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms - AMQP_PUBLISH_RETRY_ATTEMPTS: 10 + AMQP_PUBLISH_RETRY_ATTEMPTS: 10, + MAESTER_BASEPATH: '', + MAESTER_JWT: '', + MAESTER_IS_STORE: false }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 1c4b6fbf..bbbdcebe 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -131,11 +131,12 @@ class AmqpHelper extends EventEmitter { this.publishChannel.ack(message); - const emittedMessage = JSON.parse(message.content.toString()); + const content = message.content.toString(); + const emittedMessage = content ? JSON.parse(content) : content; const data = { properties: message.properties, - body: emittedMessage.body, + body: emittedMessage ? emittedMessage.body : null, emittedMessage }; this.dataMessages.push(data); @@ -159,7 +160,7 @@ function amqp() { return handle; } -function prepareEnv() { +function prepareEnv(isMaester) { env.ELASTICIO_AMQP_URI = 'amqp://guest:guest@localhost:5672'; env.ELASTICIO_RABBITMQ_PREFETCH_SAILOR = '10'; env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; @@ -178,7 +179,9 @@ function prepareEnv() { env.DEBUG = 'sailor:debug'; - + env.ELASTICIO_MAESTER_IS_STORE = isMaester || ''; + env.ELASTICIO_MAESTER_BASEPATH = isMaester ? 'http://ma.es.ter' : ''; + env.ELASTICIO_MAESTER_JWT = isMaester ? 'jwt' : ''; } function mockApiTaskStepResponse(response) { diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index d51a7770..b26fdc9d 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -6,6 +6,7 @@ const co = require('co'); const sinonjs = require('sinon'); const logging = require('../lib/logging.js'); const helpers = require('./integration_helpers'); +const encryptor = require('../lib/encryptor.js'); const env = process.env; @@ -143,44 +144,51 @@ describe('Integration Test', () => { }); }); - it('should run trigger with data from maester successfully', (done) => { - process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword'; - process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols'; + describe('when maester should be used', () => { + beforeEach(() => helpers.prepareEnv(true)); - helpers.mockApiTaskStepResponse(); + it('should run trigger successfully', (done) => { + const maesterId = '47d3e978-8099-11e9-bc42-526af7764f64'; - nock('https://api.acme.com') - .post('/subscribe') - .reply(200, { - id: 'subscription_12345' - }) - .get('/customers') - .reply(200, customers); + helpers.mockApiTaskStepResponse(); - amqpHelper.on('data', ({ properties, body }, queueName) => { + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); - expect(queueName).to.eql(amqpHelper.nextStepQueue); + nock(process.env.ELASTICIO_MAESTER_BASEPATH) + .get(`/objects/${maesterId}`) + .reply(200, encryptor.encryptMessageContent(inputMessage)) + .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + })) + .reply(200); - delete properties.headers.start; - delete properties.headers.end; - delete properties.headers.cid; + amqpHelper.on('data', ({ properties, body }, queueName) => { + expect(queueName).to.eql(amqpHelper.nextStepQueue); + expect(properties.headers.maesterId).to.be.string; - expect(properties.headers).to.deep.equal({ - 'execId': env.ELASTICIO_EXEC_ID, - 'taskId': env.ELASTICIO_FLOW_ID, - 'userId': env.ELASTICIO_USER_ID, - 'stepId': env.ELASTICIO_STEP_ID, - 'compId': env.ELASTICIO_COMP_ID, - 'function': env.ELASTICIO_FUNCTION, - 'x-eio-meta-trace-id': traceId, - 'parentMessageId': parentMessageId, - messageId - }); + delete properties.headers.start; + delete properties.headers.end; + delete properties.headers.cid; + delete properties.headers.maesterId; - expect(properties).to.deep.equal({ - contentType: 'application/json', - contentEncoding: 'utf8', - headers: { + expect(properties.headers).to.deep.equal({ 'execId': env.ELASTICIO_EXEC_ID, 'taskId': env.ELASTICIO_FLOW_ID, 'userId': env.ELASTICIO_USER_ID, @@ -190,38 +198,45 @@ describe('Integration Test', () => { 'x-eio-meta-trace-id': traceId, 'parentMessageId': parentMessageId, messageId - }, - deliveryMode: undefined, - priority: undefined, - correlationId: undefined, - replyTo: undefined, - expiration: undefined, - messageId: undefined, - timestamp: undefined, - type: undefined, - userId: undefined, - appId: undefined, - clusterId: undefined - }); - expect(body).to.deep.equal({ - originalMsg: '', - customers: customers, - subscription: { - id: 'subscription_12345', - cfg: { - apiKey: 'secret' - } - } + }); + + expect(properties).to.deep.equal({ + contentType: 'application/json', + contentEncoding: 'utf8', + headers: { + 'execId': env.ELASTICIO_EXEC_ID, + 'taskId': env.ELASTICIO_FLOW_ID, + 'userId': env.ELASTICIO_USER_ID, + 'stepId': env.ELASTICIO_STEP_ID, + 'compId': env.ELASTICIO_COMP_ID, + 'function': env.ELASTICIO_FUNCTION, + 'x-eio-meta-trace-id': traceId, + 'parentMessageId': parentMessageId, + messageId + }, + deliveryMode: undefined, + priority: undefined, + correlationId: undefined, + replyTo: undefined, + expiration: undefined, + messageId: undefined, + timestamp: undefined, + type: undefined, + userId: undefined, + appId: undefined, + clusterId: undefined + }); + expect(body).to.be.null; + done(); }); - done(); - }); - run = requireRun(); + run = requireRun(); - amqpHelper.publishMessage('', { - parentMessageId, - traceId - }, { maesterId: '47d3e978-8099-11e9-bc42-526af7764f64' }); + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { maesterId }); + }); }); it('should augment passthrough property with data', done => { From 4277bdc21fe5485ed782136ee65460be5e0aca32 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 14:21:20 +0300 Subject: [PATCH 07/37] elasticio#1456 added test to respond directly, updated tests --- mocha_spec/integration_helpers.js | 6 +- mocha_spec/run.spec.js | 156 ++++++++++++++---------------- 2 files changed, 77 insertions(+), 85 deletions(-) diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index bbbdcebe..53df332d 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -179,9 +179,9 @@ function prepareEnv(isMaester) { env.DEBUG = 'sailor:debug'; - env.ELASTICIO_MAESTER_IS_STORE = isMaester || ''; - env.ELASTICIO_MAESTER_BASEPATH = isMaester ? 'http://ma.es.ter' : ''; - env.ELASTICIO_MAESTER_JWT = isMaester ? 'jwt' : ''; + env.ELASTICIO_MAESTER_IS_STORE = ''; + env.ELASTICIO_MAESTER_BASEPATH = 'http://ma.es.ter'; + env.ELASTICIO_MAESTER_JWT = 'jwt'; } function mockApiTaskStepResponse(response) { diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index b26fdc9d..2806b0db 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -60,6 +60,7 @@ describe('Integration Test', () => { const parentMessageId = 'parent_message_1234567890'; const traceId = helpers.PREFIX + '_trace_id_123456'; const messageId = 'f45be600-f770-11e6-b42d-b187bfbf19fd'; + const maesterId = '47d3e978-8099-11e9-bc42-526af7764f64'; let amqpHelper = helpers.amqp(); beforeEach(() => amqpHelper.prepare()); @@ -144,99 +145,90 @@ describe('Integration Test', () => { }); }); - describe('when maester should be used', () => { - beforeEach(() => helpers.prepareEnv(true)); + it('should get maester message', (done) => { + process.env.ELASTICIO_MAESTER_IS_STORE = true; - it('should run trigger successfully', (done) => { - const maesterId = '47d3e978-8099-11e9-bc42-526af7764f64'; - - helpers.mockApiTaskStepResponse(); + helpers.mockApiTaskStepResponse(); - nock('https://api.acme.com') - .post('/subscribe') - .reply(200, { - id: 'subscription_12345' - }) - .get('/customers') - .reply(200, customers); + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); - nock(process.env.ELASTICIO_MAESTER_BASEPATH) - .get(`/objects/${maesterId}`) - .reply(200, encryptor.encryptMessageContent(inputMessage)) - .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ - id: messageId, - body: { - originalMsg: inputMessage, - customers, - subscription: { - id: 'subscription_12345', - cfg: { - apiKey: 'secret' - } + const maesterCalls = nock(process.env.ELASTICIO_MAESTER_BASEPATH) + .get(`/objects/${maesterId}`) + .reply(200, encryptor.encryptMessageContent(inputMessage)) + .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' } - }, - headers: {} - })) - .reply(200); + } + }, + headers: {} + })) + .reply(200); + + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.maesterId).to.be.a('string'); + expect(body).to.be.null; + expect(maesterCalls.isDone()).to.be.true; + done(); + }); - amqpHelper.on('data', ({ properties, body }, queueName) => { - expect(queueName).to.eql(amqpHelper.nextStepQueue); - expect(properties.headers.maesterId).to.be.string; + run = requireRun(); - delete properties.headers.start; - delete properties.headers.end; - delete properties.headers.cid; - delete properties.headers.maesterId; + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { maesterId }); + }); - expect(properties.headers).to.deep.equal({ - 'execId': env.ELASTICIO_EXEC_ID, - 'taskId': env.ELASTICIO_FLOW_ID, - 'userId': env.ELASTICIO_USER_ID, - 'stepId': env.ELASTICIO_STEP_ID, - 'compId': env.ELASTICIO_COMP_ID, - 'function': env.ELASTICIO_FUNCTION, - 'x-eio-meta-trace-id': traceId, - 'parentMessageId': parentMessageId, - messageId - }); + it('should get maester message, but publish directly', (done) => { + helpers.mockApiTaskStepResponse(); - expect(properties).to.deep.equal({ - contentType: 'application/json', - contentEncoding: 'utf8', - headers: { - 'execId': env.ELASTICIO_EXEC_ID, - 'taskId': env.ELASTICIO_FLOW_ID, - 'userId': env.ELASTICIO_USER_ID, - 'stepId': env.ELASTICIO_STEP_ID, - 'compId': env.ELASTICIO_COMP_ID, - 'function': env.ELASTICIO_FUNCTION, - 'x-eio-meta-trace-id': traceId, - 'parentMessageId': parentMessageId, - messageId - }, - deliveryMode: undefined, - priority: undefined, - correlationId: undefined, - replyTo: undefined, - expiration: undefined, - messageId: undefined, - timestamp: undefined, - type: undefined, - userId: undefined, - appId: undefined, - clusterId: undefined - }); - expect(body).to.be.null; - done(); - }); + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); - run = requireRun(); + const maesterCalls = nock(process.env.ELASTICIO_MAESTER_BASEPATH) + .get(`/objects/${maesterId}`) + .reply(200, encryptor.encryptMessageContent(inputMessage)); - amqpHelper.publishMessage('', { - parentMessageId, - traceId - }, { maesterId }); + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.maesterId).to.not.exist; + expect(body).to.deep.equal({ + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }); + expect(maesterCalls.isDone()).to.be.true; + done(); }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { maesterId }); }); it('should augment passthrough property with data', done => { From d9d82cc0104db27ad2b6c9b89f8737c29a6e3f65 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 15:05:54 +0300 Subject: [PATCH 08/37] elasticio#1456 isolate enc/dec code together with maester in a separate functions --- lib/amqp.js | 62 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index dc9bf2b7..18e03880 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -56,19 +56,33 @@ class Amqp { return Promise.resolve(); } - async maesterGet(maesterId) { - const options = { - auth: { - bearer: this.settings.MAESTER_JWT - }, - // TODO: check agent works as expected - forever: true, - // TODO: retry options - }; - return (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)).body; + async decryptMessage(message) { + const maesterId = message.properties.headers.maesterId; + let content = message.content; + + if (maesterId) { + const options = { + auth: { + bearer: this.settings.MAESTER_JWT + }, + // TODO: check agent works as expected + forever: true + // TODO: retry options + }; + content = (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)).body; + } + + return encryptor.decryptMessageContent(content); } - async maesterPut(maesterId, data) { + async encryptMessage(data, properties) { + const encryptedData = encryptor.encryptMessageContent(data); + if (!this.settings.MAESTER_IS_STORE) { + return encryptedData; + } + + const maesterId = uuid.v4(); + // TODO: handle errors const options = { auth: { bearer: this.settings.MAESTER_JWT @@ -77,13 +91,16 @@ class Amqp { 'Content-Type': 'application/octet-stream' }, // TODO: check agent works as expected - forever: true, + forever: true // TODO: retry options }; const s = new Readable; - s.push(data); + s.push(encryptedData); s.push(null); - return await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); + await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); + properties.headers.maesterId = maesterId; + + return ''; } listenQueue(queueName, callback) { @@ -100,15 +117,12 @@ class Amqp { return; } - // TODO: handle errors - // TODO: check maesterId flag is not copied to outgoing message - const content = message.properties.headers.maesterId ? await self.maesterGet(message.properties.headers.maesterId) : message.content; let decryptedContent; try { - decryptedContent = encryptor.decryptMessageContent(content); + decryptedContent = await self.decryptMessage(message); } catch (err) { console.error( - 'Error occurred while parsing message #%j payload (%s)', + 'Error occurred while getting message #%j payload (%s)', message.fields.deliveryTag, err.message ); @@ -185,15 +199,9 @@ class Amqp { async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); - const encryptedData = encryptor.encryptMessageContent(data); - if (this.settings.MAESTER_IS_STORE) { - const maesterId = uuid.v4(); - // TODO: handle errors - await this.maesterPut(maesterId, encryptedData); - properties.headers.maesterId = maesterId; - } + const encryptedData = await this.encryptMessage(data, properties); - return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, this.settings.MAESTER_IS_STORE ? '' : encryptedData, properties, throttle); + return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } async sendData(data, properties, throttle) { From adb2012218b132253659328fdc02591c43570d8d Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 15:32:24 +0300 Subject: [PATCH 09/37] elasticio#1456 work with ecnrypted data as base64, not binary --- lib/amqp.js | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 18e03880..c1376c07 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -87,17 +87,12 @@ class Amqp { auth: { bearer: this.settings.MAESTER_JWT }, - headers: { - 'Content-Type': 'application/octet-stream' - }, + body: encryptedData, // TODO: check agent works as expected forever: true // TODO: retry options }; - const s = new Readable; - s.push(encryptedData); - s.push(null); - await s.pipe(request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); + await request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options); properties.headers.maesterId = maesterId; return ''; From e120aa0937b716cbe714bcb965c87769bcb8becb Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 18:46:00 +0300 Subject: [PATCH 10/37] elasticio#1456 handle maester errors and conflicts --- lib/amqp.js | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index c1376c07..ed4bf8ce 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -5,7 +5,6 @@ const co = require('co'); const _ = require('lodash'); const eventToPromise = require('event-to-promise'); const request = require('requestretry'); -const Readable = require('stream').Readable const uuid = require('uuid'); const HEADER_ROUTING_KEY = 'x-eio-routing-key'; @@ -65,11 +64,15 @@ class Amqp { auth: { bearer: this.settings.MAESTER_JWT }, - // TODO: check agent works as expected - forever: true - // TODO: retry options + forever: true, + maxAttempts: 3, + retryDelay: 100 }; - content = (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)).body; + const res = (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); + if (res.statusCode >= 400) { + throw new Error(`HTTP error during maester get: ${res.statusCode} (${res.statusMessage})`); + } + content = res.body; } return encryptor.decryptMessageContent(content); @@ -81,18 +84,28 @@ class Amqp { return encryptedData; } - const maesterId = uuid.v4(); - // TODO: handle errors + let maesterId = uuid.v4(); const options = { auth: { bearer: this.settings.MAESTER_JWT }, body: encryptedData, - // TODO: check agent works as expected - forever: true - // TODO: retry options + forever: true, + maxAttempts: 3, + retryDelay: 100, + retryStrategy: (err, response, body, options) => { + if (!err && response.statusCode === 409) { + maesterId = uuid.v4(); + options.url = `${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`; + return { mustRetry: true, options }; + } + return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); + } }; - await request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options); + const res = await request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options); + if (res.statusCode >= 400) { + throw new Error(`HTTP error during maester put: ${res.statusCode} (${res.statusMessage})`); + } properties.headers.maesterId = maesterId; return ''; From d458603f8b479a46ae1bb7be9ec3b592c1dcd690 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 18:48:34 +0300 Subject: [PATCH 11/37] elasticio#1456 group common headers --- lib/amqp.js | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index ed4bf8ce..1655fe26 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -61,9 +61,7 @@ class Amqp { if (maesterId) { const options = { - auth: { - bearer: this.settings.MAESTER_JWT - }, + auth: { bearer: this.settings.MAESTER_JWT }, forever: true, maxAttempts: 3, retryDelay: 100 @@ -86,13 +84,11 @@ class Amqp { let maesterId = uuid.v4(); const options = { - auth: { - bearer: this.settings.MAESTER_JWT - }, - body: encryptedData, + auth: { bearer: this.settings.MAESTER_JWT }, forever: true, maxAttempts: 3, retryDelay: 100, + body: encryptedData, retryStrategy: (err, response, body, options) => { if (!err && response.statusCode === 409) { maesterId = uuid.v4(); From 3a5c964e45acb096c62c86d17bd924b93d442c76 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 18:54:14 +0300 Subject: [PATCH 12/37] elasticio#1456 MAESTER_IS_STORE -> MAESTER_OUT --- lib/amqp.js | 2 +- lib/settings.js | 2 +- mocha_spec/integration_helpers.js | 3 +-- mocha_spec/run.spec.js | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 1655fe26..23bc552f 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -78,7 +78,7 @@ class Amqp { async encryptMessage(data, properties) { const encryptedData = encryptor.encryptMessageContent(data); - if (!this.settings.MAESTER_IS_STORE) { + if (!this.settings.MAESTER_OUT) { return encryptedData; } diff --git a/lib/settings.js b/lib/settings.js index 2f9b2a58..a6826f51 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -51,7 +51,7 @@ function readFrom(envVars) { AMQP_PUBLISH_RETRY_ATTEMPTS: 10, MAESTER_BASEPATH: '', MAESTER_JWT: '', - MAESTER_IS_STORE: false + MAESTER_OUT: false }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 53df332d..38d62fa6 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -179,8 +179,7 @@ function prepareEnv(isMaester) { env.DEBUG = 'sailor:debug'; - env.ELASTICIO_MAESTER_IS_STORE = ''; - env.ELASTICIO_MAESTER_BASEPATH = 'http://ma.es.ter'; + env.ELASTICIO_MAESTER_OUT = ''; env.ELASTICIO_MAESTER_JWT = 'jwt'; } diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 2806b0db..a05d16fc 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -146,7 +146,7 @@ describe('Integration Test', () => { }); it('should get maester message', (done) => { - process.env.ELASTICIO_MAESTER_IS_STORE = true; + process.env.ELASTICIO_MAESTER_OUT = true; helpers.mockApiTaskStepResponse(); From ce865029edc69d9f7a538703f7d3d9ba3a4aee86 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 18:56:40 +0300 Subject: [PATCH 13/37] elasticio#1456 ignore eslint, we can't fix this --- lib/amqp.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/amqp.js b/lib/amqp.js index 23bc552f..434785d0 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -95,6 +95,7 @@ class Amqp { options.url = `${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`; return { mustRetry: true, options }; } + //eslint-disable-next-line new-cap return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); } }; From 5c5141de731d4bac12c09e95ca22512188fe396b Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Tue, 28 May 2019 19:33:49 +0300 Subject: [PATCH 14/37] elasticio#1456 fixed unit-tests --- mocha_spec/integration_helpers.js | 1 + spec/amqp.spec.js | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 38d62fa6..5437010d 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -179,6 +179,7 @@ function prepareEnv(isMaester) { env.DEBUG = 'sailor:debug'; + env.ELASTICIO_MAESTER_BASEPATH = 'http://ma.es.ter'; env.ELASTICIO_MAESTER_OUT = ''; env.ELASTICIO_MAESTER_JWT = 'jwt'; } diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index 28fc42e1..e27fba3a 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -65,7 +65,7 @@ describe('AMQP', () => { spyOn(encryptor, 'decryptMessageContent').andCallThrough(); }); - it('Should send message to outgoing channel when process data', () => { + it('Should send message to outgoing channel when process data', async () => { const amqp = new Amqp(settings); amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); @@ -79,7 +79,7 @@ describe('AMQP', () => { } }; - amqp.sendData({ + await amqp.sendData({ headers: { 'some-other-header': 'headerValue' }, @@ -164,7 +164,7 @@ describe('AMQP', () => { }, done); }); - it('Should sendHttpReply to outgoing channel using routing key from headers when process data', () => { + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', async () => { const amqp = new Amqp(settings); amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); @@ -187,7 +187,7 @@ describe('AMQP', () => { reply_to: 'my-special-routing-key' } }; - amqp.sendHttpReply(msg, props); + await amqp.sendHttpReply(msg, props); expect(amqp.publishChannel.publish).toHaveBeenCalled(); expect(amqp.publishChannel.publish.callCount).toEqual(1); @@ -232,7 +232,7 @@ describe('AMQP', () => { }); - it('Should send message to outgoing channel using routing key from headers when process data', () => { + it('Should send message to outgoing channel using routing key from headers when process data', async () => { const amqp = new Amqp(settings); amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); @@ -255,7 +255,7 @@ describe('AMQP', () => { } }; - amqp.sendData(msg, props); + await amqp.sendData(msg, props); expect(amqp.publishChannel.publish).toHaveBeenCalled(); expect(amqp.publishChannel.publish.callCount).toEqual(1); @@ -672,7 +672,7 @@ describe('AMQP', () => { //eslint-disable-next-line max-len expect(clientFunction.calls[0].args[1].content).toEqual(encryptor.encryptMessageContent({ content: 'Message content' })); - expect(encryptor.decryptMessageContent).toHaveBeenCalledWith(message.content, message.properties.headers); + expect(encryptor.decryptMessageContent).toHaveBeenCalledWith(message.content); }); }); From fc8dc2440d805f94eb09744c0f22e3d4fbe1c9a4 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 11:49:10 +0300 Subject: [PATCH 15/37] elasticio#1456 maester -> object storage --- lib/amqp.js | 26 +++++++++++++------------- lib/settings.js | 6 +++--- mocha_spec/integration_helpers.js | 8 ++++---- mocha_spec/run.spec.js | 28 ++++++++++++++-------------- package.json | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 434785d0..97596368 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -56,19 +56,19 @@ class Amqp { } async decryptMessage(message) { - const maesterId = message.properties.headers.maesterId; + const objectId = message.properties.headers.objectId; let content = message.content; - if (maesterId) { + if (objectId) { const options = { - auth: { bearer: this.settings.MAESTER_JWT }, + auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, forever: true, maxAttempts: 3, retryDelay: 100 }; - const res = (await request.get(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options)); + const res = (await request.get(`${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`, options)); if (res.statusCode >= 400) { - throw new Error(`HTTP error during maester get: ${res.statusCode} (${res.statusMessage})`); + throw new Error(`HTTP error during object get: ${res.statusCode} (${res.statusMessage})`); } content = res.body; } @@ -78,32 +78,32 @@ class Amqp { async encryptMessage(data, properties) { const encryptedData = encryptor.encryptMessageContent(data); - if (!this.settings.MAESTER_OUT) { + if (!this.settings.OBJECT_STORAGE_OUT) { return encryptedData; } - let maesterId = uuid.v4(); + let objectId = uuid.v4(); const options = { - auth: { bearer: this.settings.MAESTER_JWT }, + auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, forever: true, maxAttempts: 3, retryDelay: 100, body: encryptedData, retryStrategy: (err, response, body, options) => { if (!err && response.statusCode === 409) { - maesterId = uuid.v4(); - options.url = `${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`; + objectId = uuid.v4(); + options.url = `${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`; return { mustRetry: true, options }; } //eslint-disable-next-line new-cap return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); } }; - const res = await request.put(`${this.settings.MAESTER_BASEPATH}/objects/${maesterId}`, options); + const res = await request.put(`${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`, options); if (res.statusCode >= 400) { - throw new Error(`HTTP error during maester put: ${res.statusCode} (${res.statusMessage})`); + throw new Error(`HTTP error during object put: ${res.statusCode} (${res.statusMessage})`); } - properties.headers.maesterId = maesterId; + properties.headers.objectId = objectId; return ''; } diff --git a/lib/settings.js b/lib/settings.js index a6826f51..27853e2c 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -49,9 +49,9 @@ function readFrom(envVars) { PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms AMQP_PUBLISH_RETRY_ATTEMPTS: 10, - MAESTER_BASEPATH: '', - MAESTER_JWT: '', - MAESTER_OUT: false + OBJECT_STORAGE_BASEPATH: '', + OBJECT_STORAGE_TOKEN: '', + OBJECT_STORAGE_OUT: false }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 5437010d..11d7a5e5 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -160,7 +160,7 @@ function amqp() { return handle; } -function prepareEnv(isMaester) { +function prepareEnv() { env.ELASTICIO_AMQP_URI = 'amqp://guest:guest@localhost:5672'; env.ELASTICIO_RABBITMQ_PREFETCH_SAILOR = '10'; env.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; @@ -179,9 +179,9 @@ function prepareEnv(isMaester) { env.DEBUG = 'sailor:debug'; - env.ELASTICIO_MAESTER_BASEPATH = 'http://ma.es.ter'; - env.ELASTICIO_MAESTER_OUT = ''; - env.ELASTICIO_MAESTER_JWT = 'jwt'; + env.ELASTICIO_OBJECT_STORAGE_BASEPATH = 'http://ma.es.ter'; + env.ELASTICIO_OBJECT_STORAGE_OUT = ''; + env.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; } function mockApiTaskStepResponse(response) { diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index a05d16fc..169ba7e4 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -60,7 +60,7 @@ describe('Integration Test', () => { const parentMessageId = 'parent_message_1234567890'; const traceId = helpers.PREFIX + '_trace_id_123456'; const messageId = 'f45be600-f770-11e6-b42d-b187bfbf19fd'; - const maesterId = '47d3e978-8099-11e9-bc42-526af7764f64'; + const objectId = '47d3e978-8099-11e9-bc42-526af7764f64'; let amqpHelper = helpers.amqp(); beforeEach(() => amqpHelper.prepare()); @@ -145,8 +145,8 @@ describe('Integration Test', () => { }); }); - it('should get maester message', (done) => { - process.env.ELASTICIO_MAESTER_OUT = true; + it('should get object storage message', (done) => { + process.env.ELASTICIO_OBJECT_STORAGE_OUT = true; helpers.mockApiTaskStepResponse(); @@ -158,8 +158,8 @@ describe('Integration Test', () => { .get('/customers') .reply(200, customers); - const maesterCalls = nock(process.env.ELASTICIO_MAESTER_BASEPATH) - .get(`/objects/${maesterId}`) + const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_BASEPATH) + .get(`/objects/${objectId}`) .reply(200, encryptor.encryptMessageContent(inputMessage)) .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ id: messageId, @@ -178,9 +178,9 @@ describe('Integration Test', () => { .reply(200); amqpHelper.on('data', ({ properties, body }) => { - expect(properties.headers.maesterId).to.be.a('string'); + expect(properties.headers.objectId).to.be.a('string'); expect(body).to.be.null; - expect(maesterCalls.isDone()).to.be.true; + expect(objectStorageCalls.isDone()).to.be.true; done(); }); @@ -189,10 +189,10 @@ describe('Integration Test', () => { amqpHelper.publishMessage('', { parentMessageId, traceId - }, { maesterId }); + }, { objectId }); }); - it('should get maester message, but publish directly', (done) => { + it('should get object storage message, but publish directly', (done) => { helpers.mockApiTaskStepResponse(); nock('https://api.acme.com') @@ -203,12 +203,12 @@ describe('Integration Test', () => { .get('/customers') .reply(200, customers); - const maesterCalls = nock(process.env.ELASTICIO_MAESTER_BASEPATH) - .get(`/objects/${maesterId}`) + const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_BASEPATH) + .get(`/objects/${objectId}`) .reply(200, encryptor.encryptMessageContent(inputMessage)); amqpHelper.on('data', ({ properties, body }) => { - expect(properties.headers.maesterId).to.not.exist; + expect(properties.headers.objectId).to.not.exist; expect(body).to.deep.equal({ originalMsg: inputMessage, customers, @@ -219,7 +219,7 @@ describe('Integration Test', () => { } } }); - expect(maesterCalls.isDone()).to.be.true; + expect(objectStorageCalls.isDone()).to.be.true; done(); }); @@ -228,7 +228,7 @@ describe('Integration Test', () => { amqpHelper.publishMessage('', { parentMessageId, traceId - }, { maesterId }); + }, { objectId }); }); it('should augment passthrough property with data', done => { diff --git a/package.json b/package.json index 314f7f9d..4e0940f8 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.2.5", + "version": "2.4.0-dev.23", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From 228def6fed37b5c7105cc28ea97621290a905769 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 11:52:19 +0300 Subject: [PATCH 16/37] elasticio#1456 OBJECT_STORAGE_BASEPATH-> OBJECT_STORAGE_URI --- lib/amqp.js | 6 +++--- lib/settings.js | 2 +- mocha_spec/integration_helpers.js | 2 +- mocha_spec/run.spec.js | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 97596368..5a18ec96 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -66,7 +66,7 @@ class Amqp { maxAttempts: 3, retryDelay: 100 }; - const res = (await request.get(`${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`, options)); + const res = (await request.get(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options)); if (res.statusCode >= 400) { throw new Error(`HTTP error during object get: ${res.statusCode} (${res.statusMessage})`); } @@ -92,14 +92,14 @@ class Amqp { retryStrategy: (err, response, body, options) => { if (!err && response.statusCode === 409) { objectId = uuid.v4(); - options.url = `${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`; + options.url = `${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`; return { mustRetry: true, options }; } //eslint-disable-next-line new-cap return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); } }; - const res = await request.put(`${this.settings.OBJECT_STORAGE_BASEPATH}/objects/${objectId}`, options); + const res = await request.put(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options); if (res.statusCode >= 400) { throw new Error(`HTTP error during object put: ${res.statusCode} (${res.statusMessage})`); } diff --git a/lib/settings.js b/lib/settings.js index 27853e2c..02c1ab75 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -49,7 +49,7 @@ function readFrom(envVars) { PROCESS_AMQP_DRAIN: true, AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms AMQP_PUBLISH_RETRY_ATTEMPTS: 10, - OBJECT_STORAGE_BASEPATH: '', + OBJECT_STORAGE_URI: '', OBJECT_STORAGE_TOKEN: '', OBJECT_STORAGE_OUT: false }; diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 11d7a5e5..6e81efa7 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -179,7 +179,7 @@ function prepareEnv() { env.DEBUG = 'sailor:debug'; - env.ELASTICIO_OBJECT_STORAGE_BASEPATH = 'http://ma.es.ter'; + env.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter'; env.ELASTICIO_OBJECT_STORAGE_OUT = ''; env.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; } diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 169ba7e4..b696f762 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -158,7 +158,7 @@ describe('Integration Test', () => { .get('/customers') .reply(200, customers); - const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_BASEPATH) + const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .get(`/objects/${objectId}`) .reply(200, encryptor.encryptMessageContent(inputMessage)) .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ @@ -203,7 +203,7 @@ describe('Integration Test', () => { .get('/customers') .reply(200, customers); - const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_BASEPATH) + const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .get(`/objects/${objectId}`) .reply(200, encryptor.encryptMessageContent(inputMessage)); From d142975605cb11200d06ce23d727949a718cfd8d Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 11:52:58 +0300 Subject: [PATCH 17/37] elasticio#1456 2.4.0-dev.24 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 4e0940f8..59736a01 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.23", + "version": "2.4.0-dev.24", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From 6d12ac8269f257dec267b45fd239519ee32c20d0 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 17:35:52 +0300 Subject: [PATCH 18/37] elasticio#1456 pass correct content-type --- lib/amqp.js | 3 ++- package.json | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index 5a18ec96..97a82a55 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -88,6 +88,7 @@ class Amqp { forever: true, maxAttempts: 3, retryDelay: 100, + headers: { 'content-type': 'text/plain' }, body: encryptedData, retryStrategy: (err, response, body, options) => { if (!err && response.statusCode === 409) { @@ -204,8 +205,8 @@ class Amqp { async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); + // TODO: handle errors const encryptedData = await this.encryptMessage(data, properties); - return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } diff --git a/package.json b/package.json index 59736a01..d73ac60b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.24", + "version": "2.4.0-dev.25", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From dcd065f4023adeb7fb63f486d5facb257d7ffe9b Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 17:49:18 +0300 Subject: [PATCH 19/37] elasticio#1456 2.4.0-dev.26 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d73ac60b..84a0a54a 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.25", + "version": "2.4.0-dev.26", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From e32b0e4dfc6c88b5bd2ca933d635a49da99aca85 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 29 May 2019 19:02:10 +0300 Subject: [PATCH 20/37] elasticio#1456 no need to handle encryptMessage errors, it's ok --- lib/amqp.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/amqp.js b/lib/amqp.js index 97a82a55..e5d46177 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -205,7 +205,6 @@ class Amqp { async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) { const settings = this.settings; data.headers = filterMessageHeaders(data.headers); - // TODO: handle errors const encryptedData = await this.encryptMessage(data, properties); return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle); } From 07da7773bbc9c965d64866a6886bb0a568d935ae Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Thu, 30 May 2019 13:01:18 +0300 Subject: [PATCH 21/37] elasticio#1456 object storage stream + gzip --- lib/amqp.js | 58 +++++++++++++++++++------------ lib/cipher.js | 34 ++++++++++++++++++ lib/encryptor.js | 25 +++++++++++++ mocha_spec/integration_helpers.js | 9 ++++- mocha_spec/run.spec.js | 27 ++++++++++---- package.json | 1 + 6 files changed, 123 insertions(+), 31 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index e5d46177..3386a298 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -57,29 +57,33 @@ class Amqp { async decryptMessage(message) { const objectId = message.properties.headers.objectId; - let content = message.content; - - if (objectId) { - const options = { - auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, - forever: true, - maxAttempts: 3, - retryDelay: 100 - }; - const res = (await request.get(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options)); - if (res.statusCode >= 400) { - throw new Error(`HTTP error during object get: ${res.statusCode} (${res.statusMessage})`); - } - content = res.body; + if (!objectId) { + return encryptor.decryptMessageContent(message.content); } - return encryptor.decryptMessageContent(content); + const options = { + auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, + forever: true, + maxAttempts: 3, + retryDelay: 100 + }; + + return new Promise(async (resolve, reject) => { + const s = request + .get(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options) + .on('response', (res) => { + if (res.statusCode >= 400) { + reject(new Error(`HTTP error during object get: ${res.statusCode} (${res.statusMessage})`)); + } + }) + .on('error', reject); + resolve(await encryptor.decryptMessageContentStream(s)); + }); } async encryptMessage(data, properties) { - const encryptedData = encryptor.encryptMessageContent(data); if (!this.settings.OBJECT_STORAGE_OUT) { - return encryptedData; + return encryptor.encryptMessageContent(data); } let objectId = uuid.v4(); @@ -88,8 +92,7 @@ class Amqp { forever: true, maxAttempts: 3, retryDelay: 100, - headers: { 'content-type': 'text/plain' }, - body: encryptedData, + headers: { 'content-type': 'application/octet-stream' }, retryStrategy: (err, response, body, options) => { if (!err && response.statusCode === 409) { objectId = uuid.v4(); @@ -100,10 +103,19 @@ class Amqp { return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); } }; - const res = await request.put(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options); - if (res.statusCode >= 400) { - throw new Error(`HTTP error during object put: ${res.statusCode} (${res.statusMessage})`); - } + await new Promise((resolve, reject) => { + const res = request + .put(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options) + .on('response', (res) => { + if (res.statusCode >= 400) { + reject(new Error(`HTTP error during object put: ${res.statusCode} (${res.statusMessage})`)); + } + }) + .on('error', reject); + encryptor.encryptMessageContentStream(data) + .pipe(res) + .on('end', resolve); + }); properties.headers.objectId = objectId; return ''; diff --git a/lib/cipher.js b/lib/cipher.js index d7ec763b..448bf21e 100644 --- a/lib/cipher.js +++ b/lib/cipher.js @@ -1,6 +1,7 @@ var _ = require('lodash'); var crypto = require('crypto'); var debug = require('debug')('sailor:cipher'); +var PassThrough = require('stream').PassThrough; var ALGORYTHM = 'aes-256-cbc'; var PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD; @@ -9,6 +10,8 @@ var VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV; exports.id = 1; exports.encrypt = encryptIV; exports.decrypt = decryptIV; +exports.decryptStream = decryptStreamIV; +exports.encryptStream = encryptStreamIV; function encryptIV(rawData) { debug('About to encrypt:', rawData); @@ -52,3 +55,34 @@ function decryptIV(encData) { return result; } + +function encryptStreamIV() { + debug('Creating encryption stream'); + + if (!PASSWORD) { + return new PassThrough; + } + + + if (!VECTOR) { + throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set'); + } + + var encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest(); + return crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR); +} + +function decryptStreamIV() { + debug('Creating decryption stream'); + + if (!PASSWORD) { + return new PassThrough; + } + + if (!VECTOR) { + throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set'); + } + + var decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest(); + return crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR); +} diff --git a/lib/encryptor.js b/lib/encryptor.js index 1750d4d8..25ac4043 100644 --- a/lib/encryptor.js +++ b/lib/encryptor.js @@ -1,7 +1,12 @@ var cipher = require('./cipher.js'); +var Readable = require('stream').Readable; +var zlib = require('zlib'); +var getStream = require('get-stream'); exports.encryptMessageContent = encryptMessageContent; exports.decryptMessageContent = decryptMessageContent; +exports.encryptMessageContentStream = encryptMessageContentStream; +exports.decryptMessageContentStream = decryptMessageContentStream; function encryptMessageContent(messagePayload) { return cipher.encrypt(JSON.stringify(messagePayload)); @@ -18,3 +23,23 @@ function decryptMessageContent(messagePayload) { throw Error('Failed to decrypt message: ' + err.message); } } + +function encryptMessageContentStream(data) { + const dataStream = new Readable; + dataStream.push(JSON.stringify(data)); + dataStream.push(null); + return dataStream + .pipe(zlib.createGzip()) + .pipe(cipher.encryptStream()); +} + +async function decryptMessageContentStream(stream) { + const s = stream.pipe(cipher.decryptStream()).pipe(zlib.createGunzip()); + const content = await getStream(s); + + return JSON.parse(content); + // TODO: do we need to check this condition somehow? + // if (!messagePayload || messagePayload.toString().length === 0) { + // return null; + // } +} diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 6e81efa7..31e5ba10 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -5,6 +5,8 @@ const amqplib = require('amqplib'); const { EventEmitter } = require('events'); const PREFIX = 'sailor_nodejs_integration_test'; const nock = require('nock'); +const getStream = require('get-stream'); +const encryptor = require('../lib/encryptor.js'); const env = process.env; @@ -200,6 +202,11 @@ function mockApiTaskStepResponse(response) { .reply(200, Object.assign(defaultResponse, response)); } +async function encryptForObjectStorage(input) { + const stream = encryptor.encryptMessageContentStream(input); + return await getStream.buffer(stream); +} + exports.PREFIX = PREFIX; exports.amqp = function amqp() { @@ -208,4 +215,4 @@ exports.amqp = function amqp() { exports.prepareEnv = prepareEnv; exports.mockApiTaskStepResponse = mockApiTaskStepResponse; - +exports.encryptForObjectStorage = encryptForObjectStorage; diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index b696f762..d508d6b1 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -145,7 +145,7 @@ describe('Integration Test', () => { }); }); - it('should get object storage message', (done) => { + it('should get object storage message', async () => { process.env.ELASTICIO_OBJECT_STORAGE_OUT = true; helpers.mockApiTaskStepResponse(); @@ -158,10 +158,14 @@ describe('Integration Test', () => { .get('/customers') .reply(200, customers); - const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .get(`/objects/${objectId}`) - .reply(200, encryptor.encryptMessageContent(inputMessage)) - .put(/^\/objects\/[0-9a-z-]+$/, encryptor.encryptMessageContent({ + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .put(/^\/objects\/[0-9a-z-]+$/, await helpers.encryptForObjectStorage({ id: messageId, body: { originalMsg: inputMessage, @@ -177,10 +181,12 @@ describe('Integration Test', () => { })) .reply(200); + let done; amqpHelper.on('data', ({ properties, body }) => { expect(properties.headers.objectId).to.be.a('string'); expect(body).to.be.null; - expect(objectStorageCalls.isDone()).to.be.true; + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; done(); }); @@ -190,9 +196,11 @@ describe('Integration Test', () => { parentMessageId, traceId }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); }); - it('should get object storage message, but publish directly', (done) => { + it('should get object storage message, but publish directly', async () => { helpers.mockApiTaskStepResponse(); nock('https://api.acme.com') @@ -205,8 +213,11 @@ describe('Integration Test', () => { const objectStorageCalls = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .get(`/objects/${objectId}`) - .reply(200, encryptor.encryptMessageContent(inputMessage)); + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + let done; amqpHelper.on('data', ({ properties, body }) => { expect(properties.headers.objectId).to.not.exist; expect(body).to.deep.equal({ @@ -229,6 +240,8 @@ describe('Integration Test', () => { parentMessageId, traceId }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); }); it('should augment passthrough property with data', done => { diff --git a/package.json b/package.json index 84a0a54a..7825871f 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "debug": "3.1.0", "elasticio-rest-node": "1.2.3", "event-to-promise": "^0.8.0", + "get-stream": "5.1.0", "lodash": "4.17.4", "p-throttle": "^2.1.0", "q": "1.4.1", From 9bb8fac1022822e85b0463d20beae0add775b523 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Thu, 30 May 2019 17:30:56 +0300 Subject: [PATCH 22/37] elasticio#1456 fixed working with requests streams by using axios, isolated storage code into separate class --- lib/amqp.js | 55 +++--------------------------- lib/objectStorage.js | 76 ++++++++++++++++++++++++++++++++++++++++++ mocha_spec/run.spec.js | 2 ++ package.json | 1 + 4 files changed, 83 insertions(+), 51 deletions(-) create mode 100644 lib/objectStorage.js diff --git a/lib/amqp.js b/lib/amqp.js index 3386a298..76ec73db 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -1,11 +1,10 @@ const log = require('./logging.js'); const amqplib = require('amqplib'); const encryptor = require('./encryptor.js'); +const ObjectStorage = require('./objectStorage.js'); const co = require('co'); const _ = require('lodash'); const eventToPromise = require('event-to-promise'); -const request = require('requestretry'); -const uuid = require('uuid'); const HEADER_ROUTING_KEY = 'x-eio-routing-key'; const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; @@ -13,6 +12,7 @@ const HEADER_ERROR_RESPONSE = 'x-eio-error-response'; class Amqp { constructor(settings) { this.settings = settings; + this.objectStorage = new ObjectStorage(settings); } connect(uri) { @@ -61,24 +61,7 @@ class Amqp { return encryptor.decryptMessageContent(message.content); } - const options = { - auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, - forever: true, - maxAttempts: 3, - retryDelay: 100 - }; - - return new Promise(async (resolve, reject) => { - const s = request - .get(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options) - .on('response', (res) => { - if (res.statusCode >= 400) { - reject(new Error(`HTTP error during object get: ${res.statusCode} (${res.statusMessage})`)); - } - }) - .on('error', reject); - resolve(await encryptor.decryptMessageContentStream(s)); - }); + return await this.objectStorage.getObject(objectId); } async encryptMessage(data, properties) { @@ -86,37 +69,7 @@ class Amqp { return encryptor.encryptMessageContent(data); } - let objectId = uuid.v4(); - const options = { - auth: { bearer: this.settings.OBJECT_STORAGE_TOKEN }, - forever: true, - maxAttempts: 3, - retryDelay: 100, - headers: { 'content-type': 'application/octet-stream' }, - retryStrategy: (err, response, body, options) => { - if (!err && response.statusCode === 409) { - objectId = uuid.v4(); - options.url = `${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`; - return { mustRetry: true, options }; - } - //eslint-disable-next-line new-cap - return request.RetryStrategies.HTTPOrNetworkError(err, response, body, options); - } - }; - await new Promise((resolve, reject) => { - const res = request - .put(`${this.settings.OBJECT_STORAGE_URI}/objects/${objectId}`, options) - .on('response', (res) => { - if (res.statusCode >= 400) { - reject(new Error(`HTTP error during object put: ${res.statusCode} (${res.statusMessage})`)); - } - }) - .on('error', reject); - encryptor.encryptMessageContentStream(data) - .pipe(res) - .on('end', resolve); - }); - properties.headers.objectId = objectId; + properties.headers.objectId = await this.objectStorage.addObject(data); return ''; } diff --git a/lib/objectStorage.js b/lib/objectStorage.js new file mode 100644 index 00000000..e812fb3d --- /dev/null +++ b/lib/objectStorage.js @@ -0,0 +1,76 @@ +const encryptor = require('./encryptor.js'); +const uuid = require('uuid'); +const axios = require('axios'); +const http = require('http'); +const https = require('https'); + +class ObjectStorage { + constructor(settings) { + this.api = axios.create({ + baseURL: `${settings.OBJECT_STORAGE_URI}/`, + httpAgent: new http.Agent({ keepAlive: true }), + httpsAgent: new https.Agent({ keepAlive: true }), + headers: { Authorization: `Bearer ${settings.OBJECT_STORAGE_TOKEN}` }, + validateStatus: null + }); + } + + async requestRetry({ maxAttempts, delay, createReq, onRes }) { + let attempts = 0; + let res; + let err; + while (attempts < maxAttempts) { + attempts++; + try { + res = await createReq(); + } catch (e) { + err = e; + } + if (onRes && onRes(err, res)) { + continue; + } + if (err || res.status >= 400) { + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + break; + } + if (err || res.status >= 400) { + throw err || new Error(`HTTP error during object get: ${res.status} (${res.statusText})`); + } + return res; + } + + async getObject(objectId) { + const res = await this.requestRetry({ + maxAttempts: 3, + delay: 100, + createReq: () => this.api.get(`/objects/${objectId}`, { responseType: 'stream' }) + }); + + return await encryptor.decryptMessageContentStream(res.data); + } + + async addObject(data) { + let objectId = uuid.v4(); + await this.requestRetry({ + maxAttempts: 3, + delay: 100, + createReq: () => this.api.put( + `/objects/${objectId}`, + encryptor.encryptMessageContentStream(data), + { headers: { 'content-type': 'application/octet-stream' } } + ), + onRes: (err, res) => { + if (!err && res.status === 409) { + objectId = uuid.v4(); + return true; + } + } + }); + + return objectId; + } +} + +module.exports = ObjectStorage; diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index d508d6b1..8890f65b 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -160,11 +160,13 @@ describe('Integration Test', () => { const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) .reply(200, await helpers.encryptForObjectStorage(inputMessage), { 'content-type': 'application/octet-stream' }); const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) .put(/^\/objects\/[0-9a-z-]+$/, await helpers.encryptForObjectStorage({ id: messageId, body: { diff --git a/package.json b/package.json index 7825871f..58310d54 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ }, "dependencies": { "amqplib": "0.5.1", + "axios": "0.18.0", "bunyan": "^1.8.10", "co": "4.6.0", "debug": "3.1.0", From 74e0da891883a4fa820f08b07b063fab77144450 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Thu, 30 May 2019 17:32:01 +0300 Subject: [PATCH 23/37] elasticio#1456 2.4.0-dev.27 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 58310d54..cc659e13 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.26", + "version": "2.4.0-dev.27", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From c2657eb8db707b4d9ad3c32065ba9e0269aba43e Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Thu, 30 May 2019 18:11:51 +0300 Subject: [PATCH 24/37] elasticio#1456 checked whether maester can return empty object, it's impossible --- lib/encryptor.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/encryptor.js b/lib/encryptor.js index 25ac4043..0461c26a 100644 --- a/lib/encryptor.js +++ b/lib/encryptor.js @@ -38,8 +38,4 @@ async function decryptMessageContentStream(stream) { const content = await getStream(s); return JSON.parse(content); - // TODO: do we need to check this condition somehow? - // if (!messagePayload || messagePayload.toString().length === 0) { - // return null; - // } } From e06cd34eba48f31b94241d470a1435217cb39c8c Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Thu, 30 May 2019 18:14:25 +0300 Subject: [PATCH 25/37] elasticio#1456 removed unused dependency --- mocha_spec/run.spec.js | 1 - 1 file changed, 1 deletion(-) diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 8890f65b..dc4be12d 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -6,7 +6,6 @@ const co = require('co'); const sinonjs = require('sinon'); const logging = require('../lib/logging.js'); const helpers = require('./integration_helpers'); -const encryptor = require('../lib/encryptor.js'); const env = process.env; From b91d2298586714c1b1fcd68056c7f2dbd0569724 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Fri, 31 May 2019 12:58:45 +0300 Subject: [PATCH 26/37] elasticio#1456 fixed tests --- spec/amqp.spec.js | 209 ++++++++++++++++++++++++++-------------------- 1 file changed, 118 insertions(+), 91 deletions(-) diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index e27fba3a..634681c6 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -65,9 +65,11 @@ describe('AMQP', () => { spyOn(encryptor, 'decryptMessageContent').andCallThrough(); }); - it('Should send message to outgoing channel when process data', async () => { + it('Should send message to outgoing channel when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const props = { contentType: 'application/json', @@ -79,31 +81,35 @@ describe('AMQP', () => { } }; - await amqp.sendData({ - headers: { - 'some-other-header': 'headerValue' - }, - body: 'Message content' - }, props); - - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + async function test() { + await amqp.sendData({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }, props); + } + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - settings.DATA_ROUTING_KEY, - jasmine.any(Object), - props - ]); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.DATA_ROUTING_KEY, + jasmine.any(Object), + props + ]); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: { - 'some-other-header': 'headerValue' - }, - body: 'Message content' - }); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: { + 'some-other-header': 'headerValue' + }, + body: 'Message content' + }); + done(); + }, done); }); it('Should send message async to outgoing channel when process data', done => { @@ -164,10 +170,12 @@ describe('AMQP', () => { }, done); }); - it('Should sendHttpReply to outgoing channel using routing key from headers when process data', async () => { + it('Should sendHttpReply to outgoing channel using routing key from headers when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const msg = { statusCode: 200, @@ -187,19 +195,24 @@ describe('AMQP', () => { reply_to: 'my-special-routing-key' } }; - await amqp.sendHttpReply(msg, props); - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + async function test() { + await amqp.sendHttpReply(msg, props); + } + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); - expect(publishParameters[3]).toEqual(props); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[2].toString()).toEqual(encryptor.encryptMessageContent(msg)); + expect(publishParameters[3]).toEqual(props); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(msg); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual(msg); + done(); + }, done); }); it('Should throw error in sendHttpReply if reply_to header not found', done => { @@ -232,9 +245,11 @@ describe('AMQP', () => { }); - it('Should send message to outgoing channel using routing key from headers when process data', async () => { + it('Should send message to outgoing channel using routing key from headers when process data', (done) => { const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const msg = { headers: { @@ -255,26 +270,31 @@ describe('AMQP', () => { } }; - await amqp.sendData(msg, props); + async function test() { + await amqp.sendData(msg, props); + } - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(1); + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); - const publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters).toEqual([ - settings.PUBLISH_MESSAGES_TO, - 'my-special-routing-key', - jasmine.any(Object), - props - ]); + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + 'my-special-routing-key', + jasmine.any(Object), + props + ]); - const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual({ - headers: {}, - body: { - content: 'Message content' - } - }); + const payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + expect(payload).toEqual({ + headers: {}, + body: { + content: 'Message content' + } + }); + done(); + }, done); }); it('Should send message to errors when process error', () => { @@ -321,7 +341,7 @@ describe('AMQP', () => { }); }); - it('Should send message to errors using routing key from headers when process error', async () => { + it('Should send message to errors using routing key from headers when process error', (done) => { const expectedErrorPayload = { error: { @@ -335,7 +355,9 @@ describe('AMQP', () => { }; const amqp = new Amqp(settings); - amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + amqp.publishChannel = { publish: () => true }; + spyOn(amqp.publishChannel, 'publish').andReturn(true); + amqp.publishChannel.waitForConfirms = () => Promise.resolve([null]); const props = { contentType: 'application/json', @@ -348,47 +370,52 @@ describe('AMQP', () => { } }; - await amqp.sendError(new Error('Test error'), props, message.content); + async function test() { + await amqp.sendError(new Error('Test error'), props, message.content); + } - expect(amqp.publishChannel.publish).toHaveBeenCalled(); - expect(amqp.publishChannel.publish.callCount).toEqual(2); + test().then(() => { + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(2); - let publishParameters = amqp.publishChannel.publish.calls[0].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); - expect(publishParameters[3]).toEqual(props); + let publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters.length).toEqual(4); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('5559edd38968ec0736000003:step_1:1432205514864:error'); + expect(publishParameters[3]).toEqual(props); - let payload = JSON.parse(publishParameters[2].toString()); - payload.error = encryptor.decryptMessageContent(payload.error); - payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); + let payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + payload.errorInput = encryptor.decryptMessageContent(payload.errorInput); - expect(payload).toEqual(expectedErrorPayload); + expect(payload).toEqual(expectedErrorPayload); - publishParameters = amqp.publishChannel.publish.calls[1].args; - expect(publishParameters.length).toEqual(4); - expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); - expect(publishParameters[1]).toEqual('my-special-routing-key'); - expect(publishParameters[3]).toEqual({ - contentType: 'application/json', - contentEncoding: 'utf8', - mandatory: true, - headers: { - 'taskId': 'task1234567890', - 'stepId': 'step_456', - 'reply_to': 'my-special-routing-key', - 'x-eio-error-response': true - } - }); + publishParameters = amqp.publishChannel.publish.calls[1].args; + expect(publishParameters.length).toEqual(4); + expect(publishParameters[0]).toEqual(settings.PUBLISH_MESSAGES_TO); + expect(publishParameters[1]).toEqual('my-special-routing-key'); + expect(publishParameters[3]).toEqual({ + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + 'taskId': 'task1234567890', + 'stepId': 'step_456', + 'reply_to': 'my-special-routing-key', + 'x-eio-error-response': true + } + }); - console.log('AND BACK'); - //eslint-disable-next-line max-len - console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg')); + console.log('AND BACK'); + //eslint-disable-next-line max-len + console.log(encryptor.decryptMessageContent('+PAlXNRj+5HdYNSuw3cyrfXNSlnUHKH0AtyspQkvT0RFROPAhMgqrj8y1I0EW9zJEhcRzmiEwbK5ftV3a8N3FcMd1Yu2beNt0R2Ou2f1yae0FxZ/aIUOmicX3iWbUKFnfljwUUA39sEKnpp9yP7zprAf755FgEtplt3cSy+hQVCC0u7olkbIeHtmSuw/9YP9PckVk82eM7FfnK5qKEDilzR9CWgpQEak8kZeekko86WczgkRrnMj52ifGVCbIk4aY5K+uBPbQKURI9bbBra4aR0l/2Y/bOBa5jahl2Q6hrX9iAe9BMMIll9GvDxBOEV7n5H5CsZj1IrFbq5nri3qT48LgNFTDlq/ts2kAjJQORPZnp3Fq25B9ToPQt6DGGZLUG+YKGHCv73RNwUCx4Dj2oVJjNyWIYMA4EEJwcHhR+rUrHcAVJZ0SOOTJI1tJPzcasXy3d95XQgKpHSYcbXuUOtmql4oyU5ZP9QEiIscsWFS7fJs+r8Eit+H777vvc37zxjA3DM0LJ8QmB5VbkkGxYbi43dzzd3hOXz4Rvs6C08F3jDK20r+VpAqEDRo/OgBaBH4uhd+XynwVXUpKASHNaJirGGu1K8tpiX1+XOxAGqHyhZjBICeg/f8igqJs54af78AZPpvnoSQzkAhF5pDmvMINMPuJnM/ooK3O9SgJYEi4wMzu/vnAEajROE5t7d0QhSSollCx+IMpiz9XdSALZyRMNPaF2yLb3rw7gwXV7q67u/zPm79AR1GBrWbgxXei7gdA9z3TwgWdT91RfTRdSYZDsgenGCanrcpE+Wi+YEozIan9pC47xhBxzzIL9a3AUVllNIGc4qNfs9Al0M/r+kl+ndk+I2k6QFNr4aIjR/qsk52YjW/ZqmORbe2MoI4bIFS3FwlWRoYhJC78yLXOfghvl3xHJiq0Uir2vxmYdXYXfaY82g7ZtThaSqc63WZcD5CaV1Wy6jfqB1sHwuJsADE6BXPQKFfZ9t8tKE3b58rB47TFTmJb8TETgG/xK6pbaEo/Z7iWjFhJKTrcnnF4PynrJab6kw+pnU08u7/je9ZhDEf+jvK3XnqwC+A8XEktywihnrskQ7Eo9Wdmzuw9ujbY8EwQxIFK+TPpgQ8dv25aXPXspnPgiH+2lt19ok1oRIZTenv2KLXqE3wrvmXQIEbdAHFHXsTLj781/9iNdc8ta645V3ktqvz35s1c8Gr+ZbZIK5WRlrJ8TO1WcokSDK7H8hqY6CbT1QC3oFxr5pVPoqZzBMOR6g5MOPbR41XtcHlQopCKC6XeGAVd4dIuCx1CT4vqG+8RgOABxhrEeLmsHGFpBnwPtlVniZQixmOLSzQWUNoUDWMt2mwrWKb/VmzprnNmN++ybPqXhX8bD+k1NQDb7r5CwPqlzmCypXSNH9kVn0QvpqLT5elQ2295yzasW22c8mEPmSvNPM/rE/tqWJA6vAKbXOy1ktrG/TCbzGV2llAvqQqQPX8zGJrXEzKTYk+mHiIdMKpw1bWJhDUOAjdosi853Lbt2GuUjiVNMGJBXPcLLvmjjvv9oLcSYHBTuIfOkScLKKGUhabzHFPmdxgF1MB0zvVO22ooxhmhvCmq+dlag71bbP5RvTjHf50BzJZ5+ysGyM7FJm99BErHo2lTpHSKdSFF0nAlP9Z/Ybf2zTEunlz8RdmQgsq+0F+kwkxI7SqGTy0SAJbbgawNoNTptdyO33a41zprKd/3Wnp7kfoTOfmjVYdHPVFC1GywMER7ordLV3XpjrjX6R6JTd2eOZajcBCsEc+gzVqg/nR6t5y8jfS8NfzfdCMsRzEqz6vuy+M66zNIEocZiF9Tkm1r8MLwaUCE7QfEXexqkChAk9jaOzcojyOfAlXIxvVMn6yFF1gmmQtgudxsY7I/0ZjdSZlBgBFcPFT6OT+HTZ7cCAVF7J7GsGlVzwrUpqcQzSt9z3QrA0iTd4DUXgsWmFIgcdhWbPFlkaPKyZ+QXxrz2VYKCuzDWi3wzLaioFnHxLXZDt6Puo5mPiRTzSolu3fH4S31yVJ7E6e2n8zwUmnFiZ10TrrkO64b9B3TwLx1mLPap7F39DAnufj7XF4eKCdvGJEKVGc+SsyrElzKimsR4Zs9H/Jw+KOCWc/O9l8yFAc42EXUGWrq9L+B6NIaZ7hDY/sDHI748wyFPeUHhOa99BnR15Sr+IrXBG3tsXbyMgHv+gS66Nkmkllvwjpi5Q/7vJOrxrKyFS1KGl5+6N/PXj1Tn5SqWMN8Wj2mniEGD9zSaLy7DUCxmKYA9Dn3/8WQdY8yWmOyi+SFyrL6VgQ8sUQ5MNnVPhQevxB3ZQSTItofT0sE0Xv7yEYkc/T4HGVsvDRKz6RZwaZvZEg')); - payload = encryptor.decryptMessageContent(publishParameters[2].toString()); + payload = encryptor.decryptMessageContent(publishParameters[2].toString()); - expect(payload).toEqual(expectedErrorPayload.error); + expect(payload).toEqual(expectedErrorPayload.error); + done(); + }, done); }); it('Should not provide errorInput if errorInput was empty', () => { @@ -620,7 +647,7 @@ describe('AMQP', () => { it('Should ack message when confirmed', () => { - const amqp = new Amqp(); + const amqp = new Amqp(settings); amqp.subscribeChannel = jasmine.createSpyObj('subscribeChannel', ['ack']); amqp.ack(message); From 6ee68d569fde88e9898ba0609f41afb3eb1c6d8e Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Fri, 31 May 2019 15:53:35 +0300 Subject: [PATCH 27/37] elasticio#1456 unit tests for objectStorage, fixed object storage bug --- lib/objectStorage.js | 2 + mocha_spec/objectStorage.spec.js | 120 +++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 mocha_spec/objectStorage.spec.js diff --git a/lib/objectStorage.js b/lib/objectStorage.js index e812fb3d..d0ec82a9 100644 --- a/lib/objectStorage.js +++ b/lib/objectStorage.js @@ -20,6 +20,8 @@ class ObjectStorage { let res; let err; while (attempts < maxAttempts) { + err = null; + res = null; attempts++; try { res = await createReq(); diff --git a/mocha_spec/objectStorage.spec.js b/mocha_spec/objectStorage.spec.js new file mode 100644 index 00000000..67ea0e9d --- /dev/null +++ b/mocha_spec/objectStorage.spec.js @@ -0,0 +1,120 @@ +const nock = require('nock'); +const expect = require('chai').expect; +const getStream = require('get-stream'); + +describe('ObjectStorage', () => { + + process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD = 'testCryptoPassword'; + process.env.ELASTICIO_MESSAGE_CRYPTO_IV = 'iv=any16_symbols'; + + const envVars = {}; + envVars.ELASTICIO_AMQP_URI = 'amqp://test2/test2'; + envVars.ELASTICIO_FLOW_ID = '5559edd38968ec0736000003'; + envVars.ELASTICIO_STEP_ID = 'step_1'; + envVars.ELASTICIO_EXEC_ID = 'some-exec-id'; + + envVars.ELASTICIO_USER_ID = '5559edd38968ec0736000002'; + envVars.ELASTICIO_COMP_ID = '5559edd38968ec0736000456'; + envVars.ELASTICIO_FUNCTION = 'list'; + + envVars.ELASTICIO_HOOK_SHUTDOWN = true; + + envVars.ELASTICIO_API_URI = 'http://apihost.com'; + envVars.ELASTICIO_API_USERNAME = 'test@test.com'; + envVars.ELASTICIO_API_KEY = '5559edd'; + + envVars.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter'; + envVars.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; + envVars.ELASTICIO_OBJECT_FLOW_ID = 1; + + const ObjectStorage = require('../lib/objectStorage.js'); + const encryptor = require('../lib/encryptor.js'); + const settings = require('../lib/settings.js').readFrom(envVars); + + it('should fail after 3 retries', async () => { + const objectStorage = new ObjectStorage(settings); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .get('/objects/1') + .replyWithError({ code: 'ETIMEDOUT' }) + .get('/objects/1') + .reply(404) + .get('/objects/1') + .replyWithError({ code: 'ENOTFOUND' }); + + let err; + try { + await objectStorage.getObject(1); + } catch (e) { + err = e; + } + + expect(objectStorageCalls.isDone()).to.be.true; + expect(err.code).to.be.equal('ENOTFOUND'); + }); + + it('should retry get request 3 times on errors', async () => { + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .get('/objects/1') + .reply(500) + .get('/objects/1') + .replyWithError({ code: 'ECONNRESET' }) + .get('/objects/1') + .reply(200, await getStream.buffer(encryptor.encryptMessageContentStream(data))); + + const out = await objectStorage.getObject(1); + + expect(objectStorageCalls.isDone()).to.be.true; + expect(out).to.be.deep.equal(data); + }); + + it('should retry put request 3 times on errors', async () => { + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + const put = await getStream.buffer(encryptor.encryptMessageContentStream(data)); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .put(/^\/objects\/[0-9a-z-]+$/, put) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(200); + + await objectStorage.addObject(data); + + expect(objectStorageCalls.isDone()).to.be.true; + }); + + it('should retry put request on 409 error with different objectId', async () => { + const objectStorage = new ObjectStorage(settings); + const data = { test: 'test' }; + const put = await getStream.buffer(encryptor.encryptMessageContentStream(data)); + + const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) + .matchHeader('authorization', 'Bearer jwt') + .matchHeader('content-type', 'application/octet-stream') + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(503) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(409) + .put(/^\/objects\/[0-9a-z-]+$/, put) + .reply(200); + const urls = []; + objectStorageCalls.on('request', req => { + urls.push(req.path); + }); + + await objectStorage.addObject(data); + + expect(objectStorageCalls.isDone()).to.be.true; + expect(urls[0]).to.be.equal(urls[1]); + expect(urls[1]).to.not.equal(urls[2]); + }); +}); From 45c56e1d6cddd0ee2f2f351e246ddf35fa333c86 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Fri, 31 May 2019 15:54:18 +0300 Subject: [PATCH 28/37] elasticio#1456 2.4.0-dev.28 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index cc659e13..ade50947 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.27", + "version": "2.4.0-dev.28", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From 54ea830e94990af8a7b56a2e9b3835c9b3878136 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Fri, 31 May 2019 17:19:38 +0300 Subject: [PATCH 29/37] elasticio#1456 2.4.0-dev.29 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index ade50947..f17a9c20 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.28", + "version": "2.4.0-dev.29", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js", From d6125a0fd86de3a5d44b84c4b443af74c46fb3e8 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Fri, 31 May 2019 18:09:39 +0300 Subject: [PATCH 30/37] elasticio#1456 updated package-lock --- package-lock.json | 64 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 20f4491b..8b29c09d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "elasticio-sailor-nodejs", - "version": "2.2.4", + "version": "2.4.0-dev.29", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -259,6 +259,15 @@ "integrity": "sha512-ReZxvNHIOv88FlT7rxcXIIC0fPt4KZqZbOlivyWtXLt8ESx84zd3kMC6iK5jVeS2qt+g7ftS7ye4fi06X5rtRQ==", "dev": true }, + "axios": { + "version": "0.18.0", + "resolved": "https://registry.npmjs.org/axios/-/axios-0.18.0.tgz", + "integrity": "sha1-MtU+SFHv3AoRmTts0AB4nXDAUQI=", + "requires": { + "follow-redirects": "^1.3.0", + "is-buffer": "^1.1.5" + } + }, "babel-code-frame": { "version": "6.26.0", "resolved": "https://registry.npmjs.org/babel-code-frame/-/babel-code-frame-6.26.0.tgz", @@ -1753,6 +1762,29 @@ } } }, + "follow-redirects": { + "version": "1.7.0", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.7.0.tgz", + "integrity": "sha512-m/pZQy4Gj287eNy94nivy5wchN3Kp+Q5WgUPNy5lJSZ3sgkVKSYV/ZChMAQVIgx1SqfZ2zBZtPA2YlXIWxxJOQ==", + "requires": { + "debug": "^3.2.6" + }, + "dependencies": { + "debug": { + "version": "3.2.6", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.6.tgz", + "integrity": "sha512-mel+jf7nrtEl5Pn1Qx46zARXKDpBbvzezse7p7LqINmdoIk8PYP5SySaxEmYv6TZ0JyEKA1hsCId6DIhgITtWQ==", + "requires": { + "ms": "^2.1.1" + } + }, + "ms": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.1.tgz", + "integrity": "sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg==" + } + } + }, "for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -1848,6 +1880,14 @@ "integrity": "sha1-uWjGsKBDhDJJAui/Gl3zJXmkUP4=", "dev": true }, + "get-stream": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-5.1.0.tgz", + "integrity": "sha512-EXr1FOzrzTfGeL0gQdeFEvOMm2mzMOglyiOXSTpPC+iAjAKftbr3jpCMWynogwYnM+eSj9sHGc6wjIcDvYiygw==", + "requires": { + "pump": "^3.0.0" + } + }, "get-value": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/get-value/-/get-value-2.0.6.tgz", @@ -2737,8 +2777,7 @@ "is-buffer": { "version": "1.1.6", "resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.6.tgz", - "integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==", - "dev": true + "integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w==" }, "is-builtin-module": { "version": "1.0.0", @@ -4765,6 +4804,25 @@ "resolved": "https://registry.npmjs.org/psl/-/psl-1.1.31.tgz", "integrity": "sha512-/6pt4+C+T+wZUieKR620OpzN/LlnNKuWjy1iFLQ/UG35JqHlR/89MP1d96dUfkf6Dne3TuLQzOYEYshJ+Hx8mw==" }, + "pump": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/pump/-/pump-3.0.0.tgz", + "integrity": "sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==", + "requires": { + "end-of-stream": "^1.1.0", + "once": "^1.3.1" + }, + "dependencies": { + "end-of-stream": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.1.tgz", + "integrity": "sha512-1MkrZNvWTKCaigbn+W15elq2BB/L22nqrSY5DKlo3X6+vclJm8Bb5djXJBmEX6fS3+zCh/F4VBK5Z2KxJt4s2Q==", + "requires": { + "once": "^1.4.0" + } + } + } + }, "punycode": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.1.1.tgz", From 6949dfda6208e224fa34d0abd2df2f50b7a56077 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 14:36:31 +0300 Subject: [PATCH 31/37] elasticio#1456 added parentheses and rule --- .eslintrc.js | 3 ++- lib/cipher.js | 1 - lib/encryptor.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index cfcc4e26..f1088d05 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -145,6 +145,7 @@ module.exports = { } ], 'mocha/no-skipped-tests': ERROR, - 'mocha/no-exclusive-tests': ERROR + 'mocha/no-exclusive-tests': ERROR, + 'new-parens': ERROR } }; \ No newline at end of file diff --git a/lib/cipher.js b/lib/cipher.js index 448bf21e..580b19b5 100644 --- a/lib/cipher.js +++ b/lib/cipher.js @@ -63,7 +63,6 @@ function encryptStreamIV() { return new PassThrough; } - if (!VECTOR) { throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set'); } diff --git a/lib/encryptor.js b/lib/encryptor.js index 0461c26a..1926925e 100644 --- a/lib/encryptor.js +++ b/lib/encryptor.js @@ -25,7 +25,7 @@ function decryptMessageContent(messagePayload) { } function encryptMessageContentStream(data) { - const dataStream = new Readable; + const dataStream = new Readable(); dataStream.push(JSON.stringify(data)); dataStream.push(null); return dataStream From d9e0ed4bb4b32992d0e822e267df8c7899c8f4a5 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 14:38:19 +0300 Subject: [PATCH 32/37] elasticio#1456 added more parentheses --- lib/cipher.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/cipher.js b/lib/cipher.js index 580b19b5..46bbbb4f 100644 --- a/lib/cipher.js +++ b/lib/cipher.js @@ -60,7 +60,7 @@ function encryptStreamIV() { debug('Creating encryption stream'); if (!PASSWORD) { - return new PassThrough; + return new PassThrough(); } if (!VECTOR) { @@ -75,7 +75,7 @@ function decryptStreamIV() { debug('Creating decryption stream'); if (!PASSWORD) { - return new PassThrough; + return new PassThrough(); } if (!VECTOR) { From af9bb1c3a10c6d09359969232140ac44ae0c8a5c Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 15:25:57 +0300 Subject: [PATCH 33/37] elasticio#1456 decryptMessage -> onMessage --- lib/amqp.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/amqp.js b/lib/amqp.js index 76ec73db..e1b05cb2 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -80,7 +80,7 @@ class Amqp { this.subscribeChannel.prefetch(this.settings.RABBITMQ_PREFETCH_SAILOR); - return this.subscribeChannel.consume(queueName, async function decryptMessage(message) { + return this.subscribeChannel.consume(queueName, async function onMessage(message) { log.trace('Message received: %j', message); if (message === null) { From 06026654258983be083932b99dc5a8ec468826f2 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 17:18:55 +0300 Subject: [PATCH 34/37] elasticio#1456 OBJECT_STORAGE_OUT -> OBJECT_STORAGE_ENABLED --- lib/amqp.js | 2 +- lib/settings.js | 2 +- mocha_spec/integration_helpers.js | 2 +- mocha_spec/run.spec.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index e1b05cb2..e9867760 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -65,7 +65,7 @@ class Amqp { } async encryptMessage(data, properties) { - if (!this.settings.OBJECT_STORAGE_OUT) { + if (!this.settings.OBJECT_STORAGE_ENABLED) { return encryptor.encryptMessageContent(data); } diff --git a/lib/settings.js b/lib/settings.js index 02c1ab75..ec30012c 100644 --- a/lib/settings.js +++ b/lib/settings.js @@ -51,7 +51,7 @@ function readFrom(envVars) { AMQP_PUBLISH_RETRY_ATTEMPTS: 10, OBJECT_STORAGE_URI: '', OBJECT_STORAGE_TOKEN: '', - OBJECT_STORAGE_OUT: false + OBJECT_STORAGE_ENABLED: false }; _.forEach(requiredAlways, function readRequired(key) { diff --git a/mocha_spec/integration_helpers.js b/mocha_spec/integration_helpers.js index 31e5ba10..402d1d7d 100644 --- a/mocha_spec/integration_helpers.js +++ b/mocha_spec/integration_helpers.js @@ -182,7 +182,7 @@ function prepareEnv() { env.DEBUG = 'sailor:debug'; env.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter'; - env.ELASTICIO_OBJECT_STORAGE_OUT = ''; + env.ELASTICIO_OBJECT_STORAGE_ENABLED = ''; env.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt'; } diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index dc4be12d..a2e7f6ec 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -145,7 +145,7 @@ describe('Integration Test', () => { }); it('should get object storage message', async () => { - process.env.ELASTICIO_OBJECT_STORAGE_OUT = true; + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; helpers.mockApiTaskStepResponse(); From c83215129eee8f6dfe0c9a8527b27b69fd1ab06f Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 17:21:30 +0300 Subject: [PATCH 35/37] elasticio#1456 createReq -> request, onRes -> onResponse --- lib/objectStorage.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/objectStorage.js b/lib/objectStorage.js index d0ec82a9..f9b972aa 100644 --- a/lib/objectStorage.js +++ b/lib/objectStorage.js @@ -15,7 +15,7 @@ class ObjectStorage { }); } - async requestRetry({ maxAttempts, delay, createReq, onRes }) { + async requestRetry({ maxAttempts, delay, request, onResponse }) { let attempts = 0; let res; let err; @@ -24,11 +24,11 @@ class ObjectStorage { res = null; attempts++; try { - res = await createReq(); + res = await request(); } catch (e) { err = e; } - if (onRes && onRes(err, res)) { + if (onResponse && onResponse(err, res)) { continue; } if (err || res.status >= 400) { @@ -47,7 +47,7 @@ class ObjectStorage { const res = await this.requestRetry({ maxAttempts: 3, delay: 100, - createReq: () => this.api.get(`/objects/${objectId}`, { responseType: 'stream' }) + request: () => this.api.get(`/objects/${objectId}`, { responseType: 'stream' }) }); return await encryptor.decryptMessageContentStream(res.data); @@ -58,12 +58,12 @@ class ObjectStorage { await this.requestRetry({ maxAttempts: 3, delay: 100, - createReq: () => this.api.put( + request: () => this.api.put( `/objects/${objectId}`, encryptor.encryptMessageContentStream(data), { headers: { 'content-type': 'application/octet-stream' } } ), - onRes: (err, res) => { + onResponse: (err, res) => { if (!err && res.status === 409) { objectId = uuid.v4(); return true; From 5b9341c3093a0178626793a17057a6bcc639ff9d Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Mon, 3 Jun 2019 18:56:13 +0300 Subject: [PATCH 36/37] elasticio#1456 added fallback to message send via rabbitmq on object storage errors, added warnings --- lib/amqp.js | 10 ++- lib/objectStorage.js | 3 + mocha_spec/objectStorage.spec.js | 16 ++++ mocha_spec/run.spec.js | 137 +++++++++++++++++++++++++++++++ 4 files changed, 164 insertions(+), 2 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index e9867760..fa78f69b 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -69,9 +69,15 @@ class Amqp { return encryptor.encryptMessageContent(data); } - properties.headers.objectId = await this.objectStorage.addObject(data); + let message = ''; + try { + properties.headers.objectId = await this.objectStorage.addObject(data); + } catch (e) { + log.error('Failed to add message to object storage: %s', e); + message = encryptor.encryptMessageContent(data); + } - return ''; + return message; } listenQueue(queueName, callback) { diff --git a/lib/objectStorage.js b/lib/objectStorage.js index f9b972aa..f6482c0f 100644 --- a/lib/objectStorage.js +++ b/lib/objectStorage.js @@ -1,3 +1,4 @@ +const log = require('./logging.js'); const encryptor = require('./encryptor.js'); const uuid = require('uuid'); const axios = require('axios'); @@ -32,6 +33,7 @@ class ObjectStorage { continue; } if (err || res.status >= 400) { + log.warn('Error during object get: %s', err ? err : `${res.status} (${res.statusText})`); await new Promise((resolve) => setTimeout(resolve, delay)); continue; } @@ -65,6 +67,7 @@ class ObjectStorage { ), onResponse: (err, res) => { if (!err && res.status === 409) { + log.warn('Generated already existing UUID'); objectId = uuid.v4(); return true; } diff --git a/mocha_spec/objectStorage.spec.js b/mocha_spec/objectStorage.spec.js index 67ea0e9d..e9a2376b 100644 --- a/mocha_spec/objectStorage.spec.js +++ b/mocha_spec/objectStorage.spec.js @@ -1,6 +1,8 @@ const nock = require('nock'); +const sinonjs = require('sinon'); const expect = require('chai').expect; const getStream = require('get-stream'); +const logging = require('../lib/logging.js'); describe('ObjectStorage', () => { @@ -31,7 +33,16 @@ describe('ObjectStorage', () => { const encryptor = require('../lib/encryptor.js'); const settings = require('../lib/settings.js').readFrom(envVars); + let sinon; + beforeEach(() => { + sinon = sinonjs.sandbox.create(); + }); + afterEach(() => { + sinon.restore(); + }); + it('should fail after 3 retries', async () => { + const log = sinon.stub(logging, 'warn'); const objectStorage = new ObjectStorage(settings); const objectStorageCalls = nock(settings.OBJECT_STORAGE_URI) @@ -52,6 +63,8 @@ describe('ObjectStorage', () => { expect(objectStorageCalls.isDone()).to.be.true; expect(err.code).to.be.equal('ENOTFOUND'); + expect(log.getCall(1).args[1].toString()).to.include('404'); + expect(log.callCount).to.be.equal(3); }); it('should retry get request 3 times on errors', async () => { @@ -93,6 +106,7 @@ describe('ObjectStorage', () => { }); it('should retry put request on 409 error with different objectId', async () => { + const log = sinon.stub(logging, 'warn'); const objectStorage = new ObjectStorage(settings); const data = { test: 'test' }; const put = await getStream.buffer(encryptor.encryptMessageContentStream(data)); @@ -116,5 +130,7 @@ describe('ObjectStorage', () => { expect(objectStorageCalls.isDone()).to.be.true; expect(urls[0]).to.be.equal(urls[1]); expect(urls[1]).to.not.equal(urls[2]); + expect(log.getCall(1).args[0]).to.include('UUID'); + expect(log.callCount).to.be.equal(2); }); }); diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index a2e7f6ec..6fb6e131 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -201,6 +201,143 @@ describe('Integration Test', () => { return new Promise((resolve) => { done = resolve; }); }); + it('should get object storage message if error repeat succeed', async () => { + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const log = sinon.stub(logging, 'warn'); + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const putData = await helpers.encryptForObjectStorage({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(200); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.be.a('string'); + expect(body).to.be.null; + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; + expect(log.getCall(1).args[1].toString()).to.include('400'); + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + + it('should get object storage message, but publish directly on 3 errors', async () => { + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + + helpers.mockApiTaskStepResponse(); + + nock('https://api.acme.com') + .post('/subscribe') + .reply(200, { + id: 'subscription_12345' + }) + .get('/customers') + .reply(200, customers); + + const log = sinon.stub(logging, 'error'); + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + const putData = await helpers.encryptForObjectStorage({ + id: messageId, + body: { + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }, + headers: {} + }); + const objectStoragePut = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .matchHeader('content-type', 'application/octet-stream') + .matchHeader('authorization', /Bearer/) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .replyWithError({ code: 'ECONNREFUSED' }) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(400) + .put(/^\/objects\/[0-9a-z-]+$/, putData) + .reply(503); + + let done; + amqpHelper.on('data', ({ properties, body }) => { + expect(properties.headers.objectId).to.not.exist; + expect(body).to.deep.equal({ + originalMsg: inputMessage, + customers, + subscription: { + id: 'subscription_12345', + cfg: { + apiKey: 'secret' + } + } + }); + expect(objectStorageGet.isDone()).to.be.true; + expect(objectStoragePut.isDone()).to.be.true; + expect(log.getCall(0).args[1].toString()).to.include('503'); + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + it('should get object storage message, but publish directly', async () => { helpers.mockApiTaskStepResponse(); From 2f2332497f553cf00d9dd7c5b1e61884c31043a1 Mon Sep 17 00:00:00 2001 From: Paul Annekov Date: Wed, 5 Jun 2019 11:14:18 +0300 Subject: [PATCH 37/37] elasticio#1456 2.4.2-dev.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f17a9c20..f74b83a1 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "elasticio-sailor-nodejs", "description": "The official elastic.io library for bootstrapping and executing for Node.js connectors", - "version": "2.4.0-dev.29", + "version": "2.4.2-dev.0", "main": "run.js", "scripts": { "lint": "./node_modules/.bin/eslint lib spec mocha_spec lib run.js runService.js",