Skip to content

Fix missing context issue #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import http from 'http';
import { createTerminus } from '@godaddy/terminus';
import app from './app';
import config from './config';
import logger from './logger';
import services from './services';

const server = http.createServer(app);

async function onSignal(): Promise<void> {
logger.warn('Server is going to shut down! Starting cleanup...');
}
Expand All @@ -19,14 +16,6 @@
return;
}

createTerminus(server, {
healthChecks: {
'/health/liveness': onHealthCheck,
},
onSignal,
onShutdown,
});

async function start(): Promise<void> {
try {
const namespace = config.services.trace.daemonAddressNamespace;
Expand All @@ -34,10 +23,21 @@
if (typeof namespace === 'string' && typeof name === 'string') {
const address = await services.ServiceDiscovery.discoverInstance(namespace, name);

(services.Trace as any).setDaemonAddress(address);

Check warning on line 26 in src/index.ts

View workflow job for this annotation

GitHub Actions / Lint, test and build

Unexpected any. Specify a different type
(services.Trace as any).captureHTTPRequests();
}

const app = (await import('./app')).default;

const server = http.createServer(app);

createTerminus(server, {
healthChecks: {
'/health/liveness': onHealthCheck,
},
onSignal,
onShutdown,
});

server.listen(config.port, () => {
logger.info(`Server started at http://localhost:${ config.port }`);
});
Expand Down
8 changes: 3 additions & 5 deletions src/services/trace/AWSXRay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import https from 'https';
class AWSXRay implements Services.Trace {
constructor (plugins: string) {
this.setPlugins(plugins);

XRay.captureHTTPsGlobal(http, true);
XRay.captureHTTPsGlobal(https, true);
}

public openSegment(defaultName: string) {
Expand Down Expand Up @@ -60,11 +63,6 @@ class AWSXRay implements Services.Trace {

XRay.config(xrayPlugins);
}

public captureHTTPRequests(): void {
XRay.captureHTTPsGlobal(http, true);
XRay.captureHTTPsGlobal(https, true);
}
}

export default AWSXRay;
78 changes: 78 additions & 0 deletions src/services/worker/Consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Consumer } from 'sqs-consumer';
import services from '../../services';
import config from '../../config';
import logger from '../../logger';
import video from '../../services/job/video';
import repositories from '../../repositories';

function createConsumer() {
const consumer = Consumer.create({
queueUrl: config.services.queue.url,
handleMessage: async (message) => {
const namespace: any = services.Trace.getNamespace();
const segment: any = services.Trace.createSegment('upload-service-worker');

await namespace.runPromise(async () => {
services.Trace.setSegment(segment);

logger.debug('Received message from queue!', { ...message });

const body = JSON.parse(message.Body);

if ('traceId' in body) {
namespace.set('traceId', body.traceId);
} else {
logger.warn('Trace ID is not present in message body!');
}

if ('fileId' in body) {
try {
const file = await repositories.File.get(body.fileId, null);
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
} catch (err) {
logger.error('Could not process file!', { error: err.message, stack: err.stack });
}
}

if ('detail' in body && 'jobId' in body.detail) {
if (body.detail.status === 'COMPLETE') {
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
}

if (body.detail.status === 'ERROR') {
logger.error('Media convert job ended with error!', body);
}
}

segment.close();
});
},
sqs: services.AWS.sqs,
});

consumer.on('message_processed', (message) => {
logger.debug('Message successfully processed and removed from the queue!', { message });
});

consumer.on('error', (err) => {
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
});

consumer.on('processing_error', (err) => {
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });

services.Trace.closeSegment();
});

consumer.on('timeout_error', (err) => {
logger.error('Timeout error!', { message: err.message, stack: err.stack });
});

return consumer;
}

export default {
get: createConsumer,
}
72 changes: 3 additions & 69 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,73 +1,7 @@
import { Consumer } from 'sqs-consumer';
import services from './services';
import config from './config';
import logger from './logger';
import video from './services/job/video';
import repositories from './repositories';

const app = Consumer.create({
queueUrl: config.services.queue.url,
handleMessage: async (message) => {
const namespace: any = services.Trace.getNamespace();
const segment: any = services.Trace.createSegment('upload-service-worker');

await namespace.runPromise(async () => {
services.Trace.setSegment(segment);

logger.debug('Received message from queue!', { ...message });

const body = JSON.parse(message.Body);

if ('traceId' in body) {
namespace.set('traceId', body.traceId);
} else {
logger.warn('Trace ID is not present in message body!');
}

if ('fileId' in body) {
try {
const file = await repositories.File.get(body.fileId, null);
const values = await services.Job.process(body.mimetype, file) as Record<string, string | number | boolean>;
await repositories.File.update({ ...values, cacheable: !body.mimetype.startsWith('video') }, { id: file.id });
} catch (err) {
logger.error('Could not process file!', { error: err.message, stack: err.stack });
}
}

if ('detail' in body && 'jobId' in body.detail) {
if (body.detail.status === 'COMPLETE') {
const updatedFile = await video.AWSVideo.complete(body) as Record<string, string | number | boolean>;
await repositories.File.update({ ...updatedFile, cacheable: true }, { id: updatedFile['id'] as string });
}

if (body.detail.status === 'ERROR') {
logger.error('Media convert job ended with error!', body);
}
}

segment.close();
});
},
sqs: services.AWS.sqs,
});

app.on('message_processed', (message) => {
logger.debug('Message successfully processed and removed from the queue!', { message });
});

app.on('error', (err) => {
logger.error('Unknown error occured!', { message: err.message, stack: err.stack });
});

app.on('processing_error', (err) => {
logger.error('Error while processing message from queue!', { message: err.message, stack: err.stack });

services.Trace.closeSegment();
});

app.on('timeout_error', (err) => {
logger.error('Timeout error!', { message: err.message, stack: err.stack });
});
import Consumer from './services/worker/Consumer';

async function start() {
const namespace = config.services.trace.daemonAddressNamespace;
Expand All @@ -76,12 +10,12 @@ async function start() {
const address = await services.ServiceDiscovery.discoverInstance(namespace, name);

(services.Trace as any).setDaemonAddress(address);
(services.Trace as any).captureHTTPRequests();
}

logger.info('Upload worker is running');

app.start();
const consumer = Consumer.get();
consumer.start();
}

start();
Loading