Skip to content

Activity pause/unpause #1729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
33 changes: 30 additions & 3 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*
* 1. `await` on {@link Context.cancelled | `Context.current().cancelled`} or
* {@link Context.sleep | `Context.current().sleep()`}, which each throw a {@link CancelledFailure}.
* 1. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
* 2. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
* {@link Context.cancellationSignal | `Context.current().cancellationSignal`} to a library that supports it.
*
* ### Examples
Expand All @@ -70,7 +70,16 @@
*/

import { AsyncLocalStorage } from 'node:async_hooks';
import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common';
import {
Logger,
Duration,
LogLevel,
LogMetadata,
MetricMeter,
Priority,
ActivityCancellationDetailsHolder,
ActivityCancellationDetails,
} from '@temporalio/common';
import { msToNumber } from '@temporalio/common/lib/time';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';

Expand Down Expand Up @@ -289,6 +298,11 @@ export class Context {
*/
public readonly metricMeter: MetricMeter;

/**
* Holder object for activity cancellation details
*/
private readonly _cancellationDetails: ActivityCancellationDetailsHolder;

/**
* **Not** meant to instantiated by Activity code, used by the worker.
*
Expand All @@ -300,14 +314,16 @@ export class Context {
cancellationSignal: AbortSignal,
heartbeat: (details?: any) => void,
log: Logger,
metricMeter: MetricMeter
metricMeter: MetricMeter,
details: ActivityCancellationDetailsHolder
) {
this.info = info;
this.cancelled = cancelled;
this.cancellationSignal = cancellationSignal;
this.heartbeatFn = heartbeat;
this.log = log;
this.metricMeter = metricMeter;
this._cancellationDetails = details;
}

/**
Expand Down Expand Up @@ -347,6 +363,10 @@ export class Context {
});
return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]);
};

public cancellationDetails(): ActivityCancellationDetails | undefined {
return this._cancellationDetails.details;
}
}

/**
Expand Down Expand Up @@ -427,6 +447,13 @@ export function cancelled(): Promise<never> {
return Context.current().cancelled;
}

/**
* Returns the cancellation details for this activity, if any.
*/
export function cancellationDetails(): ActivityCancellationDetails | undefined {
return Context.current().cancellationDetails();
}

/**
* Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to
* react to Activity cancellation.
Expand Down
13 changes: 13 additions & 0 deletions packages/client/src/async-completion-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ export class ActivityCompletionError extends Error {}
@SymbolBasedInstanceOfError('ActivityCancelledError')
export class ActivityCancelledError extends Error {}

/**
* Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity
* has been paused.
*/
@SymbolBasedInstanceOfError('ActivityPausedError')
export class ActivityPausedError extends Error {}

/**
* Options used to configure {@link AsyncCompletionClient}
*/
Expand Down Expand Up @@ -211,6 +218,7 @@ export class AsyncCompletionClient extends BaseClient {
async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
const payloads = await encodeToPayloads(this.dataConverter, details);
let cancelRequested = false;
let paused = false;
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
const response = await this.workflowService.recordActivityTaskHeartbeat({
Expand All @@ -220,6 +228,7 @@ export class AsyncCompletionClient extends BaseClient {
details: { payloads },
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
} else {
const response = await this.workflowService.recordActivityTaskHeartbeatById({
identity: this.options.identity,
Expand All @@ -228,12 +237,16 @@ export class AsyncCompletionClient extends BaseClient {
details: { payloads },
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
}
} catch (err) {
this.handleError(err);
}
if (cancelRequested) {
throw new ActivityCancelledError('cancelled');
}
if (paused) {
throw new ActivityPausedError('paused');
}
}
}
51 changes: 51 additions & 0 deletions packages/common/src/activity-cancellation-details.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { coresdk } from '@temporalio/proto';

export interface ActivityCancellationDetailsHolder {
details?: ActivityCancellationDetails;
}

export interface ActivityCancellationDetailsOptions {
notFound?: boolean;
cancelRequested?: boolean;
paused?: boolean;
timedOut?: boolean;
workerShutdown?: boolean;
reset?: boolean;
}

/**
* Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set.
*/
export class ActivityCancellationDetails {
readonly notFound: boolean;
readonly cancelRequested: boolean;
readonly paused: boolean;
readonly timedOut: boolean;
readonly workerShutdown: boolean;
readonly reset: boolean;

public constructor(options: ActivityCancellationDetailsOptions = {}) {
this.notFound = options.notFound ?? false;
this.cancelRequested = options.cancelRequested ?? false;
this.paused = options.paused ?? false;
this.timedOut = options.timedOut ?? false;
this.workerShutdown = options.workerShutdown ?? false;
this.reset = options.reset ?? false;
}

static fromProto(
proto: coresdk.activity_task.IActivityCancellationDetails | null | undefined
): ActivityCancellationDetails {
if (proto == null) {
return new ActivityCancellationDetails();
}
return new ActivityCancellationDetails({
notFound: proto.isNotFound ?? false,
cancelRequested: proto.isCancelled ?? false,
paused: proto.isPaused ?? false,
timedOut: proto.isTimedOut ?? false,
workerShutdown: proto.isWorkerShutdown ?? false,
reset: proto.isReset ?? false,
});
}
}
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as encoding from './encoding';
import * as helpers from './type-helpers';

export * from './activity-options';
export * from './activity-cancellation-details';
export * from './converter/data-converter';
export * from './converter/failure-converter';
export * from './converter/payload-codec';
Expand Down
24 changes: 24 additions & 0 deletions packages/test/src/activities/heartbeat-cancellation-details.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ActivityCancellationDetails } from '@temporalio/common';
import * as activity from '@temporalio/activity';

export async function heartbeatCancellationDetailsActivity(
catchErr: boolean
): Promise<ActivityCancellationDetails | undefined> {
// Exit early if we've already run this activity.
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
return activity.cancellationDetails();
}
// eslint-disable-next-line no-constant-condition
while (true) {
try {
activity.heartbeat();
await activity.sleep(300);
} catch (err) {
if (err instanceof activity.CancelledFailure && catchErr) {
return activity.cancellationDetails();
}
activity.heartbeat('finally-complete');
throw err;
}
}
}
29 changes: 28 additions & 1 deletion packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import * as workflow from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';

export interface Context {
env: TestWorkflowEnvironment;
workflowBundle: WorkflowBundle;
}

const defaultDynamicConfigOptions = [
'frontend.activityAPIsEnabled=true',
'frontend.enableExecuteMultiOperation=true',
'frontend.workerVersioningDataAPIs=true',
'frontend.workerVersioningWorkflowAPIs=true',
Expand Down Expand Up @@ -284,6 +285,32 @@ export function configurableHelpers<T>(
};
}

export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
const desc = await handle.describe();
const req = {
namespace: handle.client.options.namespace,
execution: {
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
},
id: activityId,
};
if (pause) {
await handle.client.workflowService.pauseActivity(req);
} else {
await handle.client.workflowService.unpauseActivity(req);
}
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
if (!activityInfo) return false;
if (pause) {
return activityInfo.paused ?? false;
}
return !activityInfo.paused;
}, 15000);
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
return configurableHelpers(t, t.context.workflowBundle, testEnv);
}
Loading