Skip to content

Commit dd321e3

Browse files
authored
Throw an error for concurrent waits (with nice docs link) (#1618)
* WIP preventing concurrent waits, throw an error * Added ConcurrentWaitError (not retryable) * Move preventMultipleWaits out of the RuntimeAPI * Added preventMultipleWaits to the devRuntimeManager * Added throwable InternalError. Plus new TASK_DID_CONCURRENT_WAIT code * Docs link for troubleshooting concurrent waits * Docs for troubleshooting concurrent waits * preventMultipleWaits function * Added TASK_DID_CONCURRENT_WAIT code * Deal with InternalErrors that skipRetrying * Added preventMultipleWaits to prod
1 parent 2ad664d commit dd321e3

File tree

10 files changed

+221
-79
lines changed

10 files changed

+221
-79
lines changed

docs/troubleshooting.mdx

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,21 @@ Your code is deployed separately from the rest of your app(s) so you need to mak
8080

8181
Prisma uses code generation to create the client from your schema file. This means you need to add a bit of config so we can generate this file before your tasks run: [Read the guide](/config/config-file#prisma).
8282

83+
### `Parallel waits are not supported`
84+
85+
In the current version, you can't perform more that one "wait" in parallel.
86+
87+
Waits include:
88+
- `wait.for()`
89+
- `wait.until()`
90+
- `task.triggerAndWait()`
91+
- `task.batchTriggerAndWait()`
92+
- And any of our functions with `wait` in the name.
93+
94+
This restriction exists because we suspend the task server after a wait, and resume it when the wait is done. At the moment, if you do more than one wait, the run will never continue when deployed, so we throw this error instead.
95+
96+
The most common situation this happens is if you're using `Promise.all` around some of our wait functions. Instead of doing this use our built-in functions for [triggering tasks](/triggering#triggering-from-inside-another-task). We have functions that allow you to trigger different tasks in parallel.
97+
8398
### When triggering subtasks the parent task finishes too soon
8499

85100
Make sure that you always use `await` when you call `trigger`, `triggerAndWait`, `batchTrigger`, and `batchTriggerAndWait`. If you don't then it's likely the task(s) won't be triggered because the calling function process can be terminated before the networks calls are sent.

packages/core/src/v3/errors.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,37 @@ import { links } from "./links.js";
1212
import { ExceptionEventProperties } from "./schemas/openTelemetry.js";
1313
import { assertExhaustive } from "../utils.js";
1414

15+
/**
16+
* If you throw this, it will get converted into an INTERNAL_ERROR
17+
*/
18+
export class InternalError extends Error {
19+
public readonly code: TaskRunErrorCodes;
20+
public readonly skipRetrying: boolean;
21+
22+
constructor({
23+
code,
24+
message,
25+
showStackTrace = true,
26+
skipRetrying = false,
27+
}: {
28+
code: TaskRunErrorCodes;
29+
message?: string;
30+
showStackTrace?: boolean;
31+
skipRetrying?: boolean;
32+
}) {
33+
super(`${code}: ${message ?? "No message"}`);
34+
this.name = "InternalError";
35+
this.code = code;
36+
this.message = message ?? "InternalError";
37+
38+
if (!showStackTrace) {
39+
this.stack = undefined;
40+
}
41+
42+
this.skipRetrying = skipRetrying;
43+
}
44+
}
45+
1546
export class AbortTaskRunError extends Error {
1647
constructor(message: string) {
1748
super(message);
@@ -32,6 +63,15 @@ export class TaskPayloadParsedError extends Error {
3263
}
3364

3465
export function parseError(error: unknown): TaskRunError {
66+
if (error instanceof InternalError) {
67+
return {
68+
type: "INTERNAL_ERROR",
69+
code: error.code,
70+
message: error.message,
71+
stackTrace: error.stack ?? "",
72+
};
73+
}
74+
3575
if (error instanceof Error) {
3676
return {
3777
type: "BUILT_IN_ERROR",
@@ -168,6 +208,7 @@ export function shouldRetryError(error: TaskRunError): boolean {
168208
case "DISK_SPACE_EXCEEDED":
169209
case "TASK_RUN_HEARTBEAT_TIMEOUT":
170210
case "OUTDATED_SDK_VERSION":
211+
case "TASK_DID_CONCURRENT_WAIT":
171212
return false;
172213

173214
case "GRACEFUL_EXIT_TIMEOUT":
@@ -437,6 +478,14 @@ const prettyInternalErrors: Partial<
437478
href: links.docs.upgrade.beta,
438479
},
439480
},
481+
TASK_DID_CONCURRENT_WAIT: {
482+
message:
483+
"Parallel waits are not supported, e.g. using Promise.all() around our wait functions.",
484+
link: {
485+
name: "Read the docs for solutions",
486+
href: links.docs.troubleshooting.concurrentWaits,
487+
},
488+
},
440489
};
441490

442491
const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => {

packages/core/src/v3/links.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ export const links = {
1212
upgrade: {
1313
beta: "https://trigger.dev/docs/upgrading-beta",
1414
},
15+
troubleshooting: {
16+
concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported",
17+
},
1518
},
1619
site: {
1720
home: "https://trigger.dev",

packages/core/src/v3/runtime/devRuntimeManager.ts

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "../schemas/index.js";
77
import { unboundedTimeout } from "../utils/timers.js";
88
import { RuntimeManager } from "./manager.js";
9+
import { preventMultipleWaits } from "./preventMultipleWaits.js";
910

1011
export class DevRuntimeManager implements RuntimeManager {
1112
_taskWaits: Map<string, { resolve: (value: TaskRunExecutionResult) => void }> = new Map();
@@ -17,71 +18,77 @@ export class DevRuntimeManager implements RuntimeManager {
1718

1819
_pendingCompletionNotifications: Map<string, TaskRunExecutionResult> = new Map();
1920

21+
_preventMultipleWaits = preventMultipleWaits();
22+
2023
disable(): void {
2124
// do nothing
2225
}
2326

2427
async waitForDuration(ms: number): Promise<void> {
25-
await unboundedTimeout(ms);
28+
await this._preventMultipleWaits(() => unboundedTimeout(ms));
2629
}
2730

2831
async waitUntil(date: Date): Promise<void> {
2932
return this.waitForDuration(date.getTime() - Date.now());
3033
}
3134

3235
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
33-
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);
36+
return this._preventMultipleWaits(async () => {
37+
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);
3438

35-
if (pendingCompletion) {
36-
this._pendingCompletionNotifications.delete(params.id);
39+
if (pendingCompletion) {
40+
this._pendingCompletionNotifications.delete(params.id);
3741

38-
return pendingCompletion;
39-
}
42+
return pendingCompletion;
43+
}
4044

41-
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
42-
this._taskWaits.set(params.id, { resolve });
43-
});
45+
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
46+
this._taskWaits.set(params.id, { resolve });
47+
});
4448

45-
await this.#tryFlushMetadata();
49+
await this.#tryFlushMetadata();
4650

47-
return await promise;
51+
return await promise;
52+
});
4853
}
4954

5055
async waitForBatch(params: {
5156
id: string;
5257
runs: string[];
5358
ctx: TaskRunContext;
5459
}): Promise<BatchTaskRunExecutionResult> {
55-
if (!params.runs.length) {
56-
return Promise.resolve({ id: params.id, items: [] });
57-
}
60+
return this._preventMultipleWaits(async () => {
61+
if (!params.runs.length) {
62+
return Promise.resolve({ id: params.id, items: [] });
63+
}
5864

59-
const promise = Promise.all(
60-
params.runs.map((runId) => {
61-
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
62-
const pendingCompletion = this._pendingCompletionNotifications.get(runId);
65+
const promise = Promise.all(
66+
params.runs.map((runId) => {
67+
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
68+
const pendingCompletion = this._pendingCompletionNotifications.get(runId);
6369

64-
if (pendingCompletion) {
65-
this._pendingCompletionNotifications.delete(runId);
70+
if (pendingCompletion) {
71+
this._pendingCompletionNotifications.delete(runId);
6672

67-
resolve(pendingCompletion);
73+
resolve(pendingCompletion);
6874

69-
return;
70-
}
75+
return;
76+
}
7177

72-
this._taskWaits.set(runId, { resolve });
73-
});
74-
})
75-
);
78+
this._taskWaits.set(runId, { resolve });
79+
});
80+
})
81+
);
7682

77-
await this.#tryFlushMetadata();
83+
await this.#tryFlushMetadata();
7884

79-
const results = await promise;
85+
const results = await promise;
8086

81-
return {
82-
id: params.id,
83-
items: results,
84-
};
87+
return {
88+
id: params.id,
89+
items: results,
90+
};
91+
});
8592
}
8693

