Skip to content

Commit fb4e83b

Browse files
committed
Allow concurrent jobs with async job queue
1 parent df10132 commit fb4e83b

File tree

2 files changed

+47
-17
lines changed

2 files changed

+47
-17
lines changed

client/platform/desktop/frontend/store/asyncJobQueue.ts

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,23 @@ import { DesktopJob, DesktopJobUpdate, JobArgs } from 'platform/desktop/constant
33
export default abstract class AsyncJobQueue<T extends JobArgs> {
44
jobSpecs: T[];
55

6-
processingJob: null | DesktopJob;
6+
processingJobs: DesktopJob[];
77

8-
waiting: boolean;
8+
size: number;
9+
10+
// Represents jobs that have been selected to start, but haven't actually started yet
11+
queued: number;
12+
13+
private dequeueing = false;
914

1015
ipcRenderer: Electron.IpcRenderer;
1116

12-
constructor(ipcRenderer: Electron.IpcRenderer) {
17+
constructor(ipcRenderer: Electron.IpcRenderer, size: number = 1) {
1318
this.jobSpecs = [];
14-
this.processingJob = null;
15-
this.waiting = false;
19+
this.processingJobs = [];
20+
this.queued = 0;
1621
this.ipcRenderer = ipcRenderer;
22+
this.size = size;
1723

1824
this.init();
1925
}
@@ -30,27 +36,50 @@ export default abstract class AsyncJobQueue<T extends JobArgs> {
3036
}
3137

3238
async dequeue() {
33-
if (this.processingJob || this.waiting) return;
34-
this.waiting = true;
35-
const nextSpec = this.jobSpecs.shift();
36-
if (!nextSpec) return;
39+
if (this.dequeueing) return;
40+
this.dequeueing = true;
41+
try {
42+
while (this.processingJobs.length + this.queued < this.size && this.jobSpecs.length > 0) {
43+
this.queued += 1;
44+
const nextSpec = this.jobSpecs.shift()!;
45+
try {
46+
// eslint-disable-next-line no-await-in-loop
47+
await this.beginJob(nextSpec);
48+
} finally {
49+
this.queued -= 1;
50+
}
51+
}
52+
} finally {
53+
this.dequeueing = false;
54+
}
55+
/**
56+
*
57+
// Always return if at capacity (running this.count jobs)
58+
if (this.processingJobs.length === this.size) return;
59+
if (this.processingJobs.length + this.queued < this.size) {
60+
this.queued += 1;
61+
const nextSpec = this.jobSpecs.shift();
62+
if (!nextSpec) return;
3763
38-
await this.beginJob(nextSpec);
39-
this.waiting = false;
64+
await this.beginJob(nextSpec);
65+
this.queued -= 1;
66+
}
67+
*/
4068
}
4169

4270
abstract beginJob(spec: T): Promise<void>;
4371

4472
processJob(update: DesktopJobUpdate) {
45-
if (!this.processingJob) return;
46-
if (update.key !== this.processingJob.key) return;
73+
if (!this.processingJobs.length) return;
74+
const updatedJob = this.processingJobs.find((job: DesktopJob) => job.key === update.key);
75+
if (!updatedJob) return;
4776
if (update.endTime) {
48-
this.finishJob();
77+
this.finishJob(update.key);
4978
}
5079
}
5180

52-
async finishJob() {
53-
this.processingJob = null;
81+
async finishJob(finishedJobKey: string) {
82+
this.processingJobs = this.processingJobs.filter((job) => job.key !== finishedJobKey);
5483
await this.dequeue();
5584
}
5685

client/platform/desktop/frontend/store/asyncPipelineJobQueue.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import AsyncJobQueue from './asyncJobQueue';
33

44
export default class PipelineJobQueue extends AsyncJobQueue<RunPipeline> {
55
async beginJob(spec: RunPipeline) {
6-
this.processingJob = await this.ipcRenderer.invoke('run-pipeline', spec);
6+
const newJob = await this.ipcRenderer.invoke('run-pipeline', spec);
7+
this.processingJobs.push(newJob);
78
}
89
}

0 commit comments

Comments
 (0)