diff --git a/src/index.ts b/src/index.ts
index fcfb5f6..7116f42 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,12 +1,9 @@
 import http from 'http';
 import { createTerminus } from '@godaddy/terminus';
-import app from './app';
 import config from './config';
 import logger from './logger';
 import services from './services';
 
-const server = http.createServer(app);
-
 async function onSignal(): Promise<void> {
   logger.warn('Server is going to shut down! Starting cleanup...');
 }
@@ -19,14 +16,6 @@ async function onHealthCheck(): Promise<void> {
   return;
 }
 
-createTerminus(server, {
-  healthChecks: {
-    '/health/liveness': onHealthCheck,
-  },
-  onSignal,
-  onShutdown,
-});
-
 async function start(): Promise<void> {
   try {
     const namespace = config.services.trace.daemonAddressNamespace;
@@ -35,9 +24,20 @@ async function start(): Promise<void> {
       const address = await services.ServiceDiscovery.discoverInstance(namespace, name);
 
       (services.Trace as any).setDaemonAddress(address);
-      (services.Trace as any).captureHTTPRequests();
     }
 
+    const app = (await import('./app')).default;
+
+    const server = http.createServer(app);
+
+    createTerminus(server, {
+      healthChecks: {
+        '/health/liveness': onHealthCheck,
+      },
+      onSignal,
+      onShutdown,
+    });
+
     server.listen(config.port, () => {
       logger.info(`Server started at http://localhost:${ config.port }`);
     });
diff --git a/src/services/trace/AWSXRay.ts b/src/services/trace/AWSXRay.ts
index d2cd2d3..c3ba139 100644
--- a/src/services/trace/AWSXRay.ts
+++ b/src/services/trace/AWSXRay.ts
@@ -5,6 +5,9 @@ import https from 'https';
 class AWSXRay implements Services.Trace {
   constructor (plugins: string) {
     this.setPlugins(plugins);
+
+    XRay.captureHTTPsGlobal(http, true);
+    XRay.captureHTTPsGlobal(https, true);
   }
 
   public openSegment(defaultName: string) {
@@ -60,11 +63,6 @@ class AWSXRay implements Services.Trace {
 
     XRay.config(xrayPlugins);
   }
-
-  public captureHTTPRequests(): void {
-    XRay.captureHTTPsGlobal(http, true);
-    XRay.captureHTTPsGlobal(https, true);
-  }
 }
 
 export default AWSXRay;
diff --git a/src/services/worker/Consumer.ts b/src/services/worker/Consumer.ts
new file mode 100644
index 0000000..f5d9ab8
--- /dev/null
+++ b/src/services/worker/Consumer.ts
@@ -0,0 +1,78 @@
+import { Consumer } from 'sqs-consumer';
+import services from '../../services';
+import config from '../../config';
+import logger from '../../logger';
+import video from '../../services/job/video';
+import repositories from '../../repositories';
+
+function createConsumer() {
+  const consumer = Consumer.create({
+    queueUrl: config.services.queue.url,
+    handleMessage: async (message) => {
+      const namespace: any = services.Trace.getNamespace();
+      const segment: any = services.Trace.createSegment('upload-service-worker');
+
+      await namespace.runPromise(async () => {
+        services.Trace.setSegment(segment);
+
+        logger.debug('Received message from queue!', { ...message });
+
+        const body = JSON.parse(message.Body);
+
+        if ('traceId' in body) {
+          namespace.set('traceId', body.traceId);
+        } else {
+          logger.warn('Trace ID is not present in message body!');
+        }
+
+        if ('fileId' in body) {
+          try {
+            const file = await repositories.File.get(body.fileId, null);
+            const values = await services.Job.process(body.mimetype, file) as Record<string, unknown>;
+            await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
+          } catch (err) {
+            logger.error('Could not process file!', { error: err.message, stack: err.stack });
+          }
+        }
+
+        if ('detail' in body && 'jobId' in body.detail) {
+          if (body.detail.status === 'COMPLETE') {
+            const updatedFile = await video.AWSVideo.complete(body) as Record<string, unknown>;
+            await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
+          }
+
+          if (body.detail.status === 'ERROR') {
+            logger.error('Media convert job ended with error!', body);
+          }
+        }
+
+        segment.close();
+      });
+    },
+    sqs: services.AWS.sqs,
+  });
+
+  consumer.on('message_processed', (message) => {
+    logger.debug('Message successfully processed and removed from the queue!', { message });
+  });
+
+  consumer.on('error', (err) => {
+    logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
+  });
+
+  consumer.on('processing_error', (err) => {
+    logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
+
+    services.Trace.closeSegment();
+  });
+
+  consumer.on('timeout_error', (err) => {
+    logger.error('Timeout error!', { message: err.message, stack: err.stack });
+  });
+
+  return consumer;
+}
+
+export default {
+  get: createConsumer,
+}
diff --git a/src/worker.ts b/src/worker.ts
index e378e48..5124f55 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -1,73 +1,7 @@
-import { Consumer } from 'sqs-consumer';
 import services from './services';
 import config from './config';
 import logger from './logger';
-import video from './services/job/video';
-import repositories from './repositories';
-
-const app = Consumer.create({
-  queueUrl: config.services.queue.url,
-  handleMessage: async (message) => {
-    const namespace: any = services.Trace.getNamespace();
-    const segment: any = services.Trace.createSegment('upload-service-worker');
-
-    await namespace.runPromise(async () => {
-      services.Trace.setSegment(segment);
-
-      logger.debug('Received message from queue!', { ...message });
-
-      const body = JSON.parse(message.Body);
-
-      if ('traceId' in body) {
-        namespace.set('traceId', body.traceId);
-      } else {
-        logger.warn('Trace ID is not present in message body!');
-      }
-
-      if ('fileId' in body) {
-        try {
-          const file = await repositories.File.get(body.fileId, null);
-          const values = await services.Job.process(body.mimetype, file) as Record<string, unknown>;
-          await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
-        } catch (err) {
-          logger.error('Could not process file!', { error: err.message, stack: err.stack });
-        }
-      }
-
-      if ('detail' in body && 'jobId' in body.detail) {
-        if (body.detail.status === 'COMPLETE') {
-          const updatedFile = await video.AWSVideo.complete(body) as Record<string, unknown>;
-          await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
-        }
-
-        if (body.detail.status === 'ERROR') {
-          logger.error('Media convert job ended with error!', body);
-        }
-      }
-
-      segment.close();
-    });
-  },
-  sqs: services.AWS.sqs,
-});
-
-app.on('message_processed', (message) => {
-  logger.debug('Message successfully processed and removed from the queue!', { message });
-});
-
-app.on('error', (err) => {
-  logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
-});
-
-app.on('processing_error', (err) => {
-  logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });
-
-  services.Trace.closeSegment();
-});
-
-app.on('timeout_error', (err) => {
-  logger.error('Timeout error!', { message: err.message, stack: err.stack });
-});
+import Consumer from './services/worker/Consumer';
 
 async function start() {
   const namespace = config.services.trace.daemonAddressNamespace;
@@ -76,12 +10,12 @@ async function start() {
     const address = await services.ServiceDiscovery.discoverInstance(namespace, name);
 
     (services.Trace as any).setDaemonAddress(address);
-    (services.Trace as any).captureHTTPRequests();
   }
 
   logger.info('Upload worker is running');

-  app.start();
+  const consumer = Consumer.get();
+  consumer.start();
 }
 
 start();