8794
resumeTask(completion: TaskRunExecutionResult, runId: string): void {

packages/core/src/v3/runtime/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ import { usage } from "../usage-api.js";
1212

1313
const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager();
1414

15+
/**
16+
* All state must be inside the RuntimeManager, do NOT store it on this class.
17+
* This is because of the "dual package hazard", this can be bundled multiple times.
18+
*/
1519
export class RuntimeAPI {
1620
private static _instance?: RuntimeAPI;
1721

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { InternalError } from "../errors.js";
2+
import { TaskRunErrorCodes } from "../schemas/common.js";
3+
4+
const concurrentWaitErrorMessage =
5+
"Parallel waits are not supported, e.g. using Promise.all() around our wait functions.";
6+
7+
export function preventMultipleWaits() {
8+
let isExecutingWait = false;
9+
10+
return async <T>(cb: () => Promise<T>): Promise<T> => {
11+
if (isExecutingWait) {
12+
console.error(concurrentWaitErrorMessage);
13+
throw new InternalError({
14+
code: TaskRunErrorCodes.TASK_DID_CONCURRENT_WAIT,
15+
message: concurrentWaitErrorMessage,
16+
skipRetrying: true,
17+
showStackTrace: false,
18+
});
19+
}
20+
21+
isExecutingWait = true;
22+
23+
try {
24+
return await cb();
25+
} finally {
26+
isExecutingWait = false;
27+
}
28+
};
29+
}

0 commit comments

Comments
 (0)