diff --git a/api-test/jobs.http b/api-test/jobs.http index a3a49f6..e1bb8a4 100644 --- a/api-test/jobs.http +++ b/api-test/jobs.http @@ -50,11 +50,50 @@ Authorization: Bearer {{authToken}} { "type": "TEST", "inputPayload": { - "id" : 1 + "id" : 1111111 } } -### Create a new suitability assessment job +### Create a new regional assessment job (with full params) +# @name createRegionalAssessmentJob +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "REGIONAL_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes", + "depth_min": -10.1, + "depth_max": -2.1, + "slope_min": 0.1, + "slope_max": 30.0, + "rugosity_min": 0.0, + "rugosity_max": 6.0, + "waves_period_min": 1.94303, + "waves_period_max": 9.32689, + "waves_height_min": 0.237052, + "waves_height_max": 2.53194, + "threshold": 95 + } +} + +### Create a new regional assessment job (with minimal params) +# @name createRegionalAssessmentJobMinimal +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "REGIONAL_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes" + } +} + +### Create a new suitability assessment job (with full params) # @name createSuitabilityJob POST {{baseUrl}}/jobs Content-Type: {{contentType}} @@ -63,20 +102,41 @@ Authorization: Bearer {{authToken}} { "type": "SUITABILITY_ASSESSMENT", "inputPayload": { - "region": "Cairns-Cooktown", + "region": "Mackay-Capricorn", "reef_type": "slopes", - "depth_min": -7.0, - "depth_max": -3.9, - "slope_min": 0.2, - "slope_max": 40.0, + "depth_min": -10.1, + "depth_max": -2.1, + "slope_min": 0.1, + "slope_max": 30.0, "rugosity_min": 0.0, "rugosity_max": 6.0, - "x_dist" : 450, - "y_dist" : 20, + "waves_period_min": 1.94303, + "waves_period_max": 9.32689, + "waves_height_min": 0.237052, + "waves_height_max": 2.53194, + "x_dist": 451, + "y_dist": 27, "threshold": 95 } } +### Create a new suitability assessment job (with minimal params) +# @name createSuitabilityJobMinimal +POST {{baseUrl}}/jobs +Content-Type: {{contentType}} +Authorization: Bearer {{authToken}} + +{ + "type": "SUITABILITY_ASSESSMENT", + "inputPayload": { + "region": "Mackay-Capricorn", + "reef_type": "slopes", + "x_dist": 451, + "y_dist": 27 + } +} + + ### Store the job IDs for further operations @jobId = {{createJob.response.body.jobId}} @jobId = {{createSuitabilityJob.response.body.jobId}} @@ -241,4 +301,4 @@ Authorization: Bearer {{authToken}} ### Try download (should fail as job is not complete) GET {{baseUrl}}/jobs/{{jobId}}/download -Authorization: Bearer {{authToken}} \ No newline at end of file +Authorization: Bearer {{authToken}} diff --git a/package-lock.json b/package-lock.json index 58f1f02..8267516 100644 --- a/package-lock.json +++ b/package-lock.json @@ -33,6 +33,7 @@ "passport": "^0.7.0", "passport-jwt": "^4.0.1", "source-map-support": "^0.5.21", + "winston": "^3.17.0", "zod": "^3.23.8", "zod-express-middleware": "^1.4.0" }, @@ -3830,6 +3831,14 @@ "node": ">=12" } }, + "node_modules/@colors/colors": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@colors/colors/-/colors-1.6.0.tgz", + "integrity": "sha512-Ir+AOibqzrIsL6ajt3Rz3LskB7OiMVHqltZmspbW/TJuTVuyOMirVqAkjfY6JISiLHgyNqicAC8AyHHGzNd/dA==", + "engines": { + "node": ">=0.1.90" + } + }, "node_modules/@cspotcode/source-map-support": { "version": "0.8.1", "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", "dependencies": { "@jridgewell/trace-mapping": "0.3.9" } }, @@ -3852,6 +3861,16 @@
"@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@dabh/diagnostics": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/@dabh/diagnostics/-/diagnostics-2.0.3.tgz", + "integrity": "sha512-hrlQOIi7hAfzsMqlGSFyVucrx38O+j6wiGOf//H2ecvIEqYN4ADBSS2iLMh5UFyDunCNniUIPk/q3riFv45xRA==", + "dependencies": { + "colorspace": "1.1.x", + "enabled": "2.0.x", + "kuler": "^2.0.0" + } + }, "node_modules/@discoveryjs/json-ext": { "version": "0.5.7", "resolved": "https://registry.npmjs.org/@discoveryjs/json-ext/-/json-ext-0.5.7.tgz", @@ -5958,6 +5977,11 @@ "@types/superagent": "^8.1.0" } }, + "node_modules/@types/triple-beam": { + "version": "1.3.5", + "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz", + "integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==" + }, "node_modules/@types/uuid": { "version": "9.0.8", "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz", @@ -6700,7 +6724,6 @@ "version": "3.2.5", "resolved": "https://registry.npmjs.org/async/-/async-3.2.5.tgz", "integrity": "sha512-baNZyqaaLhyLVKm/DlvdW051MSgO6b8eVfIezl9E5PqWxFgzLm/wQntEW4zOytVburDEr0JlALEpdOFwvErLsg==", - "dev": true, "license": "MIT" }, "node_modules/asynckit": { @@ -7615,6 +7638,15 @@ "integrity": "sha512-lHl4d5/ONEbLlJvaJNtsF/Lz+WvB07u2ycqTYbdrq7UypDXailES4valYb2eWiJFxZlVmpGekfqoxQhzyFdT4Q==", "dev": true }, + "node_modules/color": { + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/color/-/color-3.2.1.tgz", + "integrity": "sha512-aBl7dZI9ENN6fUGC7mWpMTPNHmWUSNan9tuWN6ahh5ZLNk9baLJOnSMlrQkHcrfFgz2/RigjUVAjdx36VcemKA==", + "dependencies": { + "color-convert": "^1.9.3", + "color-string": "^1.6.0" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -7630,8 +7662,29 @@ "node_modules/color-name": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", - "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", - "dev": true + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" + }, + "node_modules/color-string": { + "version": "1.9.1", + "resolved": "https://registry.npmjs.org/color-string/-/color-string-1.9.1.tgz", + "integrity": "sha512-shrVawQFojnZv6xM40anx4CkoDP+fZsw/ZerEMsW/pyzsRbElpsL/DBVW7q3ExxwusdNXI3lXpuhEZkzs8p5Eg==", + "dependencies": { + "color-name": "^1.0.0", + "simple-swizzle": "^0.2.2" + } + }, + "node_modules/color/node_modules/color-convert": { + "version": "1.9.3", + "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", + "integrity": "sha512-QfAUtd+vFdAtFQcC8CCyYt1fYWxSqAiK2cSD6zDB8N3cpsEBAvRxp9zOGg6G/SHHJYAT88/az/IuDGALsNVbGg==", + "dependencies": { + "color-name": "1.1.3" + } + }, + "node_modules/color/node_modules/color-name": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.3.tgz", + "integrity": "sha512-72fSenhMw2HZMTVHeCA9KCmpEIbzWiQsjN+BHcBbS9vr1mtt+vJjPdksIBNUmKAW8TFUDPJK5SUU3QhE9NEXDw==" }, "node_modules/colorette": { "version": "2.0.20", @@ -7648,6 +7701,15 @@ "node": ">=0.1.90" } }, + "node_modules/colorspace": { + "version": "1.1.4", + "resolved": "https://registry.npmjs.org/colorspace/-/colorspace-1.1.4.tgz", + "integrity": "sha512-BgvKJiuVu1igBUF2kEjRCZXol6wiiGbY5ipL/oVPwm0BL9sIpMIzM8IK7vwuxIIzOXMV3Ey5w+vxhm0rR/TN8w==", + "dependencies": { 
+ "color": "^3.1.3", + "text-hex": "1.0.x" + } + }, "node_modules/combined-stream": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", @@ -8039,6 +8101,11 @@ "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==", "dev": true }, + "node_modules/enabled": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/enabled/-/enabled-2.0.0.tgz", + "integrity": "sha512-AKrN98kuwOzMIdAizXGI86UFBoo26CL21UM763y1h/GMSJ4/OHU9k2YlsmBpyScFo/wbLzWQJBMCW4+IO3/+OQ==" + }, "node_modules/encodeurl": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-2.0.0.tgz", @@ -8843,6 +8910,11 @@ "bser": "2.1.1" } }, + "node_modules/fecha": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/fecha/-/fecha-4.2.3.tgz", + "integrity": "sha512-OP2IUU6HeYKJi3i0z4A19kHMQoLVs4Hc+DPqqxI2h/DPZHTm/vjsfC6P0b4jCMy14XizLBqvndQ+UilD7707Jw==" + }, "node_modules/file-entry-cache": { "version": "6.0.1", "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", @@ -8978,6 +9050,11 @@ "integrity": "sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==", "dev": true }, + "node_modules/fn.name": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/fn.name/-/fn.name-1.1.0.tgz", + "integrity": "sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==" + }, "node_modules/follow-redirects": { "version": "1.15.9", "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.9.tgz", @@ -9825,7 +9902,6 @@ "version": "2.0.1", "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", "integrity": "sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg==", - "dev": true, "engines": { "node": ">=8" }, @@ -10707,6 +10783,11 @@ "node": ">=6" } }, + "node_modules/kuler": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/kuler/-/kuler-2.0.0.tgz", + "integrity": "sha512-Xq9nH7KlWZmXAtodXDDRE7vs6DU1gTU8zYDHDiWLSip45Egwq3plLHzPn27NgvzL2r1LMPC1vdqh98sQxtqj4A==" + }, "node_modules/leven": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/leven/-/leven-3.1.0.tgz", @@ -10813,6 +10894,22 @@ "resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz", "integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg==" }, + "node_modules/logform": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/logform/-/logform-2.7.0.tgz", + "integrity": "sha512-TFYA4jnP7PVbmlBIfhlSe+WKxs9dklXMTEGcBCIvLhE/Tn3H6Gk1norupVW7m5Cnd4bLcr08AytbyV/xj7f/kQ==", + "dependencies": { + "@colors/colors": "1.6.0", + "@types/triple-beam": "^1.3.2", + "fecha": "^4.2.0", + "ms": "^2.1.1", + "safe-stable-stringify": "^2.3.1", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -11230,6 +11327,14 @@ "wrappy": "1" } }, + "node_modules/one-time": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/one-time/-/one-time-1.0.0.tgz", + "integrity": "sha512-5DXOiRKwuSEcQ/l0kGCF6Q3jcADFv5tSmRaJck/OqkVFcOzutB134KRSfF0xDrL39MNnqxbHBbUUcjZIhTgb2g==", + "dependencies": { + "fn.name": "1.x.x" + } + }, "node_modules/onetime": { "version": "5.1.2", "resolved": 
"https://registry.npmjs.org/onetime/-/onetime-5.1.2.tgz", @@ -11747,6 +11852,19 @@ "integrity": "sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==", "dev": true }, + "node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/readdirp": { "version": "3.6.0", "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-3.6.0.tgz", @@ -11952,6 +12070,14 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "engines": { + "node": ">=10" + } + }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", @@ -12187,6 +12313,19 @@ "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==", "dev": true }, + "node_modules/simple-swizzle": { + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/simple-swizzle/-/simple-swizzle-0.2.2.tgz", + "integrity": "sha512-JA//kQgZtbuY83m+xT+tXJkmJncGMTFT+C+g2h2R9uxkYIrE2yy9sgmcLhCnw57/WSD+Eh3J97FPEDFnbXnDUg==", + "dependencies": { + "is-arrayish": "^0.3.1" + } + }, + "node_modules/simple-swizzle/node_modules/is-arrayish": { + "version": "0.3.2", + "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.3.2.tgz", + "integrity": "sha512-eVRqCvVlZbuw3GrM63ovNSNAeA1K16kaR/LRY/92w0zxQ5/1YzwblUX652i4Xs9RwAGjW9d9y6X88t8OaAJfWQ==" + }, "node_modules/sisteransi": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/sisteransi/-/sisteransi-1.0.5.tgz", @@ -12225,6 +12364,14 @@ "integrity": "sha512-D9cPgkvLlV3t3IzL0D0YLvGA9Ahk4PcvVwUbN0dSGr1aP0Nrt4AEnTUbuGvquEC0mA64Gqt1fzirlRs5ibXx8g==", "dev": true }, + "node_modules/stack-trace": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", + "integrity": "sha512-KGzahc7puUKkzyMt+IqAep+TVNbKP+k2Lmwhub39m1AsTSkaDutx56aDCo+HLDzf/D26BIHTJWNiTG1KAJiQCg==", + "engines": { + "node": "*" + } + }, "node_modules/stack-utils": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/stack-utils/-/stack-utils-2.0.6.tgz", @@ -12255,6 +12402,14 @@ "node": ">= 0.8" } }, + "node_modules/string_decoder": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", + "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", + "dependencies": { + "safe-buffer": "~5.2.0" + } + }, "node_modules/string-length": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/string-length/-/string-length-4.0.2.tgz", @@ -12590,6 +12745,11 @@ "node": ">=8" } }, + "node_modules/text-hex": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/text-hex/-/text-hex-1.0.0.tgz", + "integrity": "sha512-uuVGNWzgJ4yhRaNSiubPY7OjISw4sw4E5Uv0wbjp+OzcbmVU/rsT8ujgcXJhn9ypzsgr5vlzpPqP+MBBKcGvbg==" + }, "node_modules/text-table": { "version": "0.2.0", "resolved": 
"https://registry.npmjs.org/text-table/-/text-table-0.2.0.tgz", @@ -12642,6 +12802,14 @@ "tree-kill": "cli.js" } }, + "node_modules/triple-beam": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/triple-beam/-/triple-beam-1.4.1.tgz", + "integrity": "sha512-aZbgViZrg1QNcG+LULa7nhZpJTZSLm/mXnHXnbAbjmN5aSa0y7V+wvv6+4WaBtpISJzThKy+PIPxc1Nq1EJ9mg==", + "engines": { + "node": ">= 14.0.0" + } + }, "node_modules/truncate-utf8-bytes": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/truncate-utf8-bytes/-/truncate-utf8-bytes-1.0.2.tgz", @@ -13120,6 +13288,11 @@ "integrity": "sha512-Xn0w3MtiQ6zoz2vFyUVruaCL53O/DwUvkEeOvj+uulMm0BkUGYWmBYVyElqZaSLhY6ZD0ulfU3aBra2aVT4xfA==", "dev": true }, + "node_modules/util-deprecate": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" + }, "node_modules/utils-merge": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/utils-merge/-/utils-merge-1.0.1.tgz", @@ -13415,6 +13588,40 @@ "integrity": "sha512-CC1bOL87PIWSBhDcTrdeLo6eGT7mCFtrg0uIJtqJUFyK+eJnzl8A1niH56uu7KMa5XFrtiV+AQuHO3n7DsHnLQ==", "dev": true }, + "node_modules/winston": { + "version": "3.17.0", + "resolved": "https://registry.npmjs.org/winston/-/winston-3.17.0.tgz", + "integrity": "sha512-DLiFIXYC5fMPxaRg832S6F5mJYvePtmO5G9v9IgUFPhXm9/GkXarH/TUrBAVzhTCzAj9anE/+GjrgXp/54nOgw==", + "dependencies": { + "@colors/colors": "^1.6.0", + "@dabh/diagnostics": "^2.0.2", + "async": "^3.2.3", + "is-stream": "^2.0.0", + "logform": "^2.7.0", + "one-time": "^1.0.0", + "readable-stream": "^3.4.0", + "safe-stable-stringify": "^2.3.1", + "stack-trace": "0.0.x", + "triple-beam": "^1.3.0", + "winston-transport": "^4.9.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, + "node_modules/winston-transport": { + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/winston-transport/-/winston-transport-4.9.0.tgz", + "integrity": "sha512-8drMJ4rkgaPo1Me4zD/3WLfI/zPdA9o2IipKODunnGDcuqbHwjsbB79ylv04LCGGzU0xQ6vTznOMpQGaLhhm6A==", + "dependencies": { + "logform": "^2.7.0", + "readable-stream": "^3.6.2", + "triple-beam": "^1.3.0" + }, + "engines": { + "node": ">= 12.0.0" + } + }, "node_modules/word-wrap": { "version": "1.2.5", "resolved": "https://registry.npmjs.org/word-wrap/-/word-wrap-1.2.5.tgz", diff --git a/package.json b/package.json index f75259c..88f0a05 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,8 @@ "format:write": "prettier --write \"src/**/*.{ts,js,json,html,scss,md}\" \"!**/node_modules/**\" \"!**/dist/**\"", "dev-manager": "env-cmd -f src/job-manager/.env ts-node src/job-manager/src/index.ts", "dev-worker": "env-cmd -f src/example-worker/.env ts-node src/example-worker/src/index.ts", - "start-manager": "ts-node src/job-manager/src/index.ts", - "start-worker": "ts-node src/example-worker/src/index.ts" + "start-manager": "ts-node -T src/job-manager/src/index.ts", + "start-worker": "ts-node -T src/example-worker/src/index.ts" }, "keywords": [], "author": "AIMS", @@ -54,6 +54,7 @@ "passport": "^0.7.0", "passport-jwt": "^4.0.1", "source-map-support": "^0.5.21", + "winston": "^3.17.0", "zod": "^3.23.8", "zod-express-middleware": "^1.4.0" }, diff --git a/src/api/services/jobs.ts b/src/api/services/jobs.ts index 642dd32..c30d911 100644 --- a/src/api/services/jobs.ts +++ b/src/api/services/jobs.ts @@ -35,17 +35,37 @@ type JobExpiryMap = { }; const sharedCriteriaSchema = z.object({ - // High level 
config + // High level config - common to all current scenarios region: z.string().describe('Region for assessment'), reef_type: z.string().describe('The type of reef, slopes or flats'), - // Criteria - depth_min: z.number().describe('The depth range (min)'), - depth_max: z.number().describe('The depth range (max)'), - slope_min: z.number().describe('The slope range (min)'), - slope_max: z.number().describe('The slope range (max)'), - rugosity_min: z.number().describe('The rugosity range (min)'), - rugosity_max: z.number().describe('The rugosity range (max)'), - threshold: z.number().describe('Suitability threshold integer (min)'), + + // Criteria - all optional to match the Union{Float64,Nothing} in worker + depth_min: z.number().optional().describe('The depth range (min)'), + depth_max: z.number().optional().describe('The depth range (max)'), + slope_min: z.number().optional().describe('The slope range (min)'), + slope_max: z.number().optional().describe('The slope range (max)'), + rugosity_min: z.number().optional().describe('The rugosity range (min)'), + rugosity_max: z.number().optional().describe('The rugosity range (max)'), + waves_period_min: z + .number() + .optional() + .describe('The wave period range (min)'), + waves_period_max: z + .number() + .optional() + .describe('The wave period range (max)'), + waves_height_min: z + .number() + .optional() + .describe('The wave height range (min)'), + waves_height_max: z + .number() + .optional() + .describe('The wave height range (max)'), + threshold: z + .number() + .optional() + .describe('Suitability threshold integer (min)'), }); /** diff --git a/src/infra/components/jobs.ts b/src/infra/components/jobs.ts index 014b9cf..db82b2a 100644 --- a/src/infra/components/jobs.ts +++ b/src/infra/components/jobs.ts @@ -25,8 +25,9 @@ export interface JobTypeConfig { // Scaling configuration desiredMinCapacity: number; desiredMaxCapacity: number; - scaleUpThreshold: number; cooldownSeconds: number; + scalingSensitivity: number; + scalingFactor: number; serverPort: number; command: string[]; @@ -300,6 +301,8 @@ export class JobSystem extends Construct { AWS_REGION: Stack.of(this).region, // Which vpc to deploy into VPC_ID: props.vpc.vpcId, + // Log level for manager + LOG_LEVEL: 'info', }, // pass in the manager creds secrets: { @@ -374,8 +377,10 @@ export class JobSystem extends Construct { workerConfig.desiredMinCapacity.toString(); taskDefEnvVars[`${jobType}_MAX_CAPACITY`] = workerConfig.desiredMaxCapacity.toString(); - taskDefEnvVars[`${jobType}_SCALE_THRESHOLD`] = - workerConfig.scaleUpThreshold.toString(); + taskDefEnvVars[`${jobType}_SENSITIVITY`] = + workerConfig.scalingSensitivity.toString(); + taskDefEnvVars[`${jobType}_FACTOR`] = + workerConfig.scalingFactor.toString(); taskDefEnvVars[`${jobType}_COOLDOWN`] = workerConfig.cooldownSeconds.toString(); taskDefEnvVars[`${jobType}_SECURITY_GROUP`] = workerSg.securityGroupId; diff --git a/src/infra/infra.ts b/src/infra/infra.ts index ca2bc5d..1321307 100644 --- a/src/infra/infra.ts +++ b/src/infra/infra.ts @@ -189,7 +189,8 @@ export class ReefguideWebApiStack extends cdk.Stack { command: ['using ReefGuideAPI; ReefGuideAPI.start_worker()'], desiredMinCapacity: 0, desiredMaxCapacity: 5, - scaleUpThreshold: 1, + scalingFactor: 3.3, + scalingSensitivity: 2.6, cooldownSeconds: 60, // This specifies where the config file path can be found for the diff --git a/src/job-manager/src/authClient.ts b/src/job-manager/src/authClient.ts index 0eb7358..7bd63e3 100644 --- a/src/job-manager/src/authClient.ts 
+++ b/src/job-manager/src/authClient.ts @@ -1,25 +1,52 @@ import axios, { AxiosInstance, AxiosRequestConfig } from 'axios'; import { jwtDecode } from 'jwt-decode'; +import { logger } from './logging'; +/** + * Interface for authentication credentials + */ interface Credentials { + /** User email for authentication */ email: string; + /** User password for authentication */ password: string; } +/** + * Interface for JWT authentication tokens + */ interface AuthTokens { + /** Access token for API authorization */ token: string; + /** Refresh token for obtaining new access tokens */ refreshToken?: string; } +/** + * Interface for decoded JWT payload structure + */ interface JWTPayload { + /** User ID */ id: string; + /** User email */ email: string; + /** User roles/permissions */ roles: string[]; + /** Token expiration timestamp */ exp: number; } -// Generic error type for API errors +/** + * Custom error class for API-related errors + * Includes status code and original response for better error handling + */ export class ApiError extends Error { + /** + * Creates a new API error + * @param message - Error message + * @param statusCode - HTTP status code + * @param response - Optional original response object + */ constructor( message: string, public statusCode: number, @@ -30,15 +57,21 @@ } } +/** + * Client for authenticated API requests + * Handles login, token refresh, and authenticated HTTP requests + */ export class AuthApiClient { private axiosInstance: AxiosInstance; private credentials: Credentials; private tokens: AuthTokens | null = null; private readonly TOKEN_REFRESH_THRESHOLD = 60; // 1 minute in seconds + /** + * Creates a new authenticated API client + * @param baseURL - Base URL for the API + * @param credentials - Authentication credentials + */ constructor(baseURL: string, credentials: Credentials) { this.credentials = credentials; this.axiosInstance = axios.create({ @@ -48,6 +81,8 @@ }, }); + logger.debug('AuthApiClient initialized', { baseURL }); + // Add request interceptor to handle token management this.axiosInstance.interceptors.request.use( async config => { @@ -70,8 +105,14 @@ ); } + /** + * Gets a valid token, refreshing or logging in if necessary + * @returns A valid JWT token or null if authentication failed + * @private + */ private async getValidToken(): Promise<string | null> { if (!this.tokens?.token) { + logger.debug('No token available, initiating login'); await this.login(); return this.tokens?.token || null; } @@ -80,29 +121,43 @@ const expiresIn = decodedToken.exp - Math.floor(Date.now() / 1000); if (expiresIn <= this.TOKEN_REFRESH_THRESHOLD) { + logger.debug(`Token expires in ${expiresIn}s, refreshing`); await this.refreshToken(); } return this.tokens?.token || null; } + /** + * Authenticates with the API using provided credentials + * @throws Error if login fails + * @private + */ private async login(): Promise<void> { try { + logger.info('Logging in to API'); const response = await this.axiosInstance.post( '/auth/login', this.credentials, ); this.tokens = response.data; + logger.debug('Login successful, token received'); } catch (error) { + logger.error('Failed to login', { error }); throw new Error('Failed to login'); } } + /** + * Refreshes the access token using the refresh token + * Falls back to login if refresh fails + * @private + */ private async refreshToken(): Promise<void> { - console.log('Token refresh started at:', new
Date().toISOString()); + logger.info('Token refresh started', { timestamp: new Date().toISOString() }); try { if (!this.tokens?.refreshToken) { - console.log('No refresh token, logging in...'); + logger.warn('No refresh token available, falling back to login'); await this.login(); return; } @@ -115,61 +170,102 @@ ); if (response.status !== 200) { - console.log( - 'Non 200 response from refresh token endpoint.', - response.status, + logger.warn('Non 200 response from refresh token endpoint', { + status: response.status, + }); + throw new Error( + `Non 200 response from refresh token. Code: ${response.status}.`, ); - throw new Error('Non 200 response from refresh token.'); } this.tokens = { ...this.tokens, token: response.data.token, }; + logger.debug('Token refreshed successfully'); } catch (error) { - console.log('Error caught during refresh'); + logger.error('Error during token refresh, falling back to login', { + error, + }); // If refresh fails, try logging in again this.tokens = null; // awaiting login await this.login(); } - console.log('Token refresh completed at:', new Date().toISOString()); + logger.info('Token refresh completed', { timestamp: new Date().toISOString() }); } - // Base HTTP methods with proper typing + /** + * Performs a GET request to the API + * @param url - Endpoint URL (relative to base URL) + * @param config - Optional Axios request configuration + * @returns Response data + */ public async get<T>(url: string, config?: AxiosRequestConfig): Promise<T> { + logger.debug('GET request', { url }); const response = await this.axiosInstance.get(url, config); return response.data; } + /** + * Performs a POST request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async post<T>( url: string, data?: any, config?: AxiosRequestConfig, ): Promise<T> { + logger.debug('POST request', { url }); const response = await this.axiosInstance.post(url, data, config); return response.data; } + /** + * Performs a PUT request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async put<T>( url: string, data?: any, config?: AxiosRequestConfig, ): Promise<T> { + logger.debug('PUT request', { url }); const response = await this.axiosInstance.put(url, data, config); return response.data; } + /** + * Performs a PATCH request to the API + * @param url - Endpoint URL (relative to base URL) + * @param data - Request body data + * @param config - Optional Axios request configuration + * @returns Response data + */ public async patch<T>( url: string, data?: any, config?: AxiosRequestConfig, ): Promise<T> { + logger.debug('PATCH request', { url }); const response = await this.axiosInstance.patch(url, data, config); return response.data; } + /** + * Performs a DELETE request to the API + * @param url - Endpoint URL (relative to base URL) + * @param config - Optional Axios request configuration + * @returns Response data + */ public async delete<T>(url: string, config?: AxiosRequestConfig): Promise<T> { + logger.debug('DELETE request', { url }); const response = await this.axiosInstance.delete(url, config); return response.data; } diff --git a/src/job-manager/src/config.ts b/src/job-manager/src/config.ts index db77b83..d38ae94 100644 --- a/src/job-manager/src/config.ts +++ b/src/job-manager/src/config.ts @@ -1,27 +1,100 @@ import {
JobType } from '@prisma/client'; import { z } from 'zod'; +import { logger } from './logging'; -// Configuration schema for job types and their corresponding ECS resources -export const JobTypeConfigSchema = z.object({ +/** + * Helper function to create a number validator that also accepts string inputs + * Ensures values meet minimum requirements and handles type conversion + * + * @param min - Minimum allowed value (null for no minimum) + * @param errorMessage - Error message for invalid numbers + * @param minErrorMessage - Error message for values below minimum + * @returns A Zod validator that accepts both numbers and strings + */ +const createNumberValidator = ( + min: number | null = null, + errorMessage: string = 'Value must be a valid number', + minErrorMessage: string = `Value must be at least ${min}`, +) => { + return z.union([ + min !== null ? z.number().min(min, minErrorMessage) : z.number(), + z.string().transform((val, ctx) => { + const parsed = Number(val); + if (isNaN(parsed)) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: errorMessage, + }); + return z.NEVER; + } + if (min !== null && parsed < min) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: minErrorMessage, + }); + return z.NEVER; + } + return parsed; + }), + ]); +}; + +/** + * Schema for scaling configuration + * Defines how the capacity manager should scale resources + */ +export const ScalingConfiguration = z.object({ + min: createNumberValidator( + 0, + 'Minimum capacity must be a valid number', + 'Minimum capacity must be non-negative', + ), + max: createNumberValidator( + 0, + 'Maximum capacity must be a valid number', + 'Maximum capacity must be non-negative', + ), + sensitivity: createNumberValidator( + 0, + 'Sensitivity must be a valid number', + 'Logarithmic sensitivity must be non-negative', + ), + factor: createNumberValidator( + 1, + 'Factor must be a valid number', + 'Division factor for jobs - this allows you to consider different job count scales. 
Must be at least 1.', ), + cooldownSeconds: createNumberValidator( + 0, + 'Cooldown seconds must be a valid number', + 'Cooldown seconds must be non-negative', + ), +}); + +/** + * Configuration schema for job types and their corresponding ECS resources + * Defines the AWS resources and scaling parameters for each job type + */ +export const RawJobTypeConfigSchema = z.object({ taskDefinitionArn: z.string().min(1, 'Task Definition ARN is required'), clusterArn: z.string().min(1, 'Cluster ARN is required'), - desiredMinCapacity: z - .number() - .min(0, 'Minimum capacity must be non-negative'), - desiredMaxCapacity: z - .number() - .min(0, 'Maximum capacity must be non-negative'), - scaleUpThreshold: z.number().min(1, 'Scale-up threshold must be at least 1'), - cooldownSeconds: z.number().min(0, 'Cooldown seconds must be non-negative'), + scaling: ScalingConfiguration, // Security group ARN for this task securityGroup: z.string().min(1, 'Security group ARN is required'), }); -export type JobTypeConfig = z.infer<typeof JobTypeConfigSchema>; +export type RawJobTypeConfig = z.infer<typeof RawJobTypeConfigSchema>; -// Base configuration schema (not job-type specific) +/** + * Base configuration schema for environment variables + * Validates core application settings from environment + */ export const BaseEnvConfigSchema = z.object({ - POLL_INTERVAL_MS: z.string().transform(val => parseInt(val)), + POLL_INTERVAL_MS: createNumberValidator( + 500, + 'Poll interval expects valid number', + 'Minimum poll interval is 500 ms', + ), API_ENDPOINT: z.string().url('API endpoint must be a valid URL'), AWS_REGION: z.string().min(1, 'AWS region is required'), API_USERNAME: z.string().min(1, 'API username is required'), @@ -29,20 +102,20 @@ VPC_ID: z.string().min(1, 'VPC ID is required'), }); -// Job type specific environment variable fields that will be expected for each job type -const JOB_TYPE_ENV_FIELDS = { - TASK_DEF: 'taskDefinitionArn', - CLUSTER: 'clusterArn', - MIN_CAPACITY: 'desiredMinCapacity', - MAX_CAPACITY: 'desiredMaxCapacity', - SCALE_THRESHOLD: 'scaleUpThreshold', - COOLDOWN: 'cooldownSeconds', - SECURITY_GROUP: 'securityGroup', -}; - -// Final configuration schema structure +export const JobTypeConfigSchema = RawJobTypeConfigSchema.extend({ + jobTypes: z.array(z.nativeEnum(JobType)), +}); +export type JobTypeConfig = z.infer<typeof JobTypeConfigSchema>; +/** + * Final configuration schema structure + * Combines all configuration elements into a complete application config + */ export const ConfigSchema = z.object({ - pollIntervalMs: z.number().min(1000, 'Poll interval must be at least 1000ms'), + pollIntervalMs: createNumberValidator( + 500, + 'Poll interval expects valid number', + 'Minimum poll interval is 500 ms', + ), apiEndpoint: z.string().url('API endpoint must be a valid URL'), region: z.string().min(1, 'AWS region is required'), jobTypes: z.record(z.nativeEnum(JobType), JobTypeConfigSchema), @@ -55,82 +128,43 @@ export type Config = z.infer<typeof ConfigSchema>; -/** - * Creates a Zod schema for all environment variables based on available job types - * Dynamically generates validation for each job type's environment variables - * - * @returns Zod schema for environment variables - */ -export function createEnvVarsSchema(): z.ZodObject<any> { - // Start with the base config schema - let envSchema: Record<string, any> = { ...BaseEnvConfigSchema.shape }; - - // For each job type in the enum, add its specific environment variables - Object.values(JobType).forEach(jobType => { - const typePrefix = jobType.toString(); - - // Add
each field for this job type - Object.entries(JOB_TYPE_ENV_FIELDS).forEach(([envSuffix, configField]) => { - const envVarName = `${typePrefix}_${envSuffix}`; - - // Handle numeric fields with transformation - if ( - [ - 'MIN_CAPACITY', - 'MAX_CAPACITY', - 'SCALE_THRESHOLD', - 'COOLDOWN', - ].includes(envSuffix) - ) { - envSchema[envVarName] = z - .string() - .transform(val => { - const parsed = parseInt(val); - if (isNaN(parsed)) { - throw new Error(`${envVarName} must be a valid number`); - } - return parsed; - }) - .describe(`${configField} for ${typePrefix} job type`); - } else { - // String fields - envSchema[envVarName] = z - .string() - .min(1, `${envVarName} is required for ${typePrefix} job type`) - .describe(`${configField} for ${typePrefix} job type`); - } - }); - }); - - // Return the complete environment variables schema - return z.object(envSchema); -} - /** * Builds job type configuration from environment variables + * Extracts and validates settings for a specific job type * - * @param env Validated environment variables - * @param jobType The job type to build configuration for + * @param env - Validated environment variables + * @param jobType - The job type to build configuration for * @returns Configuration for the specified job type */ function buildJobTypeConfig( - env: Record<string, any>, + env: Record<string, unknown>, jobType: string, -): JobTypeConfig { +): RawJobTypeConfig { + logger.debug(`Building config for job type: ${jobType}`); + + const optimisticParse = { + taskDefinitionArn: env[`${jobType}_TASK_DEF`] as string, + clusterArn: env[`${jobType}_CLUSTER`] as string, + scaling: { + min: env[`${jobType}_MIN_CAPACITY`] as number, + max: env[`${jobType}_MAX_CAPACITY`] as number, + cooldownSeconds: env[`${jobType}_COOLDOWN`] as number, + sensitivity: env[`${jobType}_SENSITIVITY`] as number, + factor: env[`${jobType}_FACTOR`] as number, + }, + securityGroup: env[`${jobType}_SECURITY_GROUP`] as string, + } satisfies RawJobTypeConfig; + try { - return { - taskDefinitionArn: env[`${jobType}_TASK_DEF`], - clusterArn: env[`${jobType}_CLUSTER`], - desiredMinCapacity: env[`${jobType}_MIN_CAPACITY`], - desiredMaxCapacity: env[`${jobType}_MAX_CAPACITY`], - scaleUpThreshold: env[`${jobType}_SCALE_THRESHOLD`], - cooldownSeconds: env[`${jobType}_COOLDOWN`], - securityGroup: env[`${jobType}_SECURITY_GROUP`], - }; - } catch (error) { - throw new Error( - `Failed to build configuration for job type ${jobType}: ${error}`, + const validatedConfig = RawJobTypeConfigSchema.parse(optimisticParse); + logger.debug(`Validated config for job type: ${jobType}`); + return validatedConfig; + } catch (e) { + logger.error( + `Job type ${jobType} did not have valid environment variables`, + { error: e }, ); + throw e; } } @@ -142,45 +176,71 @@ * @throws Error if configuration validation fails */ export function loadConfig(): Config { - try { - // Create the environment schema based on available job types - const EnvVarsSchema = createEnvVarsSchema(); - - // Validate all required environment variables - const env = EnvVarsSchema.parse(process.env); + logger.info('Loading application configuration'); + try { + // Force types here as we zod process everything!
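+ // As an illustration only (ARN values below are hypothetical placeholders), + // a SUITABILITY_ASSESSMENT worker would be configured through variables like: + // SUITABILITY_ASSESSMENT_TASK_DEF=arn:aws:ecs:...:task-definition/worker:1 + // SUITABILITY_ASSESSMENT_CLUSTER=arn:aws:ecs:...:cluster/jobs + // SUITABILITY_ASSESSMENT_MIN_CAPACITY=0 + // SUITABILITY_ASSESSMENT_MAX_CAPACITY=5 + // SUITABILITY_ASSESSMENT_SENSITIVITY=2.6 + // SUITABILITY_ASSESSMENT_FACTOR=3.3 + // SUITABILITY_ASSESSMENT_COOLDOWN=60 + // SUITABILITY_ASSESSMENT_SECURITY_GROUP=sg-0123456789abcdef0 + // (numeric values mirror the sample scaling config in src/infra/infra.ts)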
+ const env = process.env as Record<string, unknown>; // Initialize job types configuration object - const jobTypesConfig: Record<string, JobTypeConfig> = {}; + const jobTypesConfig: Record< + string, + RawJobTypeConfig & { jobTypes?: JobType[] } + > = {}; // Build configuration for each job type Object.values(JobType).forEach(jobType => { const typeString = jobType.toString(); + logger.debug(`Processing job type: ${typeString}`); jobTypesConfig[typeString] = buildJobTypeConfig(env, typeString); }); + // Group the job types by task ARN + logger.debug('Grouping job types by task ARN'); + const arnToTypes: Map<string, JobType[]> = new Map(); + for (const [jobType, config] of Object.entries(jobTypesConfig)) { + arnToTypes.set( + config.taskDefinitionArn, + (arnToTypes.get(config.taskDefinitionArn) ?? []).concat([ + jobType as JobType, + ]), + ); + } + + // Update with grouped types + for (let config of Object.values(jobTypesConfig)) { + config.jobTypes = arnToTypes.get(config.taskDefinitionArn); + } + // Construct the complete config object with validated values const config: Config = { - pollIntervalMs: env.POLL_INTERVAL_MS, - apiEndpoint: env.API_ENDPOINT, - region: env.AWS_REGION, + pollIntervalMs: env.POLL_INTERVAL_MS as number, + apiEndpoint: env.API_ENDPOINT as string, + region: env.AWS_REGION as string, jobTypes: jobTypesConfig, auth: { - email: env.API_USERNAME, - password: env.API_PASSWORD, + email: env.API_USERNAME as string, + password: env.API_PASSWORD as string, }, - vpcId: env.VPC_ID, + vpcId: env.VPC_ID as string, }; // Validate the entire config object - return ConfigSchema.parse(config); + logger.debug('Validating complete configuration'); + const validatedConfig = ConfigSchema.parse(config); + logger.info('Configuration successfully loaded and validated'); + return validatedConfig; } catch (error) { if (error instanceof z.ZodError) { // Format Zod validation errors for better readability const formattedErrors = error.errors .map(err => `${err.path.join('.')}: ${err.message}`) .join('\n'); + logger.error('Configuration validation failed', { + errors: formattedErrors, + }); throw new Error(`Configuration validation failed:\n${formattedErrors}`); } + logger.error('Failed to load configuration', { error }); throw new Error(`Failed to load configuration: ${error}`); } } diff --git a/src/job-manager/src/index.ts b/src/job-manager/src/index.ts index b11a0a2..1975de2 100755 --- a/src/job-manager/src/index.ts +++ b/src/job-manager/src/index.ts @@ -3,31 +3,45 @@ import { z } from 'zod'; import { Config, loadConfig } from './config'; import { CapacityManager } from './manager'; import { AuthApiClient } from './authClient'; +import { logger } from './logging'; + +/** + * Main entry point for the Capacity Manager service + * Sets up the health check endpoint, loads configuration, + * and initializes the capacity manager.
+ */ // Create and start the express app for health checks const app = express(); const port = process.env.PORT || 3000; +/** + * Health check endpoint + * Returns 200 OK to indicate the service is running + */ app.get('/health', (req, res) => { - console.log('Health check, returning 200 OK'); + logger.debug('Health check requested'); res.status(200).send('OK'); }); let config: Config; try { + // Load and validate configuration from environment variables config = loadConfig(); - console.log('Configuration loaded successfully'); + logger.info('Configuration loaded successfully'); } catch (error) { if (error instanceof z.ZodError) { - console.error('Configuration validation failed:', error.errors); + logger.error('Configuration validation failed:', { errors: error.errors }); } else { - console.error('Failed to load configuration:', error); + logger.error('Failed to load configuration:', { error }); } + // Exit with error code if configuration cannot be loaded process.exit(1); } // Create API client (base should include /api) +logger.info('Initializing API client'); const client = new AuthApiClient(config.apiEndpoint + '/api', { email: config.auth.email, password: config.auth.password, @@ -35,20 +49,43 @@ const client = new AuthApiClient(config.apiEndpoint + '/api', { // Start the express server app.listen(port, () => { - console.log(`Health check server listening on port ${port}`); + logger.info(`Health check server listening on port ${port}`); }); // Start the capacity manager +logger.info('Initializing capacity manager'); const manager = new CapacityManager(config, client); +logger.info('Starting capacity manager'); manager.start(); -// graceful shutdown handlers +/** + * Handles graceful shutdown on SIGTERM + * Stops the capacity manager before process exit + */ process.on('SIGTERM', () => { - console.log('Received SIGTERM signal, shutting down...'); + logger.info('Received SIGTERM signal, shutting down...'); manager.stop(); }); +/** + * Handles graceful shutdown on SIGINT (Ctrl+C) + * Stops the capacity manager before process exit + */ process.on('SIGINT', () => { - console.log('Received SIGINT signal, shutting down...'); + logger.info('Received SIGINT signal, shutting down...'); manager.stop(); }); + +// Additional error handling for uncaught exceptions +process.on('uncaughtException', error => { + logger.error('Uncaught exception, shutting down:', { error }); + manager.stop(); + process.exit(1); +}); + +// Additional error handling for unhandled promise rejections +process.on('unhandledRejection', reason => { + logger.error('Unhandled rejection, shutting down:', { reason }); + manager.stop(); + process.exit(1); +}); diff --git a/src/job-manager/src/logging.ts b/src/job-manager/src/logging.ts new file mode 100644 index 0000000..3d12a04 --- /dev/null +++ b/src/job-manager/src/logging.ts @@ -0,0 +1,46 @@ +import winston from 'winston'; + +/** + * Winston logger configuration + * + * This logger provides structured logging with timestamps and log levels. + * The log level can be configured via the LOG_LEVEL environment variable. 
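+ * + * For example (illustrative usage, relying on the start-manager script that + * package.json defines in this change): + * LOG_LEVEL=debug npm run start-manager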
+ * + * Available log levels (in order of verbosity): + * - error: 0 (least verbose, only errors) + * - warn: 1 (errors and warnings) + * - info: 2 (errors, warnings, and info - default) + * - http: 3 (errors, warnings, info, and HTTP requests) + * - verbose: 4 (all above plus verbose details) + * - debug: 5 (all above plus debug information) + * - silly: 6 (most verbose level) + * + * Setting LOG_LEVEL to a specific level will include all logs at that level + * and below (less verbose). For example, setting LOG_LEVEL=warn will include + * error and warn logs, but not info, http, etc. + */ + +/** + * Creates a configured winston logger instance + * Format includes timestamp and structured JSON for metadata + */ +export const logger = winston.createLogger({ + // Get log level from environment variable or default to 'info' + level: (process.env.LOG_LEVEL || 'info').toLowerCase(), + + // Define log format with timestamp and metadata + format: winston.format.combine( + // Add timestamp to all log entries + winston.format.timestamp(), + + // Custom formatter to include metadata as JSON + winston.format.printf(({ level, message, timestamp, ...metadata }) => { + return `[${timestamp}] [${level.toUpperCase()}] ${message} ${ + Object.keys(metadata).length ? JSON.stringify(metadata) : '' + }`; + }), + ), + + // Define where logs are sent - console for basic setup + transports: [new winston.transports.Console()], +}); diff --git a/src/job-manager/src/manager.ts b/src/job-manager/src/manager.ts index 3f1cc62..90f17bc 100644 --- a/src/job-manager/src/manager.ts +++ b/src/job-manager/src/manager.ts @@ -1,61 +1,107 @@ -import { AssignPublicIp, ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs'; +import { + AssignPublicIp, + ECSClient, + RunTaskCommand, + DescribeTasksCommand, + Task, +} from '@aws-sdk/client-ecs'; import { EC2Client, DescribeSubnetsCommand } from '@aws-sdk/client-ec2'; import { Config, ConfigSchema, JobTypeConfig } from './config'; import { AuthApiClient } from './authClient'; import { JobType } from '@prisma/client'; +import { PollJobsResponse } from '../../api/jobs/routes'; +import { logger } from './logging'; + +/** + * Interface for tracking worker status + * Contains details for a worker which is pending or running + */ +interface TrackedWorker { + /** Unique ARN for the ECS task */ + taskArn: string; + /** Task definition ARN for the ECS task */ + taskDefinitionArn: string; + /** The cluster ARN where the task is running */ + clusterArn: string; + /** When the worker was started */ + startTime: Date; + /** Types of jobs this worker can handle */ + jobTypes: JobType[]; + /** Current status of the worker */ + status: 'PENDING' | 'RUNNING' | 'STOPPED'; +} +/** + * CapacityManager handles the automatic scaling of ECS tasks based on job queue demand. + * It tracks workers, polls for pending jobs, and adjusts the number of workers + * to efficiently process the jobs while respecting scaling constraints. 
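+ * + * Scaling uses a logarithmic curve (see computeOptimalWorkers below), so the + * worker count grows quickly for the first few pending jobs and flattens out + * for large backlogs; scaling down is intentionally left to the workers, + * which terminate themselves.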
+ */ export class CapacityManager { private config: Config; private ecsClient: ECSClient; private ec2Client: EC2Client; + // Tracks the last scaled time for a given task definition ARN private lastScaleTime: Record<string, number> = {}; private client: AuthApiClient; private isRunning: boolean = false; private pollTimeout: NodeJS.Timeout | null = null; + // Tracking data for workers + private trackedWorkers: TrackedWorker[] = []; + + /** + * Creates a new CapacityManager + * @param config - Configuration for the capacity manager + * @param client - Authentication client for API requests + */ constructor(config: Config, client: AuthApiClient) { this.config = ConfigSchema.parse(config); this.ecsClient = new ECSClient({ region: this.config.region }); this.ec2Client = new EC2Client({ region: this.config.region }); this.client = client; + logger.debug('CapacityManager initialized', { region: this.config.region }); } + /** + * Polls the job queue and adjusts capacity as needed + * @private + */ private async pollJobQueue() { if (!this.isRunning) return; const used = process.memoryUsage(); - console.log('Memory usage:', { + logger.debug('Memory usage', { heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`, heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`, rss: `${Math.round(used.rss / 1024 / 1024)} MB`, }); try { - console.log('Poll started at:', new Date().toISOString()); + logger.info('Poll started', { timestamp: new Date().toISOString() }); + logger.debug('Current tracked workers status', { + count: this.trackedWorkers.length, + }); - const response = await this.client.get<{ jobs: any[] }>('/jobs/poll'); + // Update worker statuses + await this.updateWorkerStatuses(); - const jobsByType = response.jobs.reduce( - (acc: Record<string, number>, job: any) => { - acc[job.type] = (acc[job.type] || 0) + 1; - return acc; - }, - {}, - ); + // Get jobs with their IDs + logger.debug('Fetching pending jobs from API'); + const response = await this.client.get<PollJobsResponse>('/jobs/poll'); + logger.debug('Received job poll response', { + jobCount: response.jobs.length, + jobTypes: response.jobs.map(j => j.type), + }); - console.log('Jobs by type:', jobsByType); - await this.adjustCapacity(jobsByType); + await this.adjustCapacity({ pollResponse: response.jobs }); } catch (error) { - console.error('Error polling job queue:', error); + logger.error('Error polling job queue', { error }); } finally { // Only schedule next poll if still running if (this.isRunning) { + logger.debug('Scheduling next poll', { + ms: this.config.pollIntervalMs, + }); this.pollTimeout = setTimeout( () => this.pollJobQueue(), this.config.pollIntervalMs, ); @@ -64,27 +110,568 @@ } } + /** + * Updates the status of all tracked workers by querying ECS + * @private + */ + private async updateWorkerStatuses() { + if (this.trackedWorkers.length === 0) { + logger.debug('No workers to update'); + return; + } + + try { + // Build set of distinct worker types from the tracked workers + const clusterArns = new Set(this.trackedWorkers.map(w => w.clusterArn)); + logger.debug('Updating worker statuses', { + workerCount: this.trackedWorkers.length, + clusterCount: clusterArns.size, + }); + + // Create a Set to track which task ARNs were found in the API response + const foundTaskArns = new Set<string>(); + + // Now loop through each worker type and figure out the cluster ARN/task ARNs + for (const clusterArn of clusterArns) { + const relevantWorkers = this.trackedWorkers.filter( + w => w.clusterArn === clusterArn, + ); + // Which task ARNs to fetch
+ const taskArns = relevantWorkers.map(w => w.taskArn); + + logger.debug('Checking tasks in cluster', { + clusterArn, + taskCount: taskArns.length, + }); + + // Split into chunks if there are many tasks (ECS API has limits) + const chunkSize = 100; + for (let i = 0; i < taskArns.length; i += chunkSize) { + const chunk = taskArns.slice(i, i + chunkSize); + logger.debug('Processing task chunk', { + chunkSize: chunk.length, + startIndex: i, + }); + + const command = new DescribeTasksCommand({ + cluster: clusterArn, + tasks: chunk, + }); + + const response = await this.ecsClient.send(command); + + if (response.tasks) { + logger.debug('Received task details', { + requestedCount: chunk.length, + receivedCount: response.tasks.length, + }); + + // Add all found task ARNs to our tracking set + response.tasks.forEach(task => { + if (task.taskArn) { + foundTaskArns.add(task.taskArn); + } + }); + + this.updateWorkerStatusesFromTasks(response.tasks); + } + + // Check if any tasks weren't found but were requested + // AWS ECS API returns info in response.failures for tasks that weren't found + if (response.failures && response.failures.length > 0) { + logger.warn('Some tasks were not found', { + failureCount: response.failures.length, + }); + + response.failures.forEach(failure => { + if (failure.arn && failure.reason === 'MISSING') { + logger.info( + 'Task not found in ECS, removing from tracked workers', + { + taskArn: failure.arn, + }, + ); + // We explicitly don't add this to foundTaskArns since it's missing + } + }); + } + } + } + + // Remove workers that weren't found in the API response + const previousCount = this.trackedWorkers.length; + this.trackedWorkers = this.trackedWorkers.filter(worker => { + // Keep workers that were found in the API response + return foundTaskArns.has(worker.taskArn); + }); + + const removedCount = previousCount - this.trackedWorkers.length; + if (removedCount > 0) { + logger.info('Removed workers not found in ECS', { + count: removedCount, + }); + } + + logger.debug('Worker status update complete', { + originalCount: previousCount, + currentCount: this.trackedWorkers.length, + removed: removedCount, + }); + } catch (error) { + logger.error('Error updating worker statuses', { error }); + } + } + + /** + * Updates worker statuses based on ECS task information + * @param tasks - Task information from ECS + * @private + */ + private updateWorkerStatusesFromTasks(tasks: Task[]) { + logger.debug('Updating worker statuses from tasks', { + taskCount: tasks.length, + }); + let statusChanges = 0; + + for (const task of tasks) { + if (!task.taskArn) { + logger.warn('Task without ARN found in response'); + continue; + } + + const worker = this.trackedWorkers.find(w => w.taskArn === task.taskArn); + if (!worker) { + logger.debug('Task not in tracked workers', { taskArn: task.taskArn }); + continue; + } + + const lastStatus = task.lastStatus || ''; + let newStatus: 'PENDING' | 'RUNNING' | 'STOPPED'; + + // Map AWS ECS task statuses to our internal tracking statuses + if (['PROVISIONING', 'PENDING', 'ACTIVATING'].includes(lastStatus)) { + newStatus = 'PENDING'; + } else if (lastStatus === 'RUNNING') { + newStatus = 'RUNNING'; + } else if ( + [ + 'DEACTIVATING', + 'STOPPING', + 'STOPPED', + 'DEPROVISIONING', + 'DEPROVISIONED', + ].includes(lastStatus) + ) { + newStatus = 'STOPPED'; + } else { + // For any unexpected status, log it but don't change worker status + logger.warn('Worker has unknown status', { + taskArn: task.taskArn, + status: lastStatus, + }); + continue; + } + + // 
Only log if status changed + if (worker.status !== newStatus) { + logger.info('Worker status changed', { + taskArn: task.taskArn, + oldStatus: worker.status, + newStatus: newStatus, + }); + worker.status = newStatus; + statusChanges++; + } + } + + // Count workers by status before cleanup + const workerStatusCounts = { + PENDING: this.trackedWorkers.filter(w => w.status === 'PENDING').length, + RUNNING: this.trackedWorkers.filter(w => w.status === 'RUNNING').length, + STOPPED: this.trackedWorkers.filter(w => w.status === 'STOPPED').length, + }; + + logger.debug('Worker status counts before cleanup', workerStatusCounts); + + // Remove stopped workers after updating + const beforeCleanup = this.trackedWorkers.length; + this.trackedWorkers = this.trackedWorkers.filter( + worker => worker.status !== 'STOPPED', + ); + const cleanupRemoved = beforeCleanup - this.trackedWorkers.length; + + if (cleanupRemoved > 0) { + logger.info('Removed stopped workers from tracking', { + count: cleanupRemoved, + }); + } + + logger.debug('Worker status update summary', { + statusChanges, + stoppedWorkersRemoved: cleanupRemoved, + remainingWorkers: this.trackedWorkers.length, + }); + } + + /** + * Adjust capacity for each task definition based on pending jobs + * @param pollResponse - Response from the job queue poll + * @private + */ + private async adjustCapacity({ + pollResponse, + }: { + pollResponse: PollJobsResponse['jobs']; + }): Promise<void> { + logger.debug('Adjusting capacity based on poll response', { + jobCount: pollResponse.length, + }); + + // Count pending jobs by task definition + const pendingByDfnArn: Record<string, number> = pollResponse.reduce< + Record<string, number> + >( + (counts, job) => { + const arn = this.config.jobTypes[job.type]?.taskDefinitionArn; + if (!arn) { + logger.warn('Missing config definition for task type.', { + jobType: job.type, + }); + return counts; + } + counts[arn] = counts[arn] ? counts[arn] + 1 : 1; + return counts; + }, + {} as Record<string, number>, + ); + + // Determine how many workers are already tracked for each type of job + const workersByDfnArn: Record<string, number> = this.trackedWorkers.reduce< + Record<string, number> + >( + (counts, worker) => { + const arn = worker.taskDefinitionArn; + counts[arn] = counts[arn] ? counts[arn] + 1 : 1; + return counts; + }, + {} as Record<string, number>, + ); + + logger.debug('Job distribution', { + pendingByType: pendingByDfnArn, + workersByType: workersByDfnArn, + }); + + for (const taskDefArn of Object.keys(pendingByDfnArn)) { + const taskConfig = Object.values(this.config.jobTypes).find( + c => c.taskDefinitionArn === taskDefArn, + ); + if (!taskConfig) { + logger.warn( + 'No configuration found for job with task definition arn needed', + { taskDefArn }, + ); + continue; + } + + const pending: number = pendingByDfnArn[taskDefArn] ?? 0; + const workers: number = workersByDfnArn[taskDefArn] ??
0; + + logger.debug('Considering capacity adjustment', { + taskDefinitionArn: taskDefArn, + pendingJobs: pending, + currentWorkers: workers, + }); + + await this.adjustCapacityForTask({ + jobTypes: taskConfig.jobTypes, + pending, + workers, + config: taskConfig, + }); + } + } + + /** + * Launches n tasks of the specified type/config + * @param count - Number of tasks to launch + * @param config - Configuration for the job type + * @private + */ + private async launchTask({ + count = 1, + config, + }: { + count?: number; + config: JobTypeConfig; + }) { + try { + logger.info('Attempting to launch tasks', { + count, + arn: config.taskDefinitionArn, + }); + let done = 0; + let failures = 0; + + while (done < count) { + const now = Date.now(); + + // Get a random public subnet for this task + logger.debug('Getting random public subnet', { + vpcId: this.config.vpcId, + }); + const subnet = await this.getRandomPublicSubnet(this.config.vpcId); + + logger.debug('Constructing RunTaskCommand', { + cluster: config.clusterArn, + taskDef: config.taskDefinitionArn, + subnet, + }); + + const command = new RunTaskCommand({ + cluster: config.clusterArn, + taskDefinition: config.taskDefinitionArn, + launchType: 'FARGATE', + count: 1, + networkConfiguration: { + awsvpcConfiguration: { + subnets: [subnet], + securityGroups: [config.securityGroup], + assignPublicIp: AssignPublicIp.ENABLED, + }, + }, + }); + + const result = await this.ecsClient.send(command); + + // If task was created successfully, track it + if ( + result.tasks && + result.tasks.length > 0 && + result.tasks[0].taskArn + ) { + this.lastScaleTime[config.taskDefinitionArn] = now; + this.trackedWorkers.push({ + clusterArn: config.clusterArn, + taskArn: result.tasks[0].taskArn, + startTime: new Date(), + jobTypes: config.jobTypes, + taskDefinitionArn: config.taskDefinitionArn, + status: 'PENDING', + }); + + logger.info('Started new task', { + taskArn: result.tasks[0].taskArn, + }); + done += 1; + } else { + failures += 1; + logger.error('Failed to launch task', { + result, + }); + // Bail out rather than retrying forever if ECS keeps rejecting the request + if (failures >= count) { + break; + } + } + } + + logger.debug('Task launch summary', { + requested: count, + launched: done, + failures, + }); + } catch (e) { + logger.error('Failed to launch task(s)', { + error: e, + }); + } + } + + /** + * Computes the optimal number of workers to handle pending jobs using a logarithmic scale. + * + * This function uses a logarithmic relationship between pending jobs and worker count, + * which provides diminishing returns as the number of jobs increases - appropriate for + * many distributed processing scenarios.
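+ * + * Worked example (using the sample scaling values from src/infra/infra.ts, + * assumed here purely for illustration): with sensitivity = 2.6, + * baseJobCount (factor) = 3.3, minWorkers = 0 and maxWorkers = 5, a backlog + * of 10 pending jobs yields 2.6 * ln(10 / 3.3 + 1) + 0 ≈ 3.62, which rounds + * to 4 workers, within the [0, 5] bounds.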
+  /**
+   * Computes the optimal number of workers to handle pending jobs using a
+   * logarithmic scale.
+   *
+   * This function uses a logarithmic relationship between pending jobs and
+   * worker count, which provides diminishing returns as the number of jobs
+   * increases, appropriate for many distributed processing scenarios.
+   *
+   * @param pendingJobs - The number of jobs waiting to be processed
+   * @param sensitivity - Controls how aggressively to scale workers (higher = more workers)
+   *                      Recommended range: 1.0 (conservative) to 3.0 (aggressive)
+   * @param minWorkers - Minimum number of workers to maintain regardless of job count
+   * @param maxWorkers - Maximum number of workers allowed regardless of job count
+   * @param baseJobCount - Reference job count at which roughly 0.7 × sensitivity
+   *                       workers are added above minWorkers (helps calibrate
+   *                       the scale for your specific workload)
+   * @returns The target number of workers as an integer
+   * @private
+   */
+  private computeOptimalWorkers({
+    pendingJobs,
+    sensitivity,
+    minWorkers,
+    maxWorkers,
+    baseJobCount,
+  }: {
+    pendingJobs: number;
+    sensitivity: number;
+    minWorkers: number;
+    maxWorkers: number;
+    baseJobCount: number;
+  }): number {
+    // Handle edge cases
+    if (pendingJobs <= 0) {
+      logger.debug('No pending jobs, using minWorkers', { minWorkers });
+      return minWorkers;
+    }
+
+    // Compute workers using logarithmic scaling
+    // The formula: sensitivity * log(pendingJobs/baseJobCount + 1) + minWorkers
+    //
+    // This gives us:
+    // - When pendingJobs = 0: minWorkers
+    // - When pendingJobs = baseJobCount: minWorkers + sensitivity * ln(2),
+    //   i.e. roughly 0.7 × sensitivity above the minimum
+    // - As pendingJobs grows, workers increase logarithmically
+    const computedWorkers =
+      sensitivity * Math.log(pendingJobs / baseJobCount + 1) + minWorkers;
+
+    // Round to nearest integer and enforce bounds
+    let result = Math.min(
+      Math.max(Math.round(computedWorkers), minWorkers),
+      maxWorkers,
+    );
+
+    // Always deploy at least one worker when there is at least one pending job
+    if (pendingJobs > 0 && result < 1) {
+      logger.debug(
+        'Computed workers < 1 with at least one pending job; forcing result to 1',
+        {
+          pendingJobs,
+          minWorkers,
+        },
+      );
+      result = 1;
+    }
+
+    logger.debug('Computed optimal workers', {
+      pendingJobs,
+      sensitivity,
+      minWorkers,
+      maxWorkers,
+      baseJobCount,
+      rawComputed: computedWorkers,
+      finalResult: result,
+    });
+
+    return result;
+  }
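+
+  // For reference, the scaling block consumed below might look like this
+  // (field values assumed for illustration; the shape is defined by
+  // JobTypeConfig):
+  //   scaling: { min: 0, max: 5, sensitivity: 2, factor: 10, cooldownSeconds: 60 }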
+
+  /**
+   * Adjusts capacity for a specific task definition
+   * @param jobTypes - The job types served by this task definition
+   * @param pending - Number of pending jobs of this type
+   * @param workers - Current worker count for this type
+   * @param config - Configuration for this job type
+   * @private
+   */
+  private async adjustCapacityForTask({
+    jobTypes,
+    pending,
+    workers,
+    config,
+  }: {
+    jobTypes: JobType[];
+    pending: number;
+    workers: number;
+    config: JobTypeConfig;
+  }) {
+    const now = Date.now();
+    const lastScale = this.lastScaleTime[config.taskDefinitionArn] || 0;
+    const cooldownMs = config.scaling.cooldownSeconds * 1000;
+    const timeInCooldown = now - lastScale;
+    const inCooldown = timeInCooldown < cooldownMs;
+
+    // Check cooldown
+    if (inCooldown) {
+      logger.debug('Still in cooldown period', {
+        jobTypes,
+        elapsed: timeInCooldown,
+        cooldownMs,
+        remaining: cooldownMs - timeInCooldown,
+      });
+      return;
+    }
+
+    // Determine the ideal number of workers
+    logger.debug('Calculating target capacity', {
+      jobTypes,
+      pending,
+      currentWorkers: workers,
+      scalingConfig: config.scaling,
+    });
+
+    const idealTarget = this.computeOptimalWorkers({
+      pendingJobs: pending,
+      sensitivity: config.scaling.sensitivity,
+      minWorkers: config.scaling.min,
+      maxWorkers: config.scaling.max,
+      baseJobCount: config.scaling.factor,
+    });
+
+    const diff = idealTarget - workers;
+
+    if (diff > 0) {
+      logger.info('Launching additional tasks', {
+        count: diff,
+        jobTypes,
+        currentWorkers: workers,
+        targetWorkers: idealTarget,
+        pendingJobs: pending,
+      });
+      await this.launchTask({ count: diff, config });
+    } else if (diff < 0) {
+      logger.debug('Capacity reduction not implemented', {
+        jobTypes,
+        currentWorkers: workers,
+        targetWorkers: idealTarget,
+        excess: -diff,
+      });
+      // Note: scaling down is not implemented - idle tasks terminate themselves
+    } else {
+      logger.debug('No capacity adjustment needed', {
+        jobTypes,
+        currentWorkers: workers,
+        targetWorkers: idealTarget,
+      });
+    }
+  }
+
+  /**
+   * Starts the capacity manager
+   */
   public start() {
-    if (this.isRunning) return;
+    if (this.isRunning) {
+      logger.info('Capacity manager already running');
+      return;
+    }

-    console.log('Starting capacity manager...');
+    logger.info('Starting capacity manager...');
     this.isRunning = true;
     this.pollJobQueue();

     // Add error handlers for uncaught errors
     process.on('uncaughtException', error => {
-      console.error('Uncaught exception:', error);
+      logger.error('Uncaught exception', { error });
       this.stop();
     });

     process.on('unhandledRejection', error => {
-      console.error('Unhandled rejection:', error);
+      logger.error('Unhandled rejection', { error });
       this.stop();
     });
   }

+  /**
+   * Stops the capacity manager
+   */
   public stop() {
-    console.log('Stopping capacity manager...');
+    logger.info('Stopping capacity manager...');
     this.isRunning = false;
     if (this.pollTimeout) {
       clearTimeout(this.pollTimeout);
@@ -92,20 +679,15 @@ export class CapacityManager {
     }
   }

-  private async adjustCapacity(jobsByType: Record<string, number>) {
-    for (const [jobType, pendingCount] of Object.entries(jobsByType)) {
-      const config = this.config.jobTypes[jobType as JobType];
-      if (!config) {
-        console.warn(`No configuration found for job type: ${jobType}`);
-        continue;
-      }
-
-      await this.adjustCapacityForType(jobType, pendingCount, config);
-    }
-  }
-
+  /**
+   * Gets a random public subnet from the VPC
+   * @param vpcId - The VPC ID to get subnets from
+   * @returns The subnet ID
+   * @private
+   */
   private async getRandomPublicSubnet(vpcId: string): Promise<string> {
     try {
+      logger.debug('Fetching public subnets', { vpcId });
       const command = new DescribeSubnetsCommand({
         Filters: [
           {
@@ -123,6 +705,7 @@ export class CapacityManager {
       const publicSubnets = response.Subnets || [];

       if (publicSubnets.length === 0) {
+        logger.error('No public subnets found', { vpcId });
         throw new Error(`No public subnets found in VPC ${vpcId}`);
       }

@@ -130,57 +713,45 @@ export class CapacityManager {
       const randomIndex = Math.floor(Math.random() * publicSubnets.length);
       const selectedSubnet = publicSubnets[randomIndex];

-      console.log(
-        `Selected subnet ${selectedSubnet.SubnetId} in AZ ${selectedSubnet.AvailabilityZone}`,
-      );
+      logger.info('Selected subnet', {
+        subnetId: selectedSubnet.SubnetId,
+        availabilityZone: selectedSubnet.AvailabilityZone,
+      });

       return selectedSubnet.SubnetId!;
     } catch (error) {
-      console.error('Error getting public subnets:', error);
+      logger.error('Error getting public subnets', { error });
       throw error;
     }
   }

-  private async adjustCapacityForType(
-    jobType: string,
-    pendingCount: number,
-    config: JobTypeConfig,
-  ) {
-    const now = Date.now();
-    const lastScale = this.lastScaleTime[jobType] || 0;
-
-    // Check cooldown
-    if (now - lastScale < config.cooldownSeconds * 1000) {
-      console.log(`Still in cooldown period for ${jobType}`);
-      return;
-    }
-
-    // Only scale up if we have more pending jobs than our threshold
-    if (pendingCount >= config.scaleUpThreshold) {
-      try {
-        // Get a random public subnet for this task
-        const subnet = await this.getRandomPublicSubnet(this.config.vpcId);
-
-        const command = new RunTaskCommand({
-          cluster: config.clusterArn,
-          taskDefinition: config.taskDefinitionArn,
-          launchType: 'FARGATE',
-          count: 1, // Start one task at a time
-          networkConfiguration: {
-            awsvpcConfiguration: {
-              subnets: [subnet],
-              securityGroups: [config.securityGroup],
-              assignPublicIp: AssignPublicIp.ENABLED,
-            },
-          },
-        });
-
-        await this.ecsClient.send(command);
-        this.lastScaleTime[jobType] = now;
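+
+  // Hypothetical usage of the stats helper below, e.g. from a monitoring or
+  // health endpoint (the identifier `manager` is assumed, not part of this
+  // change):
+  //   const stats = manager.getWorkerStats();
+  //   logger.info('Capacity snapshot', stats);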
+  /**
+   * Returns information about the current worker distribution
+   * @returns Summary of current workers
+   */
+  public getWorkerStats() {
+    const byStatus = {
+      PENDING: this.trackedWorkers.filter(w => w.status === 'PENDING').length,
+      RUNNING: this.trackedWorkers.filter(w => w.status === 'RUNNING').length,
+    };
+
+    const byJobType = Object.values(JobType).reduce<Record<JobType, number>>(
+      (acc, jobType) => {
+        acc[jobType] = 0;
+        return acc;
+      },
+      {} as Record<JobType, number>,
+    );
+
+    this.trackedWorkers.forEach(worker => {
+      worker.jobTypes.forEach(jobType => {
+        byJobType[jobType]++;
+      });
+    });

-        console.log(`Started new task for ${jobType}`);
-      } catch (error) {
-        console.error(`Error starting task for ${jobType}:`, error);
-      }
-    }
+
+    return {
+      totalWorkers: this.trackedWorkers.length,
+      byStatus,
+      byJobType,
+    };
   }
 }