Skip to content

Commit 81884a6

Browse files
authored
fix(worker): Fix memory leak on non-existant activity (#1563)
1 parent 9a9c0a9 commit 81884a6

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

packages/worker/src/worker.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,20 @@ export interface WorkerStatus {
351351
numInFlightWorkflowActivations: number;
352352
/**
353353
* Number of in-flight (currently actively processed) Activities
354+
*
355+
* This includes both local and non-local Activities.
356+
*
357+
* See {@link numInFlightNonLocalActivities} and {@link numInFlightLocalActivities} for a breakdown.
354358
*/
355359
numInFlightActivities: number;
360+
/**
361+
* Number of in-flight (currently actively processed) non-Local Activities
362+
*/
363+
numInFlightNonLocalActivities: number;
364+
/**
365+
* Number of in-flight (currently actively processed) Local Activities
366+
*/
367+
numInFlightLocalActivities: number;
356368
/**
357369
* Number of Workflow executions cached in Worker memory
358370
*/
@@ -437,6 +449,8 @@ export class Worker {
437449

438450
protected readonly numInFlightActivationsSubject = new BehaviorSubject<number>(0);
439451
protected readonly numInFlightActivitiesSubject = new BehaviorSubject<number>(0);
452+
protected readonly numInFlightNonLocalActivitiesSubject = new BehaviorSubject<number>(0);
453+
protected readonly numInFlightLocalActivitiesSubject = new BehaviorSubject<number>(0);
440454
protected readonly numCachedWorkflowsSubject = new BehaviorSubject<number>(0);
441455
protected readonly numHeartbeatingActivitiesSubject = new BehaviorSubject<number>(0);
442456
protected readonly evictionsEmitter = new EventEmitter();
@@ -769,6 +783,8 @@ export class Worker {
769783
numCachedWorkflows: this.numCachedWorkflowsSubject.value,
770784
numInFlightWorkflowActivations: this.numInFlightActivationsSubject.value,
771785
numInFlightActivities: this.numInFlightActivitiesSubject.value,
786+
numInFlightNonLocalActivities: this.numInFlightNonLocalActivitiesSubject.value,
787+
numInFlightLocalActivities: this.numInFlightLocalActivitiesSubject.value,
772788
};
773789
}
774790

@@ -1021,12 +1037,17 @@ export class Worker {
10211037

10221038
let result;
10231039

1040+
const numInFlightBreakdownSubject = output.activity.info.isLocal
1041+
? this.numInFlightLocalActivitiesSubject
1042+
: this.numInFlightNonLocalActivitiesSubject;
1043+
10241044
this.numInFlightActivitiesSubject.next(this.numInFlightActivitiesSubject.value + 1);
1045+
numInFlightBreakdownSubject.next(numInFlightBreakdownSubject.value + 1);
10251046
try {
10261047
result = await output.activity.run(output.input);
10271048
} finally {
1049+
numInFlightBreakdownSubject.next(numInFlightBreakdownSubject.value - 1);
10281050
this.numInFlightActivitiesSubject.next(this.numInFlightActivitiesSubject.value - 1);
1029-
group$.close();
10301051
}
10311052
const status = result.failed ? 'failed' : result.completed ? 'completed' : 'cancelled';
10321053

@@ -1060,7 +1081,12 @@ export class Worker {
10601081
return { taskToken, result };
10611082
}),
10621083
filter(<T>(result: T): result is Exclude<T, undefined> => result !== undefined),
1063-
map((rest) => coresdk.ActivityTaskCompletion.encodeDelimited(rest).finish())
1084+
map((rest) => coresdk.ActivityTaskCompletion.encodeDelimited(rest).finish()),
1085+
tap({
1086+
next: () => {
1087+
group$.close();
1088+
},
1089+
})
10641090
);
10651091
})
10661092
);

0 commit comments

Comments
 (0)