From 2e5badf12a4d51dbaaa1677c7d34c1b0e5523fde Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 13:56:00 +1000 Subject: [PATCH 01/12] Improved scaling behaviour, and passing through scaling params Signed-off-by: Peter Baker --- src/infra/components/jobs.ts | 9 +- src/infra/infra.ts | 3 +- src/job-manager/src/config.ts | 167 ++++++------- src/job-manager/src/manager.ts | 431 +++++++++++++++++++++++++++------ 4 files changed, 435 insertions(+), 175 deletions(-) diff --git a/src/infra/components/jobs.ts b/src/infra/components/jobs.ts index 014b9cf..47d9cba 100644 --- a/src/infra/components/jobs.ts +++ b/src/infra/components/jobs.ts @@ -25,8 +25,9 @@ export interface JobTypeConfig { // Scaling configuration desiredMinCapacity: number; desiredMaxCapacity: number; - scaleUpThreshold: number; cooldownSeconds: number; + scalingSensitivity: number; + scalingFactor: number; serverPort: number; command: string[]; @@ -374,8 +375,10 @@ export class JobSystem extends Construct { workerConfig.desiredMinCapacity.toString(); taskDefEnvVars[`${jobType}_MAX_CAPACITY`] = workerConfig.desiredMaxCapacity.toString(); - taskDefEnvVars[`${jobType}_SCALE_THRESHOLD`] = - workerConfig.scaleUpThreshold.toString(); + taskDefEnvVars[`${jobType}_SENSITIVITY`] = + workerConfig.scalingSensitivity.toString(); + taskDefEnvVars[`${jobType}_FACTOR`] = + workerConfig.scalingFactor.toString(); taskDefEnvVars[`${jobType}_COOLDOWN`] = workerConfig.cooldownSeconds.toString(); taskDefEnvVars[`${jobType}_SECURITY_GROUP`] = workerSg.securityGroupId; diff --git a/src/infra/infra.ts b/src/infra/infra.ts index ca2bc5d..ec4d8c5 100644 --- a/src/infra/infra.ts +++ b/src/infra/infra.ts @@ -189,7 +189,8 @@ export class ReefguideWebApiStack extends cdk.Stack { command: ['using ReefGuideAPI; ReefGuideAPI.start_worker()'], desiredMinCapacity: 0, desiredMaxCapacity: 5, - scaleUpThreshold: 1, + scalingFactor: 20, + scalingSensitivity: 5.0, cooldownSeconds: 60, // This specifies where the config file path can be found for the diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index db77b83..74ba587 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -1,18 +1,25 @@ import { JobType } from '@prisma/client'; import { z } from 'zod'; +export const ScalingConfiguration = z.object({ + min: z.number().min(0, 'Minimum capacity must be non-negative'), + max: z.number().min(0, 'Maximum capacity must be non-negative'), + sensitivity: z + .number() + .min(1, 'Logarithmic sensitivity - see scaling algorithm'), + factor: z + .number() + .min( + 1, + 'Division factor for jobs - this allows you to consider different job count scales.', + ), + cooldownSeconds: z.number().min(0, 'Cooldown seconds must be non-negative'), +}); // Configuration schema for job types and their corresponding ECS resources export const JobTypeConfigSchema = z.object({ taskDefinitionArn: z.string().min(1, 'Task Definition ARN is required'), clusterArn: z.string().min(1, 'Cluster ARN is required'), - desiredMinCapacity: z - .number() - .min(0, 'Minimum capacity must be non-negative'), - desiredMaxCapacity: z - .number() - .min(0, 'Maximum capacity must be non-negative'), - scaleUpThreshold: z.number().min(1, 'Scale-up threshold must be at least 1'), - cooldownSeconds: z.number().min(0, 'Cooldown seconds must be non-negative'), + scaling: ScalingConfiguration, // Security group ARN for this task securityGroup: z.string().min(1, 'Security group ARN is required'), }); @@ -29,23 +36,15 @@ export const BaseEnvConfigSchema = z.object({ VPC_ID: z.string().min(1, 'VPC ID is required'), }); -// Job type specific environment variable fields that will be expected for each job type -const JOB_TYPE_ENV_FIELDS = { - TASK_DEF: 'taskDefinitionArn', - CLUSTER: 'clusterArn', - MIN_CAPACITY: 'desiredMinCapacity', - MAX_CAPACITY: 'desiredMaxCapacity', - SCALE_THRESHOLD: 'scaleUpThreshold', - COOLDOWN: 'cooldownSeconds', - SECURITY_GROUP: 'securityGroup', -}; - // Final configuration schema structure export const ConfigSchema = z.object({ pollIntervalMs: z.number().min(1000, 'Poll interval must be at least 1000ms'), apiEndpoint: z.string().url('API endpoint must be a valid URL'), region: z.string().min(1, 'AWS region is required'), - jobTypes: z.record(z.nativeEnum(JobType), JobTypeConfigSchema), + jobTypes: z.record( + z.nativeEnum(JobType), + JobTypeConfigSchema.extend({ jobTypes: z.array(z.nativeEnum(JobType)) }), + ), auth: z.object({ email: z.string().min(1, 'Email is required'), password: z.string().min(1, 'Password is required'), @@ -55,57 +54,6 @@ export const ConfigSchema = z.object({ export type Config = z.infer; -/** - * Creates a Zod schema for all environment variables based on available job types - * Dynamically generates validation for each job type's environment variables - * - * @returns Zod schema for environment variables - */ -export function createEnvVarsSchema(): z.ZodObject { - // Start with the base config schema - let envSchema: Record = { ...BaseEnvConfigSchema.shape }; - - // For each job type in the enum, add its specific environment variables - Object.values(JobType).forEach(jobType => { - const typePrefix = jobType.toString(); - - // Add each field for this job type - Object.entries(JOB_TYPE_ENV_FIELDS).forEach(([envSuffix, configField]) => { - const envVarName = `${typePrefix}_${envSuffix}`; - - // Handle numeric fields with transformation - if ( - [ - 'MIN_CAPACITY', - 'MAX_CAPACITY', - 'SCALE_THRESHOLD', - 'COOLDOWN', - ].includes(envSuffix) - ) { - envSchema[envVarName] = z - .string() - .transform(val => { - const parsed = parseInt(val); - if (isNaN(parsed)) { - throw new Error(`${envVarName} must be a valid number`); - } - return parsed; - }) - .describe(`${configField} for ${typePrefix} job type`); - } else { - // String fields - envSchema[envVarName] = z - .string() - .min(1, `${envVarName} is required for ${typePrefix} job type`) - .describe(`${configField} for ${typePrefix} job type`); - } - }); - }); - - // Return the complete environment variables schema - return z.object(envSchema); -} - /** * Builds job type configuration from environment variables * @@ -114,23 +62,28 @@ export function createEnvVarsSchema(): z.ZodObject { * @returns Configuration for the specified job type */ function buildJobTypeConfig( - env: Record, + env: Record, jobType: string, ): JobTypeConfig { + const optimisticParse = { + taskDefinitionArn: env[`${jobType}_TASK_DEF`] as string, + clusterArn: env[`${jobType}_CLUSTER`] as string, + scaling: { + min: env[`${jobType}_MIN_CAPACITY`] as number, + max: env[`${jobType}_MAX_CAPACITY`] as number, + cooldownSeconds: env[`${jobType}_COOLDOWN`] as number, + sensitivity: env[`${jobType}_SENSITIVITY`] as number, + factor: env[`${jobType}_FACTOR`] as number, + }, + securityGroup: env[`${jobType}_SECURITY_GROUP`] as string, + } satisfies JobTypeConfig; try { - return { - taskDefinitionArn: env[`${jobType}_TASK_DEF`], - clusterArn: env[`${jobType}_CLUSTER`], - desiredMinCapacity: env[`${jobType}_MIN_CAPACITY`], - desiredMaxCapacity: env[`${jobType}_MAX_CAPACITY`], - scaleUpThreshold: env[`${jobType}_SCALE_THRESHOLD`], - cooldownSeconds: env[`${jobType}_COOLDOWN`], - securityGroup: env[`${jobType}_SECURITY_GROUP`], - }; - } catch (error) { - throw new Error( - `Failed to build configuration for job type ${jobType}: ${error}`, + return JobTypeConfigSchema.parse(optimisticParse); + } catch (e) { + console.error( + `Job type ${jobType} did not have valid environment variables. Error: ${e}.`, ); + throw e; } } @@ -143,32 +96,50 @@ function buildJobTypeConfig( */ export function loadConfig(): Config { try { - // Create the environment schema based on available job types - const EnvVarsSchema = createEnvVarsSchema(); - - // Validate all required environment variables - const env = EnvVarsSchema.parse(process.env); - + // Force types here as we zod process everything! + const env = process.env as Record; // Initialize job types configuration object - const jobTypesConfig: Record = {}; + const jobTypesConfig: Record< + string, + JobTypeConfig & { jobTypes?: JobType[] } + > = {}; // Build configuration for each job type Object.values(JobType).forEach(jobType => { const typeString = jobType.toString(); - jobTypesConfig[typeString] = buildJobTypeConfig(env, typeString); + jobTypesConfig[typeString] = buildJobTypeConfig( + env as Record, + typeString, + ); }); + // Group the job types by task ARN + const arnToTypes: Map = new Map(); + for (const [jobType, config] of Object.entries(jobTypesConfig)) { + arnToTypes.set( + config.taskDefinitionArn, + (arnToTypes.get(config.taskDefinitionArn) ?? []).concat([ + jobType as JobType, + ]), + ); + } + + // Update with grouped types + for (let config of Object.values(jobTypesConfig)) { + config.jobTypes = arnToTypes.get(config.taskDefinitionArn); + } + // Construct the complete config object with validated values const config: Config = { - pollIntervalMs: env.POLL_INTERVAL_MS, - apiEndpoint: env.API_ENDPOINT, - region: env.AWS_REGION, + pollIntervalMs: env.POLL_INTERVAL_MS as number, + apiEndpoint: env.API_ENDPOINT as string, + region: env.AWS_REGION as string, jobTypes: jobTypesConfig, auth: { - email: env.API_USERNAME, - password: env.API_PASSWORD, + email: env.API_USERNAME as string, + password: env.API_PASSWORD as string, }, - vpcId: env.VPC_ID, + vpcId: env.VPC_ID as string, }; // Validate the entire config object diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index 3f1cc62..220214a 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -1,24 +1,37 @@ -import { AssignPublicIp, ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs'; +import { + AssignPublicIp, + ECSClient, + RunTaskCommand, + DescribeTasksCommand, + Task, +} from '@aws-sdk/client-ecs'; import { EC2Client, DescribeSubnetsCommand } from '@aws-sdk/client-ec2'; import { Config, ConfigSchema, JobTypeConfig } from './config'; import { AuthApiClient } from './authClient'; import { JobType } from '@prisma/client'; +import { PollJobsResponse } from '../../api/jobs/routes'; + +// This interface are details for a worker which is pending or running +interface TrackedWorker { + taskArn: string; + clusterArn: string; + startTime: Date; + jobTypes: JobType[]; + status: 'PENDING' | 'RUNNING' | 'STOPPED'; +} export class CapacityManager { private config: Config; - private ecsClient: ECSClient; - private ec2Client: EC2Client; - private lastScaleTime: Record = {}; - private client: AuthApiClient; - private isRunning: boolean = false; - private pollTimeout: NodeJS.Timeout | null = null; + // Add tracking data structures + private trackedWorkers: TrackedWorker[] = []; + constructor(config: Config, client: AuthApiClient) { this.config = ConfigSchema.parse(config); this.ecsClient = new ECSClient({ region: this.config.region }); @@ -39,18 +52,13 @@ export class CapacityManager { try { console.log('Poll started at:', new Date().toISOString()); - const response = await this.client.get<{ jobs: any[] }>('/jobs/poll'); + // Update worker statuses + await this.updateWorkerStatuses(); - const jobsByType = response.jobs.reduce( - (acc: Record, job: any) => { - acc[job.type] = (acc[job.type] || 0) + 1; - return acc; - }, - {}, - ); + // Get jobs with their IDs + const response = await this.client.get('/jobs/poll'); - console.log('Jobs by type:', jobsByType); - await this.adjustCapacity(jobsByType); + await this.adjustCapacity({ pollResponse: response.jobs }); } catch (error) { console.error('Error polling job queue:', error); } finally { @@ -64,6 +72,339 @@ export class CapacityManager { } } + private async updateWorkerStatuses() { + if (this.trackedWorkers.length === 0) return; + + try { + // Build set of distinct worker types from the tracked workers + const clusterArns = new Set(this.trackedWorkers.map(w => w.clusterArn)); + + // Create a Set to track which task ARNs were found in the API response + const foundTaskArns = new Set(); + + // Now loop through each worker type and figure out the cluster ARN/task ARNs + for (const clusterArn of clusterArns) { + const relevantWorkers = this.trackedWorkers.filter( + w => w.clusterArn === clusterArn, + ); + // Which task ARNs to fetch + const taskArns = relevantWorkers.map(w => w.taskArn); + + // Split into chunks if there are many tasks (ECS API has limits) + const chunkSize = 100; + for (let i = 0; i < taskArns.length; i += chunkSize) { + const chunk = taskArns.slice(i, i + chunkSize); + + const command = new DescribeTasksCommand({ + cluster: clusterArn, + tasks: chunk, + }); + + const response = await this.ecsClient.send(command); + + if (response.tasks) { + // Add all found task ARNs to our tracking set + response.tasks.forEach(task => { + if (task.taskArn) { + foundTaskArns.add(task.taskArn); + } + }); + + this.updateWorkerStatusesFromTasks(response.tasks); + } + + // Check if any tasks weren't found but were requested + // AWS ECS API returns info in response.failures for tasks that weren't found + if (response.failures && response.failures.length > 0) { + response.failures.forEach(failure => { + if (failure.arn && failure.reason === 'MISSING') { + console.log( + `Task ${failure.arn} not found in ECS, removing from tracked workers`, + ); + // We explicitly don't add this to foundTaskArns since it's missing + } + }); + } + } + } + + // Remove workers that weren't found in the API response + const previousCount = this.trackedWorkers.length; + this.trackedWorkers = this.trackedWorkers.filter(worker => { + // Keep workers that were found in the API response + return foundTaskArns.has(worker.taskArn); + }); + + const removedCount = previousCount - this.trackedWorkers.length; + if (removedCount > 0) { + console.log( + `Removed ${removedCount} workers that were not found in ECS`, + ); + } + } catch (error) { + console.error('Error updating worker statuses:', error); + } + } + + // Update worker statuses based on ECS task information + private updateWorkerStatusesFromTasks(tasks: Task[]) { + for (const task of tasks) { + if (!task.taskArn) continue; + + const worker = this.trackedWorkers.find(w => w.taskArn === task.taskArn); + if (!worker) continue; + + const lastStatus = task.lastStatus || ''; + let newStatus: 'PENDING' | 'RUNNING' | 'STOPPED'; + + // Map AWS ECS task statuses to our internal tracking statuses + if (['PROVISIONING', 'PENDING', 'ACTIVATING'].includes(lastStatus)) { + newStatus = 'PENDING'; + } else if (lastStatus === 'RUNNING') { + newStatus = 'RUNNING'; + } else if ( + [ + 'DEACTIVATING', + 'STOPPING', + 'STOPPED', + 'DEPROVISIONING', + 'DEPROVISIONED', + ].includes(lastStatus) + ) { + newStatus = 'STOPPED'; + } else { + // For any unexpected status, log it but don't change worker status + console.warn( + `Worker ${task.taskArn} has unknown status: ${lastStatus}`, + ); + continue; + } + + // Only log if status changed + if (worker.status !== newStatus) { + console.log( + `Worker ${task.taskArn} status changed: ${worker.status} -> ${newStatus}`, + ); + worker.status = newStatus; + } + } + + // Remove stopped workers after updating + this.trackedWorkers = this.trackedWorkers.filter( + worker => worker.status !== 'STOPPED', + ); + } + + /** + * For each job which is polling, determine how many workers we have running + * that can handle that type of job. Include pending tasks as part of the + * threshold. Observe cooldown period. + */ + private async adjustCapacity({ + pollResponse, + }: { + pollResponse: PollJobsResponse['jobs']; + }): Promise { + // Count pending jobs by type + const pendingByType: Record = Object.values( + JobType, + ).reduce>( + (current, acc) => { + current[acc] = 0; + return current; + }, + {} as Record, + ); + for (const job of pollResponse) { + // Increment count + pendingByType[job.type] += 1; + } + + // Determine how many workers are already tracked for each type of job + const workersByType: Record = Object.values( + JobType, + ).reduce>( + (current, acc) => { + current[acc] = 0; + return current; + }, + {} as Record, + ); + for (const worker of this.trackedWorkers) { + // Count once for each type TODO consider if this has implications for + // scaling - de emphasising workers which handle multiple jobs + for (const type of worker.jobTypes) { + workersByType[type] += 1; + } + } + + for (const jobType of Object.values(JobType)) { + const pending = pendingByType[jobType]; + const workers = workersByType[jobType]; + + const config = this.config.jobTypes[jobType as JobType]; + if (!config) { + console.warn(`No configuration found for job type: ${jobType}`); + continue; + } + + await this.adjustCapacityForType({ + jobType, + pending, + workers, + config, + }); + } + } + + /** + * Launches n jobs of the specified type/config + * + * Updates the worker tracking + */ + private async launchTask({ + count = 1, + jobType, + config, + }: { + count?: number; + jobType: JobType; + config: JobTypeConfig; + }) { + try { + let done = 0; + while (done < count) { + const now = Date.now(); + + // Get a random public subnet for this task + const subnet = await this.getRandomPublicSubnet(this.config.vpcId); + const command = new RunTaskCommand({ + cluster: config.clusterArn, + taskDefinition: config.taskDefinitionArn, + launchType: 'FARGATE', + count: 1, + networkConfiguration: { + awsvpcConfiguration: { + subnets: [subnet], + securityGroups: [config.securityGroup], + assignPublicIp: AssignPublicIp.ENABLED, + }, + }, + }); + + const result = await this.ecsClient.send(command); + + // If task was created successfully, track it + if ( + result.tasks && + result.tasks.length > 0 && + result.tasks[0].taskArn + ) { + this.lastScaleTime[jobType] = now; + this.trackedWorkers.push({ + clusterArn: config.clusterArn, + taskArn: result.tasks[0].taskArn, + startTime: new Date(), + jobTypes: this.config.jobTypes[jobType]?.jobTypes ?? [jobType], + status: 'PENDING', + }); + + console.log( + `Started new task ${result.tasks[0].taskArn} for job type: ${jobType}`, + ); + } + done += 1; + } + } catch (e) { + console.error('Failed to launch task(s). Exception: ' + e); + } + } + + /** + * Computes the optimal number of workers to handle pending jobs using a logarithmic scale. + * + * This function uses a logarithmic relationship between pending jobs and worker count, + * which provides diminishing returns as the number of jobs increases - appropriate for + * many distributed processing scenarios. + * + * @param pendingJobs - The number of jobs waiting to be processed + * @param sensitivity - Controls how aggressively to scale workers (higher = more workers) + * Recommended range: 1.0 (conservative) to 3.0 (aggressive) + * @param minWorkers - Minimum number of workers to maintain regardless of job count + * @param maxWorkers - Maximum number of workers allowed regardless of job count + * @param baseJobCount - Reference job count that maps to roughly 1×sensitivity workers + * (helps calibrate the scale for your specific workload) + * @returns The target number of workers as an integer + */ + private computeOptimalWorkers({ + pendingJobs, + sensitivity, + minWorkers, + maxWorkers, + baseJobCount, + }: { + pendingJobs: number; + sensitivity: number; + minWorkers: number; + maxWorkers: number; + baseJobCount: number; + }): number { + // Handle edge cases + if (pendingJobs <= 0) { + return minWorkers; + } + // Compute workers using logarithmic scaling + // The formula: sensitivity * log(pendingJobs/baseJobCount + 1) + minWorkers + // + // This gives us: + // - When pendingJobs = 0: minWorkers + // - When pendingJobs = baseJobCount: roughly minWorkers + sensitivity + // - As pendingJobs grows, workers increase logarithmically + const computedWorkers = + sensitivity * Math.log(pendingJobs / baseJobCount + 1) + minWorkers; + + // Round to nearest integer and enforce bounds + return Math.min( + Math.max(Math.round(computedWorkers), minWorkers), + maxWorkers, + ); + } + + private async adjustCapacityForType({ + jobType, + pending, + workers, + config, + }: { + jobType: JobType; + pending: number; + workers: number; + config: JobTypeConfig; + }) { + const now = Date.now(); + const lastScale = this.lastScaleTime[jobType] || 0; + + // Check cooldown + if (now - lastScale < config.scaling.cooldownSeconds * 1000) { + console.log(`Still in cooldown period for ${jobType}`); + return; + } + + // Determine the ideal number of workers + const idealTarget = this.computeOptimalWorkers({ + pendingJobs: pending, + sensitivity: config.scaling.sensitivity, + minWorkers: config.scaling.min, + maxWorkers: config.scaling.max, + baseJobCount: config.scaling.factor, + }); + const diff = idealTarget - workers; + if (diff > 0) { + console.info(`Launching ${diff} tasks of type: ${jobType}.`); + this.launchTask({ count: diff, jobType, config }); + } + } + public start() { if (this.isRunning) return; @@ -92,18 +433,6 @@ export class CapacityManager { } } - private async adjustCapacity(jobsByType: Record) { - for (const [jobType, pendingCount] of Object.entries(jobsByType)) { - const config = this.config.jobTypes[jobType as JobType]; - if (!config) { - console.warn(`No configuration found for job type: ${jobType}`); - continue; - } - - await this.adjustCapacityForType(jobType, pendingCount, config); - } - } - private async getRandomPublicSubnet(vpcId: string): Promise { try { const command = new DescribeSubnetsCommand({ @@ -139,48 +468,4 @@ export class CapacityManager { throw error; } } - - private async adjustCapacityForType( - jobType: string, - pendingCount: number, - config: JobTypeConfig, - ) { - const now = Date.now(); - const lastScale = this.lastScaleTime[jobType] || 0; - - // Check cooldown - if (now - lastScale < config.cooldownSeconds * 1000) { - console.log(`Still in cooldown period for ${jobType}`); - return; - } - - // Only scale up if we have more pending jobs than our threshold - if (pendingCount >= config.scaleUpThreshold) { - try { - // Get a random public subnet for this task - const subnet = await this.getRandomPublicSubnet(this.config.vpcId); - - const command = new RunTaskCommand({ - cluster: config.clusterArn, - taskDefinition: config.taskDefinitionArn, - launchType: 'FARGATE', - count: 1, // Start one task at a time - networkConfiguration: { - awsvpcConfiguration: { - subnets: [subnet], - securityGroups: [config.securityGroup], - assignPublicIp: AssignPublicIp.ENABLED, - }, - }, - }); - - await this.ecsClient.send(command); - this.lastScaleTime[jobType] = now; - - console.log(`Started new task for ${jobType}`); - } catch (error) { - console.error(`Error starting task for ${jobType}:`, error); - } - } - } } From 50720545c11093e997c22e80323c7e1b40c0975d Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 14:08:06 +1000 Subject: [PATCH 02/12] Improving validation Signed-off-by: Peter Baker --- src/job-manager/src/config.ts | 67 ++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 12 deletions(-) diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index 74ba587..88d76c4 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -1,20 +1,63 @@ import { JobType } from '@prisma/client'; import { z } from 'zod'; +// Helper function to create a number validator that also accepts string inputs +const createNumberValidator = ( + min: number | null = null, + errorMessage: string = 'Value must be a valid number', + minErrorMessage: string = `Value must be at least ${min}`, +) => { + return z.union([ + min !== null ? z.number().min(min, minErrorMessage) : z.number(), + z.string().transform((val, ctx) => { + const parsed = Number(val); + if (isNaN(parsed)) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: errorMessage, + }); + return z.NEVER; + } + if (min !== null && parsed < min) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: minErrorMessage, + }); + return z.NEVER; + } + return parsed; + }), + ]); +}; + export const ScalingConfiguration = z.object({ - min: z.number().min(0, 'Minimum capacity must be non-negative'), - max: z.number().min(0, 'Maximum capacity must be non-negative'), - sensitivity: z - .number() - .min(1, 'Logarithmic sensitivity - see scaling algorithm'), - factor: z - .number() - .min( - 1, - 'Division factor for jobs - this allows you to consider different job count scales.', - ), - cooldownSeconds: z.number().min(0, 'Cooldown seconds must be non-negative'), + min: createNumberValidator( + 0, + 'Minimum capacity must be a valid number', + 'Minimum capacity must be non-negative', + ), + max: createNumberValidator( + 0, + 'Maximum capacity must be a valid number', + 'Maximum capacity must be non-negative', + ), + sensitivity: createNumberValidator( + 0, + 'Sensitivity must be a valid number', + 'Logarithmic sensitivity must be non-negative', + ), + factor: createNumberValidator( + 1, + 'Factor must be a valid number', + 'Division factor for jobs - this allows you to consider different job count scales. Must be > 1.', + ), + cooldownSeconds: createNumberValidator( + 0, + 'Cooldown seconds must be a valid number', + 'Cooldown seconds must be non-negative', + ), }); + // Configuration schema for job types and their corresponding ECS resources export const JobTypeConfigSchema = z.object({ taskDefinitionArn: z.string().min(1, 'Task Definition ARN is required'), From 86c071f24543a60ad1394827b7673d414bb8f7aa Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 15:31:01 +1000 Subject: [PATCH 03/12] Logging docs, improvements based on using task definitions rather than task types to track counts Signed-off-by: Peter Baker --- package-lock.json | 215 +++++++++++++- package.json | 5 +- src/infra/components/jobs.ts | 2 + src/job-manager/src/authClient.ts | 117 +++++++- src/job-manager/src/config.ts | 91 ++++-- src/job-manager/src/index.ts | 53 +++- src/job-manager/src/logging.ts | 47 ++++ src/job-manager/src/manager.ts | 454 ++++++++++++++++++++++++------ 8 files changed, 853 insertions(+), 131 deletions(-) create mode 100644 src/job-manager/src/logging.ts diff --git a/package-lock.json b/package-lock.json index 58f1f02..8267516 100644 --- a/package-lock.json +++ b/package-lock.json @@ -33,6 +33,7 @@ "passport": "^0.7.0", "passport-jwt": "^4.0.1", "source-map-support": "^0.5.21", + "winston": "^3.17.0", "zod": "^3.23.8", "zod-express-middleware": "^1.4.0" }, @@ -3830,6 +3831,14 @@ "node": ">=12" } }, + "node_modules/@colors/colors": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz", + "integrity": "sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA==", + "engines": { + "node": ">=0.1.90" + } + }, "node_modules/@cspotcode/source-map-support": { "version": "0.8.1", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", @@ -3852,6 +3861,16 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@dabh/diagnostics": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@dabh/diagnostics/-/diagnostics-2.0.3.tgz", + "integrity": "sha512-hrlQOIi7hAfzsMqlGSFyVucrx38O+j6wiGOf//H2ecvIEqYN4ADBSS2iLMh5UFyDunCNniUIPk/q3riFv45xRA==", + "dependencies": { + "colorspace": "1.1.x", + "enabled": "2.0.x", + "kuler": "^2.0.0" + } + }, "node_modules/@discoveryjs/json-ext": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/@discoveryjs/json-ext/-/json-ext-0.5.7.tgz", @@ -5958,6 +5977,11 @@ "@types/superagent": "^8.1.0" } }, + "node_modules/@types/triple-beam": { + "version": "1.3.5", + "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz", + "integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==" + }, "node_modules/@types/uuid": { "version": "9.0.8", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz", @@ -6700,7 +6724,6 @@ "version": "3.2.5", "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==", - "dev": true, "license": "MIT" }, "node_modules/asynckit": { @@ -7615,6 +7638,15 @@ "integrity": "sha512-lHl4d5/ONEbLlJvaJNtsF/Lz+WvB07u2ycqTYbdrq7UypDXailES4valYb2eWiJFxZlVmpGekfqoxQhzyFdT4Q==", "dev": true }, + "node_modules/color": { + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/color/-/color-3.2.1.tgz", + "integrity": "sha512-aBl7dZI9ENN6fUGC7mWpMTPNHmWUSNan9tuWN6ahh5ZLNk9baLJOnSMlrQkHcrfFgz2/RigjUVAjdx36VcemKA==", + "dependencies": { + "color-convert": "^1.9.3", + "color-string": "^1.6.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -7630,8 +7662,29 @@ "node_modules/color-name": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", - "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", - "dev": true + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" + }, + "node_modules/color-string": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz", + "integrity": "sha512-shrVawQFojnZv6xM40anx4CkoDP+fZsw/ZerEMsW/pyzsRbElpsL/DBVW7q3ExxwusdNXI3lXpuhEZkzs8p5Eg==", + "dependencies": { + "color-name": "^1.0.0", + "simple-swizzle": "^0.2.2" + } + }, + "node_modules/color/node_modules/color-convert": { + "version": "1.9.3", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", + "integrity": "sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg==", + "dependencies": { + "color-name": "1.1.3" + } + }, + "node_modules/color/node_modules/color-name": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz", + "integrity": "sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw==" }, "node_modules/colorette": { "version": "2.0.20", @@ -7648,6 +7701,15 @@ "node": ">=0.1.90" } }, + "node_modules/colorspace": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/colorspace/-/colorspace-1.1.4.tgz", + "integrity": "sha512-BgvKJiuVu1igBUF2kEjRCZXol6wiiGbY5ipL/oVPwm0BL9sIpMIzM8IK7vwuxIIzOXMV3Ey5w+vxhm0rR/TN8w==", + "dependencies": { + "color": "^3.1.3", + "text-hex": "1.0.x" + } + }, "node_modules/combined-stream": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", @@ -8039,6 +8101,11 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "node_modules/enabled": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/enabled/-/enabled-2.0.0.tgz", + "integrity": "sha512-AKrN98kuwOzMIdAizXGI86UFBoo26CL21UM763y1h/GMSJ4/OHU9k2YlsmBpyScFo/wbLzWQJBMCW4+IO3/+OQ==" + }, "node_modules/encodeurl": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", @@ -8843,6 +8910,11 @@ "bser": "2.1.1" } }, + "node_modules/fecha": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/fecha/-/fecha-4.2.3.tgz", + "integrity": "sha512-OP2IUU6HeYKJi3i0z4A19kHMQoLVs4Hc+DPqqxI2h/DPZHTm/vjsfC6P0b4jCMy14XizLBqvndQ+UilD7707Jw==" + }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -8978,6 +9050,11 @@ "integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==", "dev": true }, + "node_modules/fn.name": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/fn.name/-/fn.name-1.1.0.tgz", + "integrity": "sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==" + }, "node_modules/follow-redirects": { "version": "1.15.9", "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.9.tgz", @@ -9825,7 +9902,6 @@ "version": "2.0.1", "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", "integrity": "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg==", - "dev": true, "engines": { "node": ">=8" }, @@ -10707,6 +10783,11 @@ "node": ">=6" } }, + "node_modules/kuler": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/kuler/-/kuler-2.0.0.tgz", + "integrity": "sha512-Xq9nH7KlWZmXAtodXDDRE7vs6DU1gTU8zYDHDiWLSip45Egwq3plLHzPn27NgvzL2r1LMPC1vdqh98sQxtqj4A==" + }, "node_modules/leven": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/leven/-/leven-3.1.0.tgz", @@ -10813,6 +10894,22 @@ "resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz", "integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg==" }, + "node_modules/logform": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/logform/-/logform-2.7.0.tgz", + "integrity": "sha512-TFYA4jnP7PVbmlBIfhlSe+WKxs9dklXMTEGcBCIvLhE/Tn3H6Gk1norupVW7m5Cnd4bLcr08AytbyV/xj7f/kQ==", + "dependencies": { + "@colors/colors": "1.6.0", + "@types/triple-beam": "^1.3.2", + "fecha": "^4.2.0", + "ms": "^2.1.1", + "safe-stable-stringify": "^2.3.1", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -11230,6 +11327,14 @@ "wrappy": "1" } }, + "node_modules/one-time": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/one-time/-/one-time-1.0.0.tgz", + "integrity": "sha512-5DXOiRKwuSEcQ/l0kGCF6Q3jcADFv5tSmRaJck/OqkVFcOzutB134KRSfF0xDrL39MNnqxbHBbUUcjZIhTgb2g==", + "dependencies": { + "fn.name": "1.x.x" + } + }, "node_modules/onetime": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/onetime/-/onetime-5.1.2.tgz", @@ -11747,6 +11852,19 @@ "integrity": "sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==", "dev": true }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/readdirp": { "version": "3.6.0", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz", @@ -11952,6 +12070,14 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "engines": { + "node": ">=10" + } + }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", @@ -12187,6 +12313,19 @@ "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==", "dev": true }, + "node_modules/simple-swizzle": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/simple-swizzle/-/simple-swizzle-0.2.2.tgz", + "integrity": "sha512-JA//kQgZtbuY83m+xT+tXJkmJncGMTFT+C+g2h2R9uxkYIrE2yy9sgmcLhCnw57/WSD+Eh3J97FPEDFnbXnDUg==", + "dependencies": { + "is-arrayish": "^0.3.1" + } + }, + "node_modules/simple-swizzle/node_modules/is-arrayish": { + "version": "0.3.2", + "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.3.2.tgz", + "integrity": "sha512-eVRqCvVlZbuw3GrM63ovNSNAeA1K16kaR/LRY/92w0zxQ5/1YzwblUX652i4Xs9RwAGjW9d9y6X88t8OaAJfWQ==" + }, "node_modules/sisteransi": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/sisteransi/-/sisteransi-1.0.5.tgz", @@ -12225,6 +12364,14 @@ "integrity": "sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==", "dev": true }, + "node_modules/stack-trace": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", + "integrity": "sha512-KGzahc7puUKkzyMt+IqAep+TVNbKP+k2Lmwhub39m1AsTSkaDutx56aDCo+HLDzf/D26BIHTJWNiTG1KAJiQCg==", + "engines": { + "node": "*" + } + }, "node_modules/stack-utils": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/stack-utils/-/stack-utils-2.0.6.tgz", @@ -12255,6 +12402,14 @@ "node": ">= 0.8" } }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, "node_modules/string-length": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/string-length/-/string-length-4.0.2.tgz", @@ -12590,6 +12745,11 @@ "node": ">=8" } }, + "node_modules/text-hex": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/text-hex/-/text-hex-1.0.0.tgz", + "integrity": "sha512-uuVGNWzgJ4yhRaNSiubPY7OjISw4sw4E5Uv0wbjp+OzcbmVU/rsT8ujgcXJhn9ypzsgr5vlzpPqP+MBBKcGvbg==" + }, "node_modules/text-table": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", @@ -12642,6 +12802,14 @@ "tree-kill": "cli.js" } }, + "node_modules/triple-beam": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/triple-beam/-/triple-beam-1.4.1.tgz", + "integrity": "sha512-aZbgViZrg1QNcG+LULa7nhZpJTZSLm/mXnHXnbAbjmN5aSa0y7V+wvv6+4WaBtpISJzThKy+PIPxc1Nq1EJ9mg==", + "engines": { + "node": ">= 14.0.0" + } + }, "node_modules/truncate-utf8-bytes": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/truncate-utf8-bytes/-/truncate-utf8-bytes-1.0.2.tgz", @@ -13120,6 +13288,11 @@ "integrity": "sha512-Xn0w3MtiQ6zoz2vFyUVruaCL53O/DwUvkEeOvj+uulMm0BkUGYWmBYVyElqZaSLhY6ZD0ulfU3aBra2aVT4xfA==", "dev": true }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" + }, "node_modules/utils-merge": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz", @@ -13415,6 +13588,40 @@ "integrity": "sha512-CC1bOL87PIWSBhDcTrdeLo6eGT7mCFtrg0uIJtqJUFyK+eJnzl8A1niH56uu7KMa5XFrtiV+AQuHO3n7DsHnLQ==", "dev": true }, + "node_modules/winston": { + "version": "3.17.0", + "resolved": "https://registry.npmjs.org/winston/-/winston-3.17.0.tgz", + "integrity": "sha512-DLiFIXYC5fMPxaRg832S6F5mJYvePtmO5G9v9IgUFPhXm9/GkXarH/TUrBAVzhTCzAj9anE/+GjrgXp/54nOgw==", + "dependencies": { + "@colors/colors": "^1.6.0", + "@dabh/diagnostics": "^2.0.2", + "async": "^3.2.3", + "is-stream": "^2.0.0", + "logform": "^2.7.0", + "one-time": "^1.0.0", + "readable-stream": "^3.4.0", + "safe-stable-stringify": "^2.3.1", + "stack-trace": "0.0.x", + "triple-beam": "^1.3.0", + "winston-transport": "^4.9.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, + "node_modules/winston-transport": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.9.0.tgz", + "integrity": "sha512-8drMJ4rkgaPo1Me4zD/3WLfI/zPdA9o2IipKODunnGDcuqbHwjsbB79ylv04LCGGzU0xQ6vTznOMpQGaLhhm6A==", + "dependencies": { + "logform": "^2.7.0", + "readable-stream": "^3.6.2", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, "node_modules/word-wrap": { "version": "1.2.5", "resolved": "https://registry.npmjs.org/word-wrap/-/word-wrap-1.2.5.tgz", diff --git a/package.json b/package.json index f75259c..88f0a05 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,8 @@ "format:write": "prettier --write \"src/**/*.{ts,js,json,html,scss,md}\" \"!**/node_modules/**\" \"!**/dist/**\"", "dev-manager": "env-cmd -f src/job-manager/.env ts-node src/job-manager/src/index.ts", "dev-worker": "env-cmd -f src/example-worker/.env ts-node src/example-worker/src/index.ts", - "start-manager": "ts-node src/job-manager/src/index.ts", - "start-worker": "ts-node src/example-worker/src/index.ts" + "start-manager": "ts-node -T src/job-manager/src/index.ts", + "start-worker": "ts-node -T src/example-worker/src/index.ts" }, "keywords": [], "author": "AIMS", @@ -54,6 +54,7 @@ "passport": "^0.7.0", "passport-jwt": "^4.0.1", "source-map-support": "^0.5.21", + "winston": "^3.17.0", "zod": "^3.23.8", "zod-express-middleware": "^1.4.0" }, diff --git a/src/infra/components/jobs.ts b/src/infra/components/jobs.ts index 47d9cba..ef2bdfb 100644 --- a/src/infra/components/jobs.ts +++ b/src/infra/components/jobs.ts @@ -301,6 +301,8 @@ export class JobSystem extends Construct { AWS_REGION: Stack.of(this).region, // Which vpc to deploy into VPC_ID: props.vpc.vpcId, + // Log level for manager + LOG_LEVEL: 'debug', }, // pass in the manager creds secrets: { diff --git a/src/job-manager/src/authClient.ts b/src/job-manager/src/authClient.ts index 0eb7358..9e494ef 100644 --- a/src/job-manager/src/authClient.ts +++ b/src/job-manager/src/authClient.ts @@ -1,25 +1,52 @@ import axios, { AxiosInstance, AxiosRequestConfig } from 'axios'; import { jwtDecode } from 'jwt-decode'; +import { logger } from './logging'; +/** + * Interface for authentication credentials + */ interface Credentials { + /** User email for authentication */ email: string; + /** User password for authentication */ password: string; } +/** + * Interface for JWT authentication tokens + */ interface AuthTokens { + /** Access token for API authorization */ token: string; + /** Refresh token for obtaining new access tokens */ refreshToken?: string; } +/** + * Interface for decoded JWT payload structure + */ interface JWTPayload { + /** User ID */ id: string; + /** User email */ email: string; + /** User roles/permissions */ roles: string[]; + /** Token expiration timestamp */ exp: number; } -// Generic error type for API errors +/** + * Custom error class for API-related errors + * Includes status code and original response for better error handling + */ export class ApiError extends Error { + /** + * Creates a new API error + * @param message - Error message + * @param statusCode - HTTP status code + * @param response - Optional original response object + */ constructor( message: string, public statusCode: number, @@ -30,15 +57,21 @@ export class ApiError extends Error { } } +/** + * Client for authenticated API requests + * Handles login, token refresh, and authenticated HTTP requests + */ export class AuthApiClient { private axiosInstance: AxiosInstance; - private credentials: Credentials; - private tokens: AuthTokens | null = null; - private readonly TOKEN_REFRESH_THRESHOLD = 60; // 1 minute in seconds + /** + * Creates a new authenticated API client + * @param baseURL - Base URL for the API + * @param credentials - Authentication credentials + */ constructor(baseURL: string, credentials: Credentials) { this.credentials = credentials; this.axiosInstance = axios.create({ @@ -48,6 +81,8 @@ export class AuthApiClient { }, }); + logger.debug('AuthApiClient initialized', { baseURL }); + // Add request interceptor to handle token management this.axiosInstance.interceptors.request.use( async config => { @@ -70,8 +105,14 @@ export class AuthApiClient { ); } + /** + * Gets a valid token, refreshing or logging in if necessary + * @returns A valid JWT token or null if authentication failed + * @private + */ private async getValidToken(): Promise { if (!this.tokens?.token) { + logger.debug('No token available, initiating login'); await this.login(); return this.tokens?.token || null; } @@ -80,29 +121,43 @@ export class AuthApiClient { const expiresIn = decodedToken.exp - Math.floor(Date.now() / 1000); if (expiresIn <= this.TOKEN_REFRESH_THRESHOLD) { + logger.debug(`Token expires in ${expiresIn}s, refreshing`); await this.refreshToken(); } return this.tokens?.token || null; } + /** + * Authenticates with the API using provided credentials + * @throws Error if login fails + * @private + */ private async login(): Promise { try { + logger.info('Logging in to API'); const response = await this.axiosInstance.post( '/auth/login', this.credentials, ); this.tokens = response.data; + logger.debug('Login successful, token received'); } catch (error) { + logger.error('Failed to login', { error }); throw new Error('Failed to login'); } } + /** + * Refreshes the access token using the refresh token + * Falls back to login if refresh fails + * @private + */ private async refreshToken(): Promise { - console.log('Token refresh started at:', new Date().toISOString()); + logger.info('Token refresh started at:', new Date().toISOString()); try { if (!this.tokens?.refreshToken) { - console.log('No refresh token, logging in...'); + logger.warn('No refresh token available, falling back to login'); await this.login(); return; } @@ -115,9 +170,9 @@ export class AuthApiClient { ); if (response.status !== 200) { - console.log( - 'Non 200 response from refresh token endpoint.', - response.status, + logger.warn( + 'Non 200 response from refresh token endpoint', + { status: response.status } ); throw new Error('Non 200 response from refresh token.'); } @@ -126,50 +181,88 @@ export class AuthApiClient { ...this.tokens, token: response.data.token, }; + logger.debug('Token refreshed successfully'); } catch (error) { - console.log('Error caught during refresh'); + logger.error('Error during token refresh, falling back to login', { error }); // If refresh fails, try logging in again this.tokens = null; // awaiting login await this.login(); } - console.log('Token refresh completed at:', new Date().toISOString()); + logger.info('Token refresh completed at:', new Date().toISOString()); } - // Base HTTP methods with proper typing + /** + * Performs a GET request to the API + * @param url - Endpoint URL (relative to base URL) + * @param config - Optional Axios request configuration + * @returns Response data + */ public async get(url: string, config?: AxiosRequestConfig): Promise { + logger.debug('GET request', { url }); const response = await this.axiosInstance.get(url, config); return response.data; } + /** + * Performs a POST request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async post( url: string, data?: any, config?: AxiosRequestConfig, ): Promise { + logger.debug('POST request', { url }); const response = await this.axiosInstance.post(url, data, config); return response.data; } + /** + * Performs a PUT request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async put( url: string, data?: any, config?: AxiosRequestConfig, ): Promise { + logger.debug('PUT request', { url }); const response = await this.axiosInstance.put(url, data, config); return response.data; } + /** + * Performs a PATCH request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async patch( url: string, data?: any, config?: AxiosRequestConfig, ): Promise { + logger.debug('PATCH request', { url }); const response = await this.axiosInstance.patch(url, data, config); return response.data; } + /** + * Performs a DELETE request to the API + * @param url - Endpoint URL (relative to base URL) + * @param config - Optional Axios request configuration + * @returns Response data + */ public async delete(url: string, config?: AxiosRequestConfig): Promise { + logger.debug('DELETE request', { url }); const response = await this.axiosInstance.delete(url, config); return response.data; } diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index 88d76c4..174458d 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -1,7 +1,16 @@ import { JobType } from '@prisma/client'; import { z } from 'zod'; +import { logger } from './logging'; -// Helper function to create a number validator that also accepts string inputs +/** + * Helper function to create a number validator that also accepts string inputs + * Ensures values meet minimum requirements and handles type conversion + * + * @param min - Minimum allowed value (null for no minimum) + * @param errorMessage - Error message for invalid numbers + * @param minErrorMessage - Error message for values below minimum + * @returns A Zod validator that accepts both numbers and strings + */ const createNumberValidator = ( min: number | null = null, errorMessage: string = 'Value must be a valid number', @@ -30,6 +39,10 @@ const createNumberValidator = ( ]); }; +/** + * Schema for scaling configuration + * Defines how the capacity manager should scale resources + */ export const ScalingConfiguration = z.object({ min: createNumberValidator( 0, @@ -58,8 +71,11 @@ export const ScalingConfiguration = z.object({ ), }); -// Configuration schema for job types and their corresponding ECS resources -export const JobTypeConfigSchema = z.object({ +/** + * Configuration schema for job types and their corresponding ECS resources + * Defines the AWS resources and scaling parameters for each job type + */ +export const RawJobTypeConfigSchema = z.object({ taskDefinitionArn: z.string().min(1, 'Task Definition ARN is required'), clusterArn: z.string().min(1, 'Cluster ARN is required'), scaling: ScalingConfiguration, @@ -67,11 +83,18 @@ export const JobTypeConfigSchema = z.object({ securityGroup: z.string().min(1, 'Security group ARN is required'), }); -export type JobTypeConfig = z.infer; +export type RawJobTypeConfig = z.infer; -// Base configuration schema (not job-type specific) +/** + * Base configuration schema for environment variables + * Validates core application settings from environment + */ export const BaseEnvConfigSchema = z.object({ - POLL_INTERVAL_MS: z.string().transform(val => parseInt(val)), + POLL_INTERVAL_MS: createNumberValidator( + 500, + 'Poll interval expects valid number', + 'Minimum poll interval is 500(ms)', + ), API_ENDPOINT: z.string().url('API endpoint must be a valid URL'), AWS_REGION: z.string().min(1, 'AWS region is required'), API_USERNAME: z.string().min(1, 'API username is required'), @@ -79,15 +102,23 @@ export const BaseEnvConfigSchema = z.object({ VPC_ID: z.string().min(1, 'VPC ID is required'), }); -// Final configuration schema structure +export const JobTypeConfigSchema = RawJobTypeConfigSchema.extend({ + jobTypes: z.array(z.nativeEnum(JobType)), +}); +export type JobTypeConfig = z.infer; +/** + * Final configuration schema structure + * Combines all configuration elements into a complete application config + */ export const ConfigSchema = z.object({ - pollIntervalMs: z.number().min(1000, 'Poll interval must be at least 1000ms'), + pollIntervalMs: createNumberValidator( + 500, + 'Poll interval expects valid number', + 'Minimum poll interval is 500(ms)', + ), apiEndpoint: z.string().url('API endpoint must be a valid URL'), region: z.string().min(1, 'AWS region is required'), - jobTypes: z.record( - z.nativeEnum(JobType), - JobTypeConfigSchema.extend({ jobTypes: z.array(z.nativeEnum(JobType)) }), - ), + jobTypes: z.record(z.nativeEnum(JobType), JobTypeConfigSchema), auth: z.object({ email: z.string().min(1, 'Email is required'), password: z.string().min(1, 'Password is required'), @@ -99,15 +130,18 @@ export type Config = z.infer; /** * Builds job type configuration from environment variables + * Extracts and validates settings for a specific job type * - * @param env Validated environment variables - * @param jobType The job type to build configuration for + * @param env - Validated environment variables + * @param jobType - The job type to build configuration for * @returns Configuration for the specified job type */ function buildJobTypeConfig( env: Record, jobType: string, -): JobTypeConfig { +): RawJobTypeConfig { + logger.debug(`Building config for job type: ${jobType}`); + const optimisticParse = { taskDefinitionArn: env[`${jobType}_TASK_DEF`] as string, clusterArn: env[`${jobType}_CLUSTER`] as string, @@ -119,12 +153,16 @@ function buildJobTypeConfig( factor: env[`${jobType}_FACTOR`] as number, }, securityGroup: env[`${jobType}_SECURITY_GROUP`] as string, - } satisfies JobTypeConfig; + } satisfies RawJobTypeConfig; + try { - return JobTypeConfigSchema.parse(optimisticParse); + const validatedConfig = RawJobTypeConfigSchema.parse(optimisticParse); + logger.debug(`Validated config for job type: ${jobType}`); + return validatedConfig; } catch (e) { - console.error( - `Job type ${jobType} did not have valid environment variables. Error: ${e}.`, + logger.error( + `Job type ${jobType} did not have valid environment variables`, + { error: e }, ); throw e; } @@ -138,18 +176,21 @@ function buildJobTypeConfig( * @throws Error if configuration validation fails */ export function loadConfig(): Config { + logger.info('Loading application configuration'); + try { // Force types here as we zod process everything! const env = process.env as Record; // Initialize job types configuration object const jobTypesConfig: Record< string, - JobTypeConfig & { jobTypes?: JobType[] } + RawJobTypeConfig & { jobTypes?: JobType[] } > = {}; // Build configuration for each job type Object.values(JobType).forEach(jobType => { const typeString = jobType.toString(); + logger.debug(`Processing job type: ${typeString}`); jobTypesConfig[typeString] = buildJobTypeConfig( env as Record, typeString, @@ -157,6 +198,7 @@ export function loadConfig(): Config { }); // Group the job types by task ARN + logger.debug('Grouping job types by task ARN'); const arnToTypes: Map = new Map(); for (const [jobType, config] of Object.entries(jobTypesConfig)) { arnToTypes.set( @@ -186,15 +228,22 @@ export function loadConfig(): Config { }; // Validate the entire config object - return ConfigSchema.parse(config); + logger.debug('Validating complete configuration'); + const validatedConfig = ConfigSchema.parse(config); + logger.info('Configuration successfully loaded and validated'); + return validatedConfig; } catch (error) { if (error instanceof z.ZodError) { // Format Zod validation errors for better readability const formattedErrors = error.errors .map(err => `${err.path.join('.')}: ${err.message}`) .join('\n'); + logger.error('Configuration validation failed', { + errors: formattedErrors, + }); throw new Error(`Configuration validation failed:\n${formattedErrors}`); } + logger.error('Failed to load configuration', { error }); throw new Error(`Failed to load configuration: ${error}`); } } diff --git a/src/job-manager/src/index.ts b/src/job-manager/src/index.ts index b11a0a2..12d47a9 100755 --- a/src/job-manager/src/index.ts +++ b/src/job-manager/src/index.ts @@ -3,31 +3,45 @@ import { z } from 'zod'; import { Config, loadConfig } from './config'; import { CapacityManager } from './manager'; import { AuthApiClient } from './authClient'; +import { logger } from './logging'; + +/** + * Main entry point for the Capacity Manager service + * Sets up the health check endpoint, loads configuration, + * and initializes the capacity manager. + */ // Create and start the express app for health checks const app = express(); const port = process.env.PORT || 3000; +/** + * Health check endpoint + * Returns 200 OK to indicate the service is running + */ app.get('/health', (req, res) => { - console.log('Health check, returning 200 OK'); + logger.debug('Health check requested'); res.status(200).send('OK'); }); let config: Config; try { + // Load and validate configuration from environment variables config = loadConfig(); - console.log('Configuration loaded successfully'); + logger.info('Configuration loaded successfully'); } catch (error) { if (error instanceof z.ZodError) { - console.error('Configuration validation failed:', error.errors); + logger.error('Configuration validation failed:', { errors: error.errors }); } else { - console.error('Failed to load configuration:', error); + logger.error('Failed to load configuration:', { error }); } + // Exit with error code if configuration cannot be loaded process.exit(1); } // Create API client (base should include /api) +logger.info('Initializing API client'); const client = new AuthApiClient(config.apiEndpoint + '/api', { email: config.auth.email, password: config.auth.password, @@ -35,20 +49,43 @@ const client = new AuthApiClient(config.apiEndpoint + '/api', { // Start the express server app.listen(port, () => { - console.log(`Health check server listening on port ${port}`); + logger.info(`Health check server listening on port ${port}`); }); // Start the capacity manager +logger.info('Initializing capacity manager'); const manager = new CapacityManager(config, client); +logger.info('Starting capacity manager'); manager.start(); -// graceful shutdown handlers +/** + * Handles graceful shutdown on SIGTERM + * Stops the capacity manager before process exit + */ process.on('SIGTERM', () => { - console.log('Received SIGTERM signal, shutting down...'); + logger.info('Received SIGTERM signal, shutting down...'); manager.stop(); }); +/** + * Handles graceful shutdown on SIGINT (Ctrl+C) + * Stops the capacity manager before process exit + */ process.on('SIGINT', () => { - console.log('Received SIGINT signal, shutting down...'); + logger.info('Received SIGINT signal, shutting down...'); manager.stop(); }); + +// Additional error handling for uncaught exceptions +process.on('uncaughtException', (error) => { + logger.error('Uncaught exception, shutting down:', { error }); + manager.stop(); + process.exit(1); +}); + +// Additional error handling for unhandled promise rejections +process.on('unhandledRejection', (reason) => { + logger.error('Unhandled rejection, shutting down:', { reason }); + manager.stop(); + process.exit(1); +}); diff --git a/src/job-manager/src/logging.ts b/src/job-manager/src/logging.ts new file mode 100644 index 0000000..0783ba6 --- /dev/null +++ b/src/job-manager/src/logging.ts @@ -0,0 +1,47 @@ + +import winston from 'winston'; + +/** + * Winston logger configuration + * + * This logger provides structured logging with timestamps and log levels. + * The log level can be configured via the LOG_LEVEL environment variable. + * + * Available log levels (in order of verbosity): + * - error: 0 (least verbose, only errors) + * - warn: 1 (errors and warnings) + * - info: 2 (errors, warnings, and info - default) + * - http: 3 (errors, warnings, info, and HTTP requests) + * - verbose: 4 (all above plus verbose details) + * - debug: 5 (all above plus debug information) + * - silly: 6 (most verbose level) + * + * Setting LOG_LEVEL to a specific level will include all logs at that level + * and below (less verbose). For example, setting LOG_LEVEL=warn will include + * error and warn logs, but not info, http, etc. + */ + +/** + * Creates a configured winston logger instance + * Format includes timestamp and structured JSON for metadata + */ +export const logger = winston.createLogger({ + // Get log level from environment variable or default to 'info' + level: (process.env.LOG_LEVEL || 'info').toLowerCase(), + + // Define log format with timestamp and metadata + format: winston.format.combine( + // Add timestamp to all log entries + winston.format.timestamp(), + + // Custom formatter to include metadata as JSON + winston.format.printf(({ level, message, timestamp, ...metadata }) => { + return `[${timestamp}] [${level.toUpperCase()}] ${message} ${ + Object.keys(metadata).length ? JSON.stringify(metadata) : '' + }`; + }), + ), + + // Define where logs are sent - console for basic setup + transports: [new winston.transports.Console()], +}); diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index 220214a..37a9e3c 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -10,60 +10,98 @@ import { Config, ConfigSchema, JobTypeConfig } from './config'; import { AuthApiClient } from './authClient'; import { JobType } from '@prisma/client'; import { PollJobsResponse } from '../../api/jobs/routes'; +import { logger } from './logging'; -// This interface are details for a worker which is pending or running +/** + * Interface for tracking worker status + * Contains details for a worker which is pending or running + */ interface TrackedWorker { + /** Unique ARN for the ECS task */ taskArn: string; + /** Task definition ARN for the ECS task */ + taskDefinitionArn: string; + /** The cluster ARN where the task is running */ clusterArn: string; + /** When the worker was started */ startTime: Date; + /** Types of jobs this worker can handle */ jobTypes: JobType[]; + /** Current status of the worker */ status: 'PENDING' | 'RUNNING' | 'STOPPED'; } +/** + * CapacityManager handles the automatic scaling of ECS tasks based on job queue demand. + * It tracks workers, polls for pending jobs, and adjusts the number of workers + * to efficiently process the jobs while respecting scaling constraints. + */ export class CapacityManager { private config: Config; private ecsClient: ECSClient; private ec2Client: EC2Client; + // Tracks the last scaled time for a given task definition ARN private lastScaleTime: Record = {}; private client: AuthApiClient; private isRunning: boolean = false; private pollTimeout: NodeJS.Timeout | null = null; - // Add tracking data structures + // Tracking data for workers private trackedWorkers: TrackedWorker[] = []; + /** + * Creates a new CapacityManager + * @param config - Configuration for the capacity manager + * @param client - Authentication client for API requests + */ constructor(config: Config, client: AuthApiClient) { this.config = ConfigSchema.parse(config); this.ecsClient = new ECSClient({ region: this.config.region }); this.ec2Client = new EC2Client({ region: this.config.region }); this.client = client; + logger.debug('CapacityManager initialized', { region: this.config.region }); } + /** + * Polls the job queue and adjusts capacity as needed + * @private + */ private async pollJobQueue() { if (!this.isRunning) return; const used = process.memoryUsage(); - console.log('Memory usage:', { + logger.debug('Memory usage', { heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`, heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`, rss: `${Math.round(used.rss / 1024 / 1024)} MB`, }); try { - console.log('Poll started at:', new Date().toISOString()); + logger.info('Poll started', { timestamp: new Date().toISOString() }); + logger.debug('Current tracked workers status', { + count: this.trackedWorkers.length, + }); // Update worker statuses await this.updateWorkerStatuses(); // Get jobs with their IDs + logger.debug('Fetching pending jobs from API'); const response = await this.client.get('/jobs/poll'); + logger.debug('Received job poll response', { + jobCount: response.jobs.length, + jobTypes: response.jobs.map(j => j.type), + }); await this.adjustCapacity({ pollResponse: response.jobs }); } catch (error) { - console.error('Error polling job queue:', error); + logger.error('Error polling job queue', { error }); } finally { // Only schedule next poll if still running if (this.isRunning) { + logger.debug('Scheduling next poll', { + ms: this.config.pollIntervalMs, + }); this.pollTimeout = setTimeout( () => this.pollJobQueue(), this.config.pollIntervalMs, @@ -72,12 +110,23 @@ export class CapacityManager { } } + /** + * Updates the status of all tracked workers by querying ECS + * @private + */ private async updateWorkerStatuses() { - if (this.trackedWorkers.length === 0) return; + if (this.trackedWorkers.length === 0) { + logger.debug('No workers to update'); + return; + } try { // Build set of distinct worker types from the tracked workers const clusterArns = new Set(this.trackedWorkers.map(w => w.clusterArn)); + logger.debug('Updating worker statuses', { + workerCount: this.trackedWorkers.length, + clusterCount: clusterArns.size, + }); // Create a Set to track which task ARNs were found in the API response const foundTaskArns = new Set(); @@ -90,10 +139,19 @@ export class CapacityManager { // Which task ARNs to fetch const taskArns = relevantWorkers.map(w => w.taskArn); + logger.debug('Checking tasks in cluster', { + clusterArn, + taskCount: taskArns.length, + }); + // Split into chunks if there are many tasks (ECS API has limits) const chunkSize = 100; for (let i = 0; i < taskArns.length; i += chunkSize) { const chunk = taskArns.slice(i, i + chunkSize); + logger.debug('Processing task chunk', { + chunkSize: chunk.length, + startIndex: i, + }); const command = new DescribeTasksCommand({ cluster: clusterArn, @@ -103,6 +161,11 @@ export class CapacityManager { const response = await this.ecsClient.send(command); if (response.tasks) { + logger.debug('Received task details', { + requestedCount: chunk.length, + receivedCount: response.tasks.length, + }); + // Add all found task ARNs to our tracking set response.tasks.forEach(task => { if (task.taskArn) { @@ -116,10 +179,17 @@ export class CapacityManager { // Check if any tasks weren't found but were requested // AWS ECS API returns info in response.failures for tasks that weren't found if (response.failures && response.failures.length > 0) { + logger.warn('Some tasks were not found', { + failureCount: response.failures.length, + }); + response.failures.forEach(failure => { if (failure.arn && failure.reason === 'MISSING') { - console.log( - `Task ${failure.arn} not found in ECS, removing from tracked workers`, + logger.info( + 'Task not found in ECS, removing from tracked workers', + { + taskArn: failure.arn, + }, ); // We explicitly don't add this to foundTaskArns since it's missing } @@ -137,22 +207,43 @@ export class CapacityManager { const removedCount = previousCount - this.trackedWorkers.length; if (removedCount > 0) { - console.log( - `Removed ${removedCount} workers that were not found in ECS`, - ); + logger.info('Removed workers not found in ECS', { + count: removedCount, + }); } + + logger.debug('Worker status update complete', { + originalCount: previousCount, + currentCount: this.trackedWorkers.length, + removed: removedCount, + }); } catch (error) { - console.error('Error updating worker statuses:', error); + logger.error('Error updating worker statuses', { error }); } } - // Update worker statuses based on ECS task information + /** + * Updates worker statuses based on ECS task information + * @param tasks - Task information from ECS + * @private + */ private updateWorkerStatusesFromTasks(tasks: Task[]) { + logger.debug('Updating worker statuses from tasks', { + taskCount: tasks.length, + }); + let statusChanges = 0; + for (const task of tasks) { - if (!task.taskArn) continue; + if (!task.taskArn) { + logger.warn('Task without ARN found in response'); + continue; + } const worker = this.trackedWorkers.find(w => w.taskArn === task.taskArn); - if (!worker) continue; + if (!worker) { + logger.debug('Task not in tracked workers', { taskArn: task.taskArn }); + continue; + } const lastStatus = task.lastStatus || ''; let newStatus: 'PENDING' | 'RUNNING' | 'STOPPED'; @@ -174,110 +265,170 @@ export class CapacityManager { newStatus = 'STOPPED'; } else { // For any unexpected status, log it but don't change worker status - console.warn( - `Worker ${task.taskArn} has unknown status: ${lastStatus}`, - ); + logger.warn('Worker has unknown status', { + taskArn: task.taskArn, + status: lastStatus, + }); continue; } // Only log if status changed if (worker.status !== newStatus) { - console.log( - `Worker ${task.taskArn} status changed: ${worker.status} -> ${newStatus}`, - ); + logger.info('Worker status changed', { + taskArn: task.taskArn, + oldStatus: worker.status, + newStatus: newStatus, + }); worker.status = newStatus; + statusChanges++; } } + // Count workers by status before cleanup + const workerStatusCounts = { + PENDING: this.trackedWorkers.filter(w => w.status === 'PENDING').length, + RUNNING: this.trackedWorkers.filter(w => w.status === 'RUNNING').length, + STOPPED: this.trackedWorkers.filter(w => w.status === 'STOPPED').length, + }; + + logger.debug('Worker status counts before cleanup', workerStatusCounts); + // Remove stopped workers after updating + const beforeCleanup = this.trackedWorkers.length; this.trackedWorkers = this.trackedWorkers.filter( worker => worker.status !== 'STOPPED', ); + const cleanupRemoved = beforeCleanup - this.trackedWorkers.length; + + if (cleanupRemoved > 0) { + logger.info('Removed stopped workers from tracking', { + count: cleanupRemoved, + }); + } + + logger.debug('Worker status update summary', { + statusChanges, + stoppedWorkersRemoved: cleanupRemoved, + remainingWorkers: this.trackedWorkers.length, + }); } /** - * For each job which is polling, determine how many workers we have running - * that can handle that type of job. Include pending tasks as part of the - * threshold. Observe cooldown period. + * Adjust capacity for each job type based on pending jobs + * @param pollResponse - Response from the job queue poll + * @private */ private async adjustCapacity({ pollResponse, }: { pollResponse: PollJobsResponse['jobs']; }): Promise { + logger.debug('Adjusting capacity based on poll response', { + jobCount: pollResponse.length, + }); + // Count pending jobs by type - const pendingByType: Record = Object.values( - JobType, - ).reduce>( + const pendingByDfnArn: Record = pollResponse.reduce< + Record + >( (current, acc) => { - current[acc] = 0; + const arn = this.config.jobTypes[acc.type]?.taskDefinitionArn; + if (!arn) { + logger.warn('Missing config definition for task type.', { + jobType: acc.type, + }); + return current; + } + current[arn] = current[arn] ? current[arn] + 1 : 1; return current; }, - {} as Record, + {} as Record, ); - for (const job of pollResponse) { - // Increment count - pendingByType[job.type] += 1; - } // Determine how many workers are already tracked for each type of job - const workersByType: Record = Object.values( - JobType, - ).reduce>( + const workersByDfnArn: Record = this.trackedWorkers.reduce< + Record + >( (current, acc) => { - current[acc] = 0; + const arn = acc.taskDefinitionArn; + current[arn] = current[arn] ? current[arn] + 1 : 1; return current; }, - {} as Record, + {} as Record, ); - for (const worker of this.trackedWorkers) { - // Count once for each type TODO consider if this has implications for - // scaling - de emphasising workers which handle multiple jobs - for (const type of worker.jobTypes) { - workersByType[type] += 1; - } - } - for (const jobType of Object.values(JobType)) { - const pending = pendingByType[jobType]; - const workers = workersByType[jobType]; + logger.debug('Job distribution', { + pendingByType: pendingByDfnArn, + workersByType: workersByDfnArn, + }); - const config = this.config.jobTypes[jobType as JobType]; - if (!config) { - console.warn(`No configuration found for job type: ${jobType}`); + for (const taskDefArn of Object.keys(pendingByDfnArn)) { + const taskConfig = Object.values(this.config.jobTypes).find( + c => c.taskDefinitionArn === taskDefArn, + ); + if (!taskConfig) { + logger.warn( + 'No configuration found for job with task definition arn needed', + { taskDefArn }, + ); continue; } - await this.adjustCapacityForType({ - jobType, + const pending = pendingByDfnArn[taskDefArn]; + const workers = workersByDfnArn[taskDefArn]; + + logger.debug('Considering capacity adjustment', { + taskDefinitionArn: taskDefArn, + pendingJobs: pending, + currentWorkers: workers, + }); + + await this.adjustCapacityForTask({ + jobTypes: taskConfig.jobTypes, pending, workers, - config, + config: taskConfig, }); } } /** - * Launches n jobs of the specified type/config - * - * Updates the worker tracking + * Launches n tasks of the specified type/config + * @param count - Number of tasks to launch + * @param jobType - Type of job the tasks will handle + * @param config - Configuration for the job type + * @private */ private async launchTask({ count = 1, - jobType, config, }: { count?: number; - jobType: JobType; config: JobTypeConfig; }) { try { + logger.info('Attempting to launch tasks', { + count, + arn: config.taskDefinitionArn, + }); let done = 0; + let failures = 0; + while (done < count) { const now = Date.now(); // Get a random public subnet for this task + logger.debug('Getting random public subnet', { + vpcId: this.config.vpcId, + }); const subnet = await this.getRandomPublicSubnet(this.config.vpcId); + + logger.debug('Constructing RunTaskCommand', { + cluster: config.clusterArn, + taskDef: config.taskDefinitionArn, + subnet, + }); + const command = new RunTaskCommand({ cluster: config.clusterArn, taskDefinition: config.taskDefinitionArn, @@ -300,23 +451,37 @@ export class CapacityManager { result.tasks.length > 0 && result.tasks[0].taskArn ) { - this.lastScaleTime[jobType] = now; + this.lastScaleTime[config.taskDefinitionArn] = now; this.trackedWorkers.push({ clusterArn: config.clusterArn, taskArn: result.tasks[0].taskArn, startTime: new Date(), - jobTypes: this.config.jobTypes[jobType]?.jobTypes ?? [jobType], + jobTypes: config.jobTypes, + taskDefinitionArn: config.taskDefinitionArn, status: 'PENDING', }); - console.log( - `Started new task ${result.tasks[0].taskArn} for job type: ${jobType}`, - ); + logger.info('Started new task', { + taskArn: result.tasks[0].taskArn, + }); + done += 1; + } else { + failures += 1; + logger.error('Failed to launch task', { + result, + }); } - done += 1; } + + logger.debug('Task launch summary', { + requested: count, + launched: done, + failures, + }); } catch (e) { - console.error('Failed to launch task(s). Exception: ' + e); + logger.error('Failed to launch task(s)', { + error: e, + }); } } @@ -335,6 +500,7 @@ export class CapacityManager { * @param baseJobCount - Reference job count that maps to roughly 1×sensitivity workers * (helps calibrate the scale for your specific workload) * @returns The target number of workers as an integer + * @private */ private computeOptimalWorkers({ pendingJobs, @@ -351,8 +517,10 @@ export class CapacityManager { }): number { // Handle edge cases if (pendingJobs <= 0) { + logger.debug('No pending jobs, using minWorkers', { minWorkers }); return minWorkers; } + // Compute workers using logarithmic scaling // The formula: sensitivity * log(pendingJobs/baseJobCount + 1) + minWorkers // @@ -364,33 +532,80 @@ export class CapacityManager { sensitivity * Math.log(pendingJobs / baseJobCount + 1) + minWorkers; // Round to nearest integer and enforce bounds - return Math.min( + let result = Math.min( Math.max(Math.round(computedWorkers), minWorkers), maxWorkers, ); + + // You should always deploy at least one worker if there is at least one job + if (pendingJobs > 0 && result < 1) { + logger.debug( + 'Optimal workers found < 1 when there was at least one pending job...forcing result to 1', + { + pendingJobs, + minWorkers, + }, + ); + result = 1; + } + + logger.debug('Computed optimal workers', { + pendingJobs, + sensitivity, + minWorkers, + maxWorkers, + baseJobCount, + rawComputed: computedWorkers, + finalResult: result, + }); + + return result; } - private async adjustCapacityForType({ - jobType, + /** + * Adjust capacity for a specific job type + * @param jobType - The job type to adjust capacity for + * @param pending - Number of pending jobs of this type + * @param workers - Current worker count for this type + * @param config - Configuration for this job type + * @private + */ + private async adjustCapacityForTask({ + jobTypes, pending, workers, config, }: { - jobType: JobType; + jobTypes: JobType[]; pending: number; workers: number; config: JobTypeConfig; }) { const now = Date.now(); - const lastScale = this.lastScaleTime[jobType] || 0; + const lastScale = this.lastScaleTime[config.taskDefinitionArn] || 0; + const cooldownMs = config.scaling.cooldownSeconds * 1000; + const timeInCooldown = now - lastScale; + const inCooldown = timeInCooldown < cooldownMs; // Check cooldown - if (now - lastScale < config.scaling.cooldownSeconds * 1000) { - console.log(`Still in cooldown period for ${jobType}`); + if (inCooldown) { + logger.debug('Still in cooldown period', { + jobTypes, + elapsed: timeInCooldown, + cooldownMs, + remaining: cooldownMs - timeInCooldown, + }); return; } // Determine the ideal number of workers + logger.debug('Calculating target capacity', { + jobTypes, + pending, + currentWorkers: workers, + scalingConfig: config.scaling, + }); + const idealTarget = this.computeOptimalWorkers({ pendingJobs: pending, sensitivity: config.scaling.sensitivity, @@ -398,34 +613,65 @@ export class CapacityManager { maxWorkers: config.scaling.max, baseJobCount: config.scaling.factor, }); + const diff = idealTarget - workers; + if (diff > 0) { - console.info(`Launching ${diff} tasks of type: ${jobType}.`); - this.launchTask({ count: diff, jobType, config }); + logger.info('Launching additional tasks', { + count: diff, + jobTypes, + currentWorkers: workers, + targetWorkers: idealTarget, + pendingJobs: pending, + }); + this.launchTask({ count: diff, config }); + } else if (diff < 0) { + logger.debug('Capacity reduction not implemented', { + jobTypes, + currentWorkers: workers, + targetWorkers: idealTarget, + excess: -diff, + }); + // Note: No implementation for scaling down - tasks will terminate themselves + } else { + logger.debug('No capacity adjustment needed', { + jobTypes, + currentWorkers: workers, + targetWorkers: idealTarget, + }); } } + /** + * Starts the capacity manager + */ public start() { - if (this.isRunning) return; + if (this.isRunning) { + logger.info('Capacity manager already running'); + return; + } - console.log('Starting capacity manager...'); + logger.info('Starting capacity manager...'); this.isRunning = true; this.pollJobQueue(); // Add error handlers for uncaught errors process.on('uncaughtException', error => { - console.error('Uncaught exception:', error); + logger.error('Uncaught exception', { error }); this.stop(); }); process.on('unhandledRejection', error => { - console.error('Unhandled rejection:', error); + logger.error('Unhandled rejection', { error }); this.stop(); }); } + /** + * Stops the capacity manager + */ public stop() { - console.log('Stopping capacity manager...'); + logger.info('Stopping capacity manager...'); this.isRunning = false; if (this.pollTimeout) { clearTimeout(this.pollTimeout); @@ -433,8 +679,15 @@ export class CapacityManager { } } + /** + * Gets a random public subnet from the VPC + * @param vpcId - The VPC ID to get subnets from + * @returns The subnet ID + * @private + */ private async getRandomPublicSubnet(vpcId: string): Promise { try { + logger.debug('Fetching public subnets', { vpcId }); const command = new DescribeSubnetsCommand({ Filters: [ { @@ -452,6 +705,7 @@ export class CapacityManager { const publicSubnets = response.Subnets || []; if (publicSubnets.length === 0) { + logger.error('No public subnets found', { vpcId }); throw new Error(`No public subnets found in VPC ${vpcId}`); } @@ -459,13 +713,45 @@ export class CapacityManager { const randomIndex = Math.floor(Math.random() * publicSubnets.length); const selectedSubnet = publicSubnets[randomIndex]; - console.log( - `Selected subnet ${selectedSubnet.SubnetId} in AZ ${selectedSubnet.AvailabilityZone}`, - ); + logger.info('Selected subnet', { + subnetId: selectedSubnet.SubnetId, + availabilityZone: selectedSubnet.AvailabilityZone, + }); return selectedSubnet.SubnetId!; } catch (error) { - console.error('Error getting public subnets:', error); + logger.error('Error getting public subnets', { error }); throw error; } } + + /** + * Returns information about the current worker distribution + * @returns Summary of current workers + */ + public getWorkerStats() { + const byStatus = { + PENDING: this.trackedWorkers.filter(w => w.status === 'PENDING').length, + RUNNING: this.trackedWorkers.filter(w => w.status === 'RUNNING').length, + }; + + const byJobType = Object.values(JobType).reduce>( + (acc, jobType) => { + acc[jobType] = 0; + return acc; + }, + {}, + ); + + this.trackedWorkers.forEach(worker => { + worker.jobTypes.forEach(jobType => { + byJobType[jobType]++; + }); + }); + + return { + totalWorkers: this.trackedWorkers.length, + byStatus, + byJobType, + }; + } } From f520c4d60110f1bb58104220d6d91b933a528034 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 15:40:07 +1000 Subject: [PATCH 04/12] Fixing undefined issues Signed-off-by: Peter Baker --- src/job-manager/src/manager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index 37a9e3c..a1e5e3e 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -374,8 +374,8 @@ export class CapacityManager { continue; } - const pending = pendingByDfnArn[taskDefArn]; - const workers = workersByDfnArn[taskDefArn]; + const pending : number = pendingByDfnArn[taskDefArn] ?? 0; + const workers : number = workersByDfnArn[taskDefArn] ?? 0; logger.debug('Considering capacity adjustment', { taskDefinitionArn: taskDefArn, From 8f02c1ba8dfa67ac49b21e95de186d35ff061d15 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 15:50:34 +1000 Subject: [PATCH 05/12] Downgrade log level Signed-off-by: Peter Baker --- src/infra/components/jobs.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/infra/components/jobs.ts b/src/infra/components/jobs.ts index ef2bdfb..db82b2a 100644 --- a/src/infra/components/jobs.ts +++ b/src/infra/components/jobs.ts @@ -302,7 +302,7 @@ export class JobSystem extends Construct { // Which vpc to deploy into VPC_ID: props.vpc.vpcId, // Log level for manager - LOG_LEVEL: 'debug', + LOG_LEVEL: 'info', }, // pass in the manager creds secrets: { From 7bffdbe69d74a8fc4fc2d6df4a441dd53ab4e4cd Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 15:52:42 +1000 Subject: [PATCH 06/12] Downgrading log level Signed-off-by: Peter Baker --- src/job-manager/src/manager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index a1e5e3e..9fe5ead 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -314,7 +314,7 @@ export class CapacityManager { } /** - * Adjust capacity for each job type based on pending jobs + * Adjust capacity for each task definition based on pending jobs * @param pollResponse - Response from the job queue poll * @private */ @@ -327,7 +327,7 @@ export class CapacityManager { jobCount: pollResponse.length, }); - // Count pending jobs by type + // Count pending jobs by task definition const pendingByDfnArn: Record = pollResponse.reduce< Record >( From 75239689963cd6af056ea8b43d0e9d480e85d5a0 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 16:01:55 +1000 Subject: [PATCH 07/12] FOrmatting Signed-off-by: Peter Baker --- src/job-manager/src/authClient.ts | 11 ++++++----- src/job-manager/src/index.ts | 4 ++-- src/job-manager/src/logging.ts | 13 ++++++------- src/job-manager/src/manager.ts | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/job-manager/src/authClient.ts b/src/job-manager/src/authClient.ts index 9e494ef..1ba4ece 100644 --- a/src/job-manager/src/authClient.ts +++ b/src/job-manager/src/authClient.ts @@ -170,10 +170,9 @@ export class AuthApiClient { ); if (response.status !== 200) { - logger.warn( - 'Non 200 response from refresh token endpoint', - { status: response.status } - ); + logger.warn('Non 200 response from refresh token endpoint', { + status: response.status, + }); throw new Error('Non 200 response from refresh token.'); } @@ -183,7 +182,9 @@ export class AuthApiClient { }; logger.debug('Token refreshed successfully'); } catch (error) { - logger.error('Error during token refresh, falling back to login', { error }); + logger.error('Error during token refresh, falling back to login', { + error, + }); // If refresh fails, try logging in again this.tokens = null; // awaiting login diff --git a/src/job-manager/src/index.ts b/src/job-manager/src/index.ts index 12d47a9..1975de2 100755 --- a/src/job-manager/src/index.ts +++ b/src/job-manager/src/index.ts @@ -77,14 +77,14 @@ process.on('SIGINT', () => { }); // Additional error handling for uncaught exceptions -process.on('uncaughtException', (error) => { +process.on('uncaughtException', error => { logger.error('Uncaught exception, shutting down:', { error }); manager.stop(); process.exit(1); }); // Additional error handling for unhandled promise rejections -process.on('unhandledRejection', (reason) => { +process.on('unhandledRejection', reason => { logger.error('Unhandled rejection, shutting down:', { reason }); manager.stop(); process.exit(1); diff --git a/src/job-manager/src/logging.ts b/src/job-manager/src/logging.ts index 0783ba6..3d12a04 100644 --- a/src/job-manager/src/logging.ts +++ b/src/job-manager/src/logging.ts @@ -1,12 +1,11 @@ - import winston from 'winston'; /** * Winston logger configuration - * + * * This logger provides structured logging with timestamps and log levels. * The log level can be configured via the LOG_LEVEL environment variable. - * + * * Available log levels (in order of verbosity): * - error: 0 (least verbose, only errors) * - warn: 1 (errors and warnings) @@ -15,7 +14,7 @@ import winston from 'winston'; * - verbose: 4 (all above plus verbose details) * - debug: 5 (all above plus debug information) * - silly: 6 (most verbose level) - * + * * Setting LOG_LEVEL to a specific level will include all logs at that level * and below (less verbose). For example, setting LOG_LEVEL=warn will include * error and warn logs, but not info, http, etc. @@ -28,12 +27,12 @@ import winston from 'winston'; export const logger = winston.createLogger({ // Get log level from environment variable or default to 'info' level: (process.env.LOG_LEVEL || 'info').toLowerCase(), - + // Define log format with timestamp and metadata format: winston.format.combine( // Add timestamp to all log entries winston.format.timestamp(), - + // Custom formatter to include metadata as JSON winston.format.printf(({ level, message, timestamp, ...metadata }) => { return `[${timestamp}] [${level.toUpperCase()}] ${message} ${ @@ -41,7 +40,7 @@ export const logger = winston.createLogger({ }`; }), ), - + // Define where logs are sent - console for basic setup transports: [new winston.transports.Console()], }); diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index 9fe5ead..90f17bc 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -374,8 +374,8 @@ export class CapacityManager { continue; } - const pending : number = pendingByDfnArn[taskDefArn] ?? 0; - const workers : number = workersByDfnArn[taskDefArn] ?? 0; + const pending: number = pendingByDfnArn[taskDefArn] ?? 0; + const workers: number = workersByDfnArn[taskDefArn] ?? 0; logger.debug('Considering capacity adjustment', { taskDefinitionArn: taskDefArn, From 3c684bbc3ddb3b3e707aa372f902228b98e4d92e Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Mon, 19 May 2025 16:17:39 +1000 Subject: [PATCH 08/12] Tuning params Signed-off-by: Peter Baker --- src/infra/infra.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/infra/infra.ts b/src/infra/infra.ts index ec4d8c5..1321307 100644 --- a/src/infra/infra.ts +++ b/src/infra/infra.ts @@ -189,8 +189,8 @@ export class ReefguideWebApiStack extends cdk.Stack { command: ['using ReefGuideAPI; ReefGuideAPI.start_worker()'], desiredMinCapacity: 0, desiredMaxCapacity: 5, - scalingFactor: 20, - scalingSensitivity: 5.0, + scalingFactor: 3.3, + scalingSensitivity: 2.6, cooldownSeconds: 60, // This specifies where the config file path can be found for the From b6127336bec9054af460798cf5fda1337473f731 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Wed, 21 May 2025 12:45:54 +1000 Subject: [PATCH 09/12] Adding job tests and new payload options Signed-off-by: Peter Baker --- api-test/jobs.http | 81 +++++++++++++++++++++++++++++++++++----- src/api/services/jobs.ts | 38 ++++++++++++++----- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/api-test/jobs.http b/api-test/jobs.http index a3a49f6..08cae75 100644 --- a/api-test/jobs.http +++ b/api-test/jobs.http @@ -50,11 +50,51 @@ Authorization: Bearer {{authToken}} { "type": "TEST", "inputPayload": { - "id" : 1 + "id" : 1111111 + } +} + +### Create a new regional assessment job (with full params) +# @name createRegionalAssessmentJob +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "REGIONAL_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes", + "depth_min": -10.1, + "depth_max": -2.1, + "slope_min": 0.1, + "slope_max": 30.0, + "rugosity_min": 0.0, + "rugosity_max": 6.0, + "waves_period_min": 1.94303, + "waves_period_max": 9.32689, + "waves_height_min": 0.237052, + "waves_height_max": 2.53194, + "threshold": 95 + } +} + +### Create a new regional assessment job (with minimal params) +# @name createRegionalAssessmentJob +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "REGIONAL_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes", + "depth_min" : -5.11111 } } -### Create a new suitability assessment job +### Create a new suitability assessment job (with full params) # @name createSuitabilityJob POST {{baseUrl}}/jobs Content-Type: {{contentType}} @@ -63,20 +103,41 @@ Authorization: Bearer {{authToken}} { "type": "SUITABILITY_ASSESSMENT", "inputPayload": { - "region": "Cairns-Cooktown", + "region": "Mackay-Capricorn", "reef_type": "slopes", - "depth_min": -7.0, - "depth_max": -3.9, - "slope_min": 0.2, - "slope_max": 40.0, + "depth_min": -10.1, + "depth_max": -2.1, + "slope_min": 0.1, + "slope_max": 30.0, "rugosity_min": 0.0, "rugosity_max": 6.0, - "x_dist" : 450, - "y_dist" : 20, + "waves_period_min": 1.94303, + "waves_period_max": 9.32689, + "waves_height_min": 0.237052, + "waves_height_max": 2.53194, + "x_dist": 451, + "y_dist": 27, "threshold": 95 } } +### Create a new suitability assessment job (with minimal params) +# @name createSuitabilityJob +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "SUITABILITY_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes", + "x_dist": 451, + "y_dist": 27 + } +} + + ### Store the job IDs for further operations @jobId = {{createJob.response.body.jobId}} @jobId = {{createSuitabilityJob.response.body.jobId}} @@ -241,4 +302,4 @@ Authorization: Bearer {{authToken}} ### Try download (should fail as job is not complete) GET {{baseUrl}}/jobs/{{jobId}}/download -Authorization: Bearer {{authToken}} \ No newline at end of file +Authorization: Bearer {{authToken}} diff --git a/src/api/services/jobs.ts b/src/api/services/jobs.ts index 642dd32..c30d911 100644 --- a/src/api/services/jobs.ts +++ b/src/api/services/jobs.ts @@ -35,17 +35,37 @@ type JobExpiryMap = { }; const sharedCriteriaSchema = z.object({ - // High level config + // High level config - common to all current scenarios region: z.string().describe('Region for assessment'), reef_type: z.string().describe('The type of reef, slopes or flats'), - // Criteria - depth_min: z.number().describe('The depth range (min)'), - depth_max: z.number().describe('The depth range (max)'), - slope_min: z.number().describe('The slope range (min)'), - slope_max: z.number().describe('The slope range (max)'), - rugosity_min: z.number().describe('The rugosity range (min)'), - rugosity_max: z.number().describe('The rugosity range (max)'), - threshold: z.number().describe('Suitability threshold integer (min)'), + + // Criteria - all optional to match the Union{Float64,Nothing} in worker + depth_min: z.number().optional().describe('The depth range (min)'), + depth_max: z.number().optional().describe('The depth range (max)'), + slope_min: z.number().optional().describe('The slope range (min)'), + slope_max: z.number().optional().describe('The slope range (max)'), + rugosity_min: z.number().optional().describe('The rugosity range (min)'), + rugosity_max: z.number().optional().describe('The rugosity range (max)'), + waves_period_min: z + .number() + .optional() + .describe('The wave period range (min)'), + waves_period_max: z + .number() + .optional() + .describe('The wave period range (max)'), + waves_height_min: z + .number() + .optional() + .describe('The wave height range (min)'), + waves_height_max: z + .number() + .optional() + .describe('The wave height range (max)'), + threshold: z + .number() + .optional() + .describe('Suitability threshold integer (min)'), }); /** From 6da77cea4b86c0a26fdec90a243b398d3535c441 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Wed, 21 May 2025 12:55:29 +1000 Subject: [PATCH 10/12] Reverting Signed-off-by: Peter Baker --- api-test/jobs.http | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/api-test/jobs.http b/api-test/jobs.http index 08cae75..e1bb8a4 100644 --- a/api-test/jobs.http +++ b/api-test/jobs.http @@ -89,8 +89,7 @@ Authorization: Bearer {{authToken}} "type": "REGIONAL_ASSESSMENT", "inputPayload": { "region": "Mackay-Capricorn", - "reef_type": "slopes", - "depth_min" : -5.11111 + "reef_type": "slopes" } } From 5bde71903bffe91ac516e9524bc7770d85beaeb6 Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Thu, 22 May 2025 10:13:54 +1000 Subject: [PATCH 11/12] Fixes from review Signed-off-by: Peter Baker --- src/job-manager/src/authClient.ts | 4 +++- src/job-manager/src/config.ts | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/job-manager/src/authClient.ts b/src/job-manager/src/authClient.ts index 1ba4ece..7bd63e3 100644 --- a/src/job-manager/src/authClient.ts +++ b/src/job-manager/src/authClient.ts @@ -173,7 +173,9 @@ export class AuthApiClient { logger.warn('Non 200 response from refresh token endpoint', { status: response.status, }); - throw new Error('Non 200 response from refresh token.'); + throw new Error( + `Non 200 response from refresh token. Code: ${response.status}.`, + ); } this.tokens = { diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index 174458d..568a4c3 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -192,7 +192,7 @@ export function loadConfig(): Config { const typeString = jobType.toString(); logger.debug(`Processing job type: ${typeString}`); jobTypesConfig[typeString] = buildJobTypeConfig( - env as Record, + env, typeString, ); }); From 7b6843ffd3f18f219e8ad7ddba44f9791c7e12ee Mon Sep 17 00:00:00 2001 From: Peter Baker Date: Thu, 22 May 2025 10:14:05 +1000 Subject: [PATCH 12/12] Formatting Signed-off-by: Peter Baker --- src/job-manager/src/config.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index 568a4c3..d38ae94 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -191,10 +191,7 @@ export function loadConfig(): Config { Object.values(JobType).forEach(jobType => { const typeString = jobType.toString(); logger.debug(`Processing job type: ${typeString}`); - jobTypesConfig[typeString] = buildJobTypeConfig( - env, - typeString, - ); + jobTypesConfig[typeString] = buildJobTypeConfig(env, typeString); }); // Group the job types by task ARN