Skip to content

Commit bcd647f

Browse files
Merge pull request #130 from depot/task-locks
Clear completed tasks at the start of the loop
2 parents 9916ae6 + 40208ee commit bcd647f

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

src/handlers/state.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
reconcile as reconcileFly,
1212
} from '../utils/fly/reconcile'
1313
import {client} from '../utils/grpc'
14+
import {clearCompletedTasks} from '../utils/scheduler'
1415

1516
interface CloudProvider<T extends AwsCurrentState | FlyCurrentState> {
1617
getCurrentState(): Promise<T>
@@ -33,6 +34,9 @@ export async function startStateStream<T extends AwsCurrentState | FlyCurrentSta
3334
) {
3435
while (!signal.aborted) {
3536
try {
37+
// Allow any completed tasks to be scheduled again if needed
38+
clearCompletedTasks()
39+
3640
const currentState = await provider.getCurrentState()
3741

3842
const response = await client.getDesiredState(

src/utils/scheduler.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import {reportError} from './errors'
22

33
const inProgressTasks = new Set<string>()
4+
const completedTasks = new Set<string>()
45

56
/**
67
* Schedule an update to run, ensuring that only one update is running at a time.
78
*/
89
export async function scheduleTask(key: string, task: () => Promise<void>) {
9-
if (inProgressTasks.has(key)) {
10-
console.log(`Skipping ${key} because it is already in progress`)
10+
if (inProgressTasks.has(key) || completedTasks.has(key)) {
11+
console.log(`Skipping ${key} because it is already in progress or completed`)
1112
return
1213
}
1314

@@ -21,7 +22,20 @@ export async function scheduleTask(key: string, task: () => Promise<void>) {
2122
await reportError(err)
2223
} finally {
2324
inProgressTasks.delete(key)
25+
completedTasks.add(key)
2426
const duration = new Date().getTime() - start.getTime()
2527
console.log(`Task ${key} completed (${duration}ms)`)
2628
}
2729
}
30+
31+
/**
32+
* Clear completed tasks, allowing them to be scheduled again.
33+
* This should be called at the beginning of each reconciliation loop.
34+
*/
35+
export function clearCompletedTasks() {
36+
const count = completedTasks.size
37+
if (count > 0) {
38+
console.log(`Clearing ${count} completed task(s)`)
39+
completedTasks.clear()
40+
}
41+
}

0 commit comments

Comments
 (0)