From 30512beb58f19120a80f428ff5a0a58b8d08c6f4 Mon Sep 17 00:00:00 2001 From: alitariksahin Date: Wed, 18 Jun 2025 00:04:43 +0300 Subject: [PATCH 1/2] feat(function): implemented context.fail() functionality --- src/context/context.test.ts | 22 ++++++++++++++++++++++ src/context/context.ts | 11 +++++++++++ src/error.ts | 7 ++++++- src/serve/authorization.ts | 7 +++++++ src/serve/index.ts | 4 ++++ src/serve/options.ts | 7 +++++++ src/serve/serve.test.ts | 33 +++++++++++++++++++++++++++++++++ src/types.ts | 3 ++- src/workflow-requests.test.ts | 32 ++++++++++++++++++++++++++++++++ src/workflow-requests.ts | 5 ++++- 10 files changed, 128 insertions(+), 3 deletions(-) diff --git a/src/context/context.test.ts b/src/context/context.test.ts index 539f4d08..e515e819 100644 --- a/src/context/context.test.ts +++ b/src/context/context.test.ts @@ -442,6 +442,28 @@ describe("context tests", () => { throw new Error("Test error: context.cancel should have thrown abort error."); }); + test("fail should throw abort", async () => { + const context = new WorkflowContext({ + qstashClient, + initialPayload: "my-payload", + steps: [], + url: WORKFLOW_ENDPOINT, + headers: new Headers() as Headers, + workflowRunId: "wfr-id", + }); + try { + await context.fail(); + } catch (error) { + expect(error instanceof WorkflowAbort).toBeTrue(); + const _error = error as WorkflowAbort; + expect(_error.stepName).toBe("fail"); + expect(_error.name).toBe("WorkflowAbort"); + expect(_error.failWorkflow).toBeTrue(); + return; + } + throw new Error("Test error: context.fail should have thrown abort error."); + }); + describe("context.api steps", () => { test("should throw if provider isn't provdided", async () => { const context = new WorkflowContext({ diff --git a/src/context/context.ts b/src/context/context.ts index a2f8976d..4cc21ad1 100644 --- a/src/context/context.ts +++ b/src/context/context.ts @@ -465,6 +465,17 @@ export class WorkflowContext { throw new WorkflowAbort("cancel", undefined, true); } + /** + * Fail the current workflow run + * + * Will throw WorkflowAbort to stop workflow retries. + * Shouldn't be inside try/catch. + */ + public async fail() { + // throw an abort which will stop workflow retries + throw new WorkflowAbort("fail", undefined, false, true); + } + /** * Adds steps to the executor. Needed so that it can be overwritten in * DisabledWorkflowContext. diff --git a/src/error.ts b/src/error.ts index 62ebbb1c..a06050f3 100644 --- a/src/error.ts +++ b/src/error.ts @@ -22,6 +22,10 @@ export class WorkflowAbort extends Error { * whether workflow is to be canceled on abort */ public cancelWorkflow: boolean; + /** + * whether workflow is to be failed on abort + */ + public failWorkflow: boolean; /** * @@ -29,7 +33,7 @@ export class WorkflowAbort extends Error { * @param stepInfo step information * @param cancelWorkflow */ - constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false) { + constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false, failWorkflow = false) { super( "This is an Upstash Workflow error thrown after a step executes. It is expected to be raised." + " Make sure that you await for each step. Also, if you are using try/catch blocks, you should not wrap context.run/sleep/sleepUntil/call methods with try/catch." + @@ -39,6 +43,7 @@ export class WorkflowAbort extends Error { this.stepName = stepName; this.stepInfo = stepInfo; this.cancelWorkflow = cancelWorkflow; + this.failWorkflow = failWorkflow; } } diff --git a/src/serve/authorization.ts b/src/serve/authorization.ts index 639e4bd5..64f08184 100644 --- a/src/serve/authorization.ts +++ b/src/serve/authorization.ts @@ -57,6 +57,13 @@ export class DisabledWorkflowContext< return; } + /** + * overwrite fail method to do nothing + */ + public async fail() { + return; + } + /** * copies the passed context to create a DisabledWorkflowContext. Then, runs the * route function with the new context. diff --git a/src/serve/index.ts b/src/serve/index.ts index 54d041d3..96d61ad4 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -214,6 +214,10 @@ export const serveBase = < debug, }); + if (result.isOk() && result.value === "workflow-failed") { + return onStepFinish(workflowRunId, "workflow-failed"); + } + if (result.isErr()) { // error while running the workflow or when cleaning up await debug?.log("ERROR", "ERROR", { error: result.error.message }); diff --git a/src/serve/options.ts b/src/serve/options.ts index b016ff05..06225105 100644 --- a/src/serve/options.ts +++ b/src/serve/options.ts @@ -55,6 +55,13 @@ export const processOptions = { expect(runs).toBeFalse(); }); + test("should call qstash to fail workflow on context.fail", async () => { + const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo-2", "my-payload", []); + let called = false; + let runs = false; + const { handler: endpoint } = serve( + async (context) => { + called = true; + await context.fail(); + await context.run("wont run", () => { + runs = true; + }); + }, + { + qstashClient, + receiver: undefined, + verbose: true, + } + ); + + await mockQStashServer({ + execute: async () => { + const response = await endpoint(request); + + expect(response.status).toBe(489); + expect(response.headers.get("Upstash-NonRetryable-Error")).toBe("true"); + }, + responseFields: { body: undefined, status: 489 }, + receivesRequest: false, + }); + expect(called).toBeTrue(); + expect(runs).toBeFalse(); + }); + test("should send waitForEvent", async () => { const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []); const { handler: endpoint } = serve( diff --git a/src/types.ts b/src/types.ts index 38e7c650..b3e2639e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -132,7 +132,8 @@ export type FinishCondition = | "fromCallback" | "auth-fail" | "failure-callback" - | "workflow-already-ended"; + | "workflow-already-ended" + | "workflow-failed"; export type WorkflowServeOptions< TResponse extends Response = Response, diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 415cb95c..adbd4aea 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -186,6 +186,38 @@ describe("Workflow Requests", () => { expect(result.value).toBe("workflow-finished"); }); + test("should fail workflow and return ok if context.fail is called", async () => { + const workflowRunId = nanoid(); + const token = "myToken"; + + const context = new WorkflowContext({ + qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token }), + workflowRunId: workflowRunId, + initialPayload: undefined, + headers: new Headers({}) as Headers, + steps: [], + url: WORKFLOW_ENDPOINT, + }); + + const result = await triggerRouteFunction({ + onStep: async () => { + await context.fail(); + await context.run("shouldn't call", () => { + throw new Error("shouldn't call context.run"); + }); + }, + onCleanup: async () => { + throw new Error("shouldn't call"); + }, + onCancel: async () => { + throw new Error("shouldn't call"); + }, + }); + expect(result.isOk()).toBeTrue(); + // @ts-expect-error value will be set since result isOk + expect(result.value).toBe("workflow-failed"); + }); + test("should call onCancel if context.cancel is called inside context.run", async () => { const workflowRunId = nanoid(); const token = "myToken"; diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index a6d739d6..62edac2a 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -138,7 +138,8 @@ export const triggerRouteFunction = async ({ onCancel: () => Promise; debug?: WorkflowLogger; }): Promise< - Ok<"workflow-finished" | "step-finished" | "workflow-was-finished", never> | Err + | Ok<"workflow-finished" | "step-finished" | "workflow-was-finished" | "workflow-failed", never> + | Err > => { try { // When onStep completes successfully, it throws an exception named `WorkflowAbort`, @@ -158,6 +159,8 @@ export const triggerRouteFunction = async ({ return ok("workflow-was-finished"); } else if (!(error_ instanceof WorkflowAbort)) { return err(error_); + } else if (error_.failWorkflow) { + return ok("workflow-failed"); } else if (error_.cancelWorkflow) { await onCancel(); return ok("workflow-finished"); From aea2da80fc8cd8534a21b5a13f695b37423542b2 Mon Sep 17 00:00:00 2001 From: alitariksahin Date: Wed, 18 Jun 2025 11:09:34 +0300 Subject: [PATCH 2/2] fix(version): version updates applied --- src/serve/serve-many.test.ts | 6 +++--- src/serve/serve.test.ts | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/serve/serve-many.test.ts b/src/serve/serve-many.test.ts index 2b2df6f0..d94f0b70 100644 --- a/src/serve/serve-many.test.ts +++ b/src/serve/serve-many.test.ts @@ -164,7 +164,7 @@ describe("serveMany", () => { "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], "Upstash-Telemetry-Framework": ["nextjs"], "Upstash-Telemetry-Runtime": ["node@v22.6.0"], - "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.13"], + "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.14"], "Upstash-Workflow-Init": ["false"], "Upstash-Workflow-RunId": ["wfr_id"], "Upstash-Workflow-Runid": ["wfr_id"], @@ -219,7 +219,7 @@ describe("serveMany", () => { "Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"], "Upstash-Telemetry-Framework": ["nextjs"], "Upstash-Telemetry-Runtime": ["node@v22.6.0"], - "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.13"], + "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.14"], "Upstash-Workflow-Init": ["false"], "Upstash-Workflow-RunId": ["wfr_id"], "Upstash-Workflow-Runid": ["wfr_id"], @@ -293,7 +293,7 @@ describe("serveMany", () => { "upstash-retries": "0", "upstash-telemetry-framework": "nextjs", "upstash-telemetry-runtime": "node@v22.6.0", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", "upstash-workflow-calltype": "toCallback", "upstash-workflow-init": "false", "upstash-workflow-runid": "wfr_id", diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 0cafdb2f..a126553b 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -83,7 +83,7 @@ describe("serve", () => { "upstash-retries": "1", "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", "upstash-workflow-init": "true", "upstash-workflow-runid": expect.stringMatching(/^wfr_/), "upstash-workflow-sdk-version": "1", @@ -429,7 +429,7 @@ describe("serve", () => { "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", }, body: '{"stepId":3,"stepName":"step 3","stepType":"Run","out":"\\"combined results: result 1,result 2\\"","concurrent":1}', }, @@ -478,7 +478,7 @@ describe("serve", () => { "upstash-workflow-url": WORKFLOW_ENDPOINT, "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -531,7 +531,7 @@ describe("serve", () => { "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -586,7 +586,7 @@ describe("serve", () => { "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', }, @@ -681,7 +681,7 @@ describe("serve", () => { "upstash-retries": "4", "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", "upstash-timeout": "10", "upstash-workflow-calltype": "toCallback", "upstash-workflow-init": "false", @@ -928,7 +928,7 @@ describe("serve", () => { "Upstash-Workflow-Url": [WORKFLOW_ENDPOINT], "Upstash-Telemetry-Framework": ["unknown"], "Upstash-Telemetry-Runtime": ["unknown"], - "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.13"], + "Upstash-Telemetry-Sdk": ["@upstash/workflow@v0.2.14"], }, timeoutUrl: WORKFLOW_ENDPOINT, url: WORKFLOW_ENDPOINT, @@ -1218,7 +1218,7 @@ describe("serve", () => { "upstash-forward-test-header": headerValue, "upstash-telemetry-framework": "unknown", "upstash-telemetry-runtime": "unknown", - "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.14", }, body: '{"stepId":1,"stepName":"sleep-step","stepType":"SleepFor","sleepFor":1,"concurrent":1}', },