Skip to content

feat(function): implemented context.fail() functionality #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,28 @@ describe("context tests", () => {
throw new Error("Test error: context.cancel should have thrown abort error.");
});

test("fail should throw abort", async () => {
const context = new WorkflowContext({
qstashClient,
initialPayload: "my-payload",
steps: [],
url: WORKFLOW_ENDPOINT,
headers: new Headers() as Headers,
workflowRunId: "wfr-id",
});
try {
await context.fail();
} catch (error) {
expect(error instanceof WorkflowAbort).toBeTrue();
const _error = error as WorkflowAbort;
expect(_error.stepName).toBe("fail");
expect(_error.name).toBe("WorkflowAbort");
expect(_error.failWorkflow).toBeTrue();
return;
}
throw new Error("Test error: context.fail should have thrown abort error.");
});

describe("context.api steps", () => {
test("should throw if provider isn't provdided", async () => {
const context = new WorkflowContext({
Expand Down
11 changes: 11 additions & 0 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,17 @@ export class WorkflowContext<TInitialPayload = unknown> {
throw new WorkflowAbort("cancel", undefined, true);
}

/**
* Fail the current workflow run
*
* Will throw WorkflowAbort to stop workflow retries.
* Shouldn't be inside try/catch.
*/
public async fail() {
// throw an abort which will stop workflow retries
throw new WorkflowAbort("fail", undefined, false, true);
}

/**
* Adds steps to the executor. Needed so that it can be overwritten in
* DisabledWorkflowContext.
Expand Down
7 changes: 6 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@ export class WorkflowAbort extends Error {
* whether workflow is to be canceled on abort
*/
public cancelWorkflow: boolean;
/**
* whether workflow is to be failed on abort
*/
public failWorkflow: boolean;

/**
*
* @param stepName name of the aborting step
* @param stepInfo step information
* @param cancelWorkflow
*/
constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false) {
constructor(stepName: string, stepInfo?: Step, cancelWorkflow = false, failWorkflow = false) {
super(
"This is an Upstash Workflow error thrown after a step executes. It is expected to be raised." +
" Make sure that you await for each step. Also, if you are using try/catch blocks, you should not wrap context.run/sleep/sleepUntil/call methods with try/catch." +
Expand All @@ -39,6 +43,7 @@ export class WorkflowAbort extends Error {
this.stepName = stepName;
this.stepInfo = stepInfo;
this.cancelWorkflow = cancelWorkflow;
this.failWorkflow = failWorkflow;
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/serve/authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ export class DisabledWorkflowContext<
return;
}

/**
* overwrite fail method to do nothing
*/
public async fail() {
return;
}

/**
* copies the passed context to create a DisabledWorkflowContext. Then, runs the
* route function with the new context.
Expand Down
4 changes: 4 additions & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ export const serveBase = <
debug,
});

if (result.isOk() && result.value === "workflow-failed") {
return onStepFinish(workflowRunId, "workflow-failed");
}

if (result.isErr()) {
// error while running the workflow or when cleaning up
await debug?.log("ERROR", "ERROR", { error: result.error.message });
Expand Down
7 changes: 7 additions & 0 deletions src/serve/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ export const processOptions = <TResponse extends Response = Response, TInitialPa
status: 400,
}
) as TResponse;
} else if (finishCondition === "workflow-failed") {
return new Response(JSON.stringify({ workflowRunId }), {
headers: {
"Upstash-NonRetryable-Error": "true",
},
status: 489,
}) as TResponse;
}
return new Response(JSON.stringify({ workflowRunId }), {
status: 200,
Expand Down
33 changes: 33 additions & 0 deletions src/serve/serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
await mockQStashServer({
execute: async () => {
const response = await endpoint(request);
expect(response.status).toBe(200);

Check failure on line 66 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:66:33)
},
responseFields: { body: [{ messageId: "msgId" }], status: 200 },
receivesRequest: {
Expand Down Expand Up @@ -406,7 +406,7 @@
workflowRunId: string;
finishCondition: FinishCondition;
};
expect(workflowRunId).toBe("wfr-foo");

Check failure on line 409 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: "wfr-foo" Received: undefined at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:409:33)
expect(finishCondition).toBe("success");
called = true;
},
Expand Down Expand Up @@ -455,7 +455,7 @@
await mockQStashServer({
execute: async () => {
const result = await endpoint(request);
expect(result.status).toBe(200);

Check failure on line 458 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:458:33)
called = true;
},
responseFields: { body: { messageId: "some-message-id" }, status: 200 },
Expand Down Expand Up @@ -500,7 +500,7 @@
await mockQStashServer({
execute: async () => {
const response = await endpoint(request);
expect(response.status).toBe(200);

Check failure on line 503 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:503:35)
called = true;
},
responseFields: { body: { messageId: "some-message-id" }, status: 200 },
Expand Down Expand Up @@ -555,7 +555,7 @@
await mockQStashServer({
execute: async () => {
const response = await endpoint(request);
expect(response.status).toBe(200);

Check failure on line 558 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:558:35)
called = true;
},
responseFields: { body: { messageId: "some-message-id" }, status: 200 },
Expand Down Expand Up @@ -626,7 +626,7 @@
await mockQStashServer({
execute: async () => {
const response = await endpoint(request);
expect(response.status).toBe(200);

Check failure on line 629 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:629:35)
called = true;
},
responseFields: { body: { messageId: "some-message-id" }, status: 200 },
Expand Down Expand Up @@ -844,6 +844,39 @@
expect(runs).toBeFalse();
});

test("should call qstash to fail workflow on context.fail", async () => {
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-foo-2", "my-payload", []);
let called = false;
let runs = false;
const { handler: endpoint } = serve(
async (context) => {
called = true;
await context.fail();
await context.run("wont run", () => {
runs = true;
});
},
{
qstashClient,
receiver: undefined,
verbose: true,
}
);

await mockQStashServer({
execute: async () => {
const response = await endpoint(request);

expect(response.status).toBe(489);
expect(response.headers.get("Upstash-NonRetryable-Error")).toBe("true");
},
responseFields: { body: undefined, status: 489 },
receivesRequest: false,
});
expect(called).toBeTrue();
expect(runs).toBeFalse();
});

test("should send waitForEvent", async () => {
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []);
const { handler: endpoint } = serve(
Expand All @@ -864,7 +897,7 @@
await mockQStashServer({
execute: async () => {
const result = await endpoint(request);
expect(result.status).toBe(200);

Check failure on line 900 in src/serve/serve.test.ts

View workflow job for this annotation

GitHub Actions / Upstash Workflow Tests

error: expect(received).toBe(expected)

Expected: 200 Received: 500 at <anonymous> (/home/runner/work/workflow-js/workflow-js/src/serve/serve.test.ts:900:31)
called = true;
},
responseFields: { body: { messageId: "some-message-id" }, status: 200 },
Expand Down
3 changes: 2 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ export type FinishCondition =
| "fromCallback"
| "auth-fail"
| "failure-callback"
| "workflow-already-ended";
| "workflow-already-ended"
| "workflow-failed";

export type WorkflowServeOptions<
TResponse extends Response = Response,
Expand Down
32 changes: 32 additions & 0 deletions src/workflow-requests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@ describe("Workflow Requests", () => {
expect(result.value).toBe("workflow-finished");
});

test("should fail workflow and return ok if context.fail is called", async () => {
const workflowRunId = nanoid();
const token = "myToken";

const context = new WorkflowContext({
qstashClient: new Client({ baseUrl: MOCK_SERVER_URL, token }),
workflowRunId: workflowRunId,
initialPayload: undefined,
headers: new Headers({}) as Headers,
steps: [],
url: WORKFLOW_ENDPOINT,
});

const result = await triggerRouteFunction({
onStep: async () => {
await context.fail();
await context.run("shouldn't call", () => {
throw new Error("shouldn't call context.run");
});
},
onCleanup: async () => {
throw new Error("shouldn't call");
},
onCancel: async () => {
throw new Error("shouldn't call");
},
});
expect(result.isOk()).toBeTrue();
// @ts-expect-error value will be set since result isOk
expect(result.value).toBe("workflow-failed");
});

test("should call onCancel if context.cancel is called inside context.run", async () => {
const workflowRunId = nanoid();
const token = "myToken";
Expand Down
5 changes: 4 additions & 1 deletion src/workflow-requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ export const triggerRouteFunction = async ({
onCancel: () => Promise<void>;
debug?: WorkflowLogger;
}): Promise<
Ok<"workflow-finished" | "step-finished" | "workflow-was-finished", never> | Err<never, Error>
| Ok<"workflow-finished" | "step-finished" | "workflow-was-finished" | "workflow-failed", never>
| Err<never, Error>
> => {
try {
// When onStep completes successfully, it throws an exception named `WorkflowAbort`,
Expand All @@ -158,6 +159,8 @@ export const triggerRouteFunction = async ({
return ok("workflow-was-finished");
} else if (!(error_ instanceof WorkflowAbort)) {
return err(error_);
} else if (error_.failWorkflow) {
Copy link
Preview

Copilot AI Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider restructuring the error handling logic by nesting the WorkflowAbort conditions together (i.e. checking failWorkflow and cancelWorkflow inside an explicit WorkflowAbort branch) to improve clarity.

Copilot uses AI. Check for mistakes.

return ok("workflow-failed");
} else if (error_.cancelWorkflow) {
await onCancel();
return ok("workflow-finished");
Expand Down
Loading