Skip to content

Commit ff5041f

Browse files
committed
WIP add gpu and cpu job queues
1 parent 78ea7d0 commit ff5041f

File tree

10 files changed

+174
-38
lines changed

10 files changed

+174
-38
lines changed

client/platform/desktop/backend/ipcService.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
DesktopJobUpdate, RunPipeline, RunTraining, Settings, ExportDatasetArgs,
88
DesktopMediaImportResponse,
99
ExportTrainedPipeline,
10+
MaxConcurrency,
1011
} from 'platform/desktop/constants';
1112

1213
import linux from './native/linux';
@@ -114,12 +115,15 @@ export default function register() {
114115
const updater = (update: DesktopJobUpdate) => {
115116
event.sender.send('job-update', update);
116117
};
117-
const ret = await common.finalizeMediaImport(settings.get(), args, updater);
118-
return ret;
118+
return common.finalizeMediaImport(settings.get(), args, updater);
119119
});
120120

121121
ipcMain.handle('validate-settings', async (_, s: Settings) => {
122-
const ret = await currentPlatform.validateViamePath(s);
122+
let ret = await currentPlatform.validateViamePath(s);
123+
if (Math.floor(s.concurrency) > MaxConcurrency) {
124+
const prefix = typeof ret === 'string' ? `${ret}, ` : '';
125+
ret = `${prefix}Concurrency must not exceed ${MaxConcurrency}`;
126+
}
123127
return ret;
124128
});
125129
ipcMain.handle('run-pipeline', async (event, args: RunPipeline) => {
@@ -140,4 +144,10 @@ export default function register() {
140144
};
141145
return currentPlatform.train(settings.get(), args, updater);
142146
});
147+
ipcMain.handle('convert', async (event, args: ConversionArgs) => {
148+
const updater = (update: DesktopJobUpdate) => {
149+
event.sender.send('job-update', update);
150+
};
151+
return currentPlatform.convert(settings.get(), args, updater);
152+
});
143153
}

client/platform/desktop/backend/native/common.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
JsonMeta, Settings, JsonMetaCurrentVersion, DesktopMetadata, DesktopJobUpdater,
3939
RunTraining, ExportDatasetArgs, DesktopMediaImportResponse,
4040
ExportConfigurationArgs, JobsFolderName, ProjectsFolderName, PipelinesFolderName,
41+
ConversionArgs,
4142
} from 'platform/desktop/constants';
4243
import {
4344
cleanString, filterByGlob, makeid, strNumericCompare,
@@ -1084,7 +1085,7 @@ async function _importTrackFile(
10841085
/**
10851086
* After media conversion we need to remove the transcodingKey to signify it is done
10861087
*/
1087-
async function completeConversion(settings: Settings, datasetId: string, transcodingJobKey: string, meta: JsonMeta) {
1088+
export async function completeConversion(settings: Settings, datasetId: string, transcodingJobKey: string, meta: JsonMeta) {
10881089
await getValidatedProjectDir(settings, datasetId);
10891090
if (meta.transcodingJobKey === transcodingJobKey) {
10901091
// eslint-disable-next-line no-param-reassign
@@ -1100,7 +1101,7 @@ async function finalizeMediaImport(
11001101
settings: Settings,
11011102
args: DesktopMediaImportResponse,
11021103
updater: DesktopJobUpdater,
1103-
) {
1104+
): Promise<ConversionArgs> {
11041105
const { jsonMeta, globPattern } = args;
11051106
let { mediaConvertList } = args;
11061107
const { type: datasetType } = jsonMeta;
@@ -1135,8 +1136,8 @@ async function finalizeMediaImport(
11351136

11361137
//Now we will kick off any conversions that are necessary
11371138
let jobBase = null;
1139+
const srcDstList: [string, string][] = [];
11381140
if (mediaConvertList.length) {
1139-
const srcDstList: [string, string][] = [];
11401141
const extension = datasetType === 'video' ? '.mp4' : '.png';
11411142
let destAbsPath = '';
11421143
mediaConvertList.forEach((absPath) => {
@@ -1197,7 +1198,10 @@ async function finalizeMediaImport(
11971198
if (args.metaFileAbsPath) {
11981199
await dataFileImport(settings, jsonMeta.id, args.metaFileAbsPath);
11991200
}
1200-
return finalJsonMeta;
1201+
return {
1202+
meta: finalJsonMeta,
1203+
mediaList: srcDstList,
1204+
};
12011205
}
12021206

12031207
async function openLink(url: string) {

client/platform/desktop/backend/native/linux.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ import {
1414
RunTraining,
1515
DesktopJobUpdater,
1616
ExportTrainedPipeline,
17+
ConversionArgs,
1718
} from 'platform/desktop/constants';
1819
import { observeChild } from 'platform/desktop/backend/native/processManager';
20+
import { convertMedia } from 'platform/desktop/backend/native/mediaJobs';
21+
import { completeConversion } from 'platform/desktop/backend/native/common';
1922
import * as viame from './viame';
2023

2124
const DefaultSettings: Settings = {
@@ -111,6 +114,19 @@ async function train(
111114
});
112115
}
113116

117+
async function convert(
118+
settings: Settings,
119+
conversionArgs: ConversionArgs,
120+
updater: DesktopJobUpdater
121+
): Promise<DesktopJob> {
122+
return convertMedia(
123+
settings,
124+
conversionArgs,
125+
updater,
126+
(jobKey, meta) => completeConversion(settings, conversionArgs.meta.id, jobKey, meta)
127+
);
128+
}
129+
114130
// Based on https://github.com/chrisallenlane/node-nvidia-smi
115131
async function nvidiaSmi(): Promise<NvidiaSmiReply> {
116132
return new Promise((resolve) => {
@@ -146,5 +162,6 @@ export default {
146162
runPipeline,
147163
exportTrainedPipeline,
148164
train,
165+
convert,
149166
validateViamePath,
150167
};

client/platform/desktop/constants.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ export interface Settings {
3131
};
3232
}
3333

34-
export type JobArgs = RunPipeline | RunTraining | ExportTrainedPipeline;
35-
3634
// Handles Importing and storing of multi camera data
3735

3836
export interface Camera {
@@ -186,11 +184,33 @@ export interface RunTraining {
186184
};
187185
}
188186

187+
export type JobArgs = RunPipeline | RunTraining | ExportTrainedPipeline | ConversionArgs;
188+
189+
export function isRunPipeline(spec: JobArgs): spec is RunPipeline {
190+
return 'datasetId' in spec && 'pipeline' in spec;
191+
}
192+
193+
export function isExportTrainedPipeline(spec: JobArgs): spec is ExportTrainedPipeline {
194+
return 'path' in spec && 'pipeline' in spec;
195+
}
196+
197+
export function isRunTraining(spec: JobArgs): spec is RunTraining {
198+
return (
199+
'datasetIds' in spec
200+
&& 'pipelineName' in spec
201+
&& 'trainingConfig' in spec
202+
);
203+
}
204+
189205
export interface ConversionArgs {
190206
meta: JsonMeta;
191207
mediaList: [string, string][];
192208
}
193209

210+
export function isConversion(spec: JobArgs): spec is ConversionArgs {
211+
return 'meta' in spec && 'mediaList' in spec;
212+
}
213+
194214
export interface DesktopJob {
195215
// key unique identifier for this job
196216
key: string;

client/platform/desktop/frontend/api.ts

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import {
1717
inputAnnotationFileTypes, listFileTypes, inputAnnotationTypes,
1818
} from 'dive-common/constants';
1919
import {
20-
DesktopJob, DesktopMetadata, JsonMeta, NvidiaSmiReply,
20+
DesktopJob, DesktopMetadata, NvidiaSmiReply,
2121
RunPipeline, RunTraining, ExportTrainedPipeline, ExportDatasetArgs, ExportConfigurationArgs,
22-
DesktopMediaImportResponse,
22+
DesktopMediaImportResponse, ConversionArgs,
2323
} from 'platform/desktop/constants';
2424

25-
import { pipelineJobQueue } from './store/jobs';
25+
import { gpuJobQueue } from './store/jobs';
2626

2727
/**
2828
* Native functions that run entirely in the renderer
@@ -98,14 +98,6 @@ async function runPipeline(itemId: string, pipeline: Pipe): Promise<DesktopJob>
9898
return ipcRenderer.invoke('run-pipeline', args);
9999
}
100100

101-
function queuePipeline(itemId: string, pipeline: Pipe): void {
102-
const args: RunPipeline = {
103-
pipeline,
104-
datasetId: itemId,
105-
};
106-
pipelineJobQueue.enqueue(args);
107-
}
108-
109101
async function exportTrainedPipeline(path: string, pipeline: Pipe): Promise<DesktopJob> {
110102
const args: ExportTrainedPipeline = {
111103
path,
@@ -138,6 +130,38 @@ async function runTraining(
138130
return ipcRenderer.invoke('run-training', args);
139131
}
140132

133+
function queueTraining(
134+
folderIds: string[],
135+
pipelineName: string,
136+
config: string,
137+
annotatedFramesOnly: boolean,
138+
labelText?: string,
139+
fineTuneModel?: {
140+
name: string;
141+
type: string;
142+
path?: string;
143+
folderId?: string;
144+
},
145+
): void {
146+
const args: RunTraining = {
147+
datasetIds: folderIds,
148+
pipelineName,
149+
trainingConfig: config,
150+
annotatedFramesOnly,
151+
labelText,
152+
fineTuneModel,
153+
};
154+
gpuJobQueue.enqueue(args);
155+
}
156+
157+
function queuePipeline(itemId: string, pipeline: Pipe): void {
158+
const args: RunPipeline = {
159+
pipeline,
160+
datasetId: itemId,
161+
};
162+
gpuJobQueue.enqueue(args);
163+
}
164+
141165
async function deleteTrainedPipeline(pipeline: Pipe): Promise<void> {
142166
return ipcRenderer.invoke('delete-trained-pipeline', pipeline);
143167
}
@@ -170,10 +194,15 @@ function importAnnotationFile(id: string, path: string, _htmlFile = undefined, a
170194
});
171195
}
172196

173-
function finalizeImport(args: DesktopMediaImportResponse): Promise<JsonMeta> {
197+
function finalizeImport(args: DesktopMediaImportResponse): Promise<ConversionArgs> {
198+
// Have this return JsonMeta as well as everything needed to start a job?
174199
return ipcRenderer.invoke('finalize-import', args);
175200
}
176201

202+
async function convert(args: ConversionArgs): Promise<DesktopJob> {
203+
return ipcRenderer.invoke('convert', args);
204+
}
205+
177206
async function exportDataset(id: string, exclude: boolean, typeFilter: readonly string[], type?: 'csv' | 'json'): Promise<string> {
178207
const location = await dialog.showSaveDialog({
179208
title: 'Export Dataset',
@@ -262,6 +291,7 @@ export {
262291
exportDataset,
263292
exportConfiguration,
264293
finalizeImport,
294+
convert,
265295
importMedia,
266296
bulkImportMedia,
267297
deleteDataset,
@@ -271,4 +301,5 @@ export {
271301
openLink,
272302
nvidiaSmi,
273303
queuePipeline,
304+
queueTraining,
274305
};

client/platform/desktop/frontend/components/Jobs.vue

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { DesktopJob } from 'platform/desktop/constants';
88
import BrowserLink from './BrowserLink.vue';
99
import NavigationBar from './NavigationBar.vue';
1010
import { datasets } from '../store/dataset';
11-
import { recentHistory, truncateOutputAtLines, pipelineJobQueue } from '../store/jobs';
11+
import { recentHistory, truncateOutputAtLines, gpuJobQueue } from '../store/jobs';
1212
1313
export default defineComponent({
1414
components: {
@@ -41,7 +41,7 @@ export default defineComponent({
4141
clockDriver,
4242
datasets,
4343
recentHistory,
44-
pipelineJobQueue,
44+
gpuJobQueue,
4545
moment,
4646
utc,
4747
visibleOutput,
@@ -232,16 +232,16 @@ export default defineComponent({
232232
</v-row>
233233
</v-card>
234234
<h1
235-
v-if="pipelineJobQueue.length() > 0"
235+
v-if="gpuJobQueue.length() > 0"
236236
class="text-h4 mb-4 font-weight-light"
237237
>
238-
Upcoming Jobs ({{ pipelineJobQueue.length() }})
238+
Upcoming Jobs ({{ gpuJobQueue.length() }})
239239
</h1>
240240
<v-card
241-
v-for="jobSpec in pipelineJobQueue.jobSpecs"
242-
:key="jobSpec.datasetId"
241+
v-for="jobSpec in gpuJobQueue.jobSpecs"
242+
:key="jobSpec.datasetId?"
243243
>
244-
{{ jobSpec.datasetId }}
244+
{{ jobSpec.datasetId? }}
245245
</v-card>
246246
</v-col>
247247
</v-row>

client/platform/desktop/frontend/components/Recent.vue

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,9 @@ export default defineComponent({
8888
const imports = await request(async () => Promise.all(argsArray.map((args) => api.finalizeImport(args))));
8989
pendingImportPayload.value = null;
9090
91-
imports.forEach(async (jsonMeta) => {
92-
const recentsMeta = await api.loadMetadata(jsonMeta.id);
91+
imports.forEach(async (conversionArgs) => {
92+
// TODO queue conversion job
93+
const recentsMeta = await api.loadMetadata(conversionArgs.meta.id);
9394
setRecents(recentsMeta);
9495
});
9596
@@ -100,16 +101,17 @@ export default defineComponent({
100101
async function finalizeImport(args: DesktopMediaImportResponse) {
101102
importing.value = true;
102103
await request(async () => {
103-
const jsonMeta = await api.finalizeImport(args);
104+
const conversionArgs = await api.finalizeImport(args);
104105
pendingImportPayload.value = null; // close dialog
105-
if (!jsonMeta.transcodingJobKey) {
106+
if (!conversionArgs.meta.transcodingJobKey) {
106107
router.push({
107108
name: 'viewer',
108-
params: { id: jsonMeta.id },
109+
params: { id: conversionArgs.meta.id },
109110
});
110111
} else {
111112
// Display new data and await transcoding to complete
112-
const recentsMeta = await api.loadMetadata(jsonMeta.id);
113+
// TODO: queue conversion job
114+
const recentsMeta = await api.loadMetadata(conversionArgs.meta.id);
113115
setRecents(recentsMeta);
114116
}
115117
});
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import {
2+
ConversionArgs,
3+
ExportTrainedPipeline,
4+
Settings,
5+
DesktopJobUpdater,
6+
JsonMeta,
7+
} from 'platform/desktop/constants';
8+
import { convertMedia } from 'platform/desktop/backend/native/mediaJobs';
9+
import AsyncJobQueue from './asyncJobQueue';
10+
11+
export default class AsyncCpuJobQueue extends AsyncJobQueue<ConversionArgs | ExportTrainedPipeline> {
12+
async beginConversionJob(
13+
spec: ConversionArgs,
14+
settings: Settings,
15+
updater: DesktopJobUpdater,
16+
onComplete?: (jobKey: string, meta: JsonMeta) => void,
17+
mediaIndex = 0,
18+
key = '',
19+
baseWorkDir = '',
20+
): JsonMeta {
21+
const result = await this.ipcRenderer.invoke('finalize-import')
22+
this.processingJobs.push(result.desk);
23+
return
24+
}
25+
26+
async beginJob(spec: ExportTrainedPipeline) {
27+
const newJob = await this.ipcRenderer.invoke('export-trained-pipeline', spec);
28+
this.processingJobs.push(newJob);
29+
}
30+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import {
2+
RunPipeline,
3+
RunTraining,
4+
isRunPipeline,
5+
isRunTraining,
6+
DesktopJob,
7+
} from 'platform/desktop/constants';
8+
import AsyncJobQueue from './asyncJobQueue';
9+
10+
export default class AsyncGpuJobQueue extends AsyncJobQueue<RunPipeline | RunTraining> {
11+
async beginJob(spec: RunPipeline | RunTraining) {
12+
let newJob: DesktopJob;
13+
if (isRunPipeline(spec)) {
14+
newJob = await this.ipcRenderer.invoke('run-pipeline', spec);
15+
} else if (isRunTraining(spec)) {
16+
newJob = await this.ipcRenderer.invoke('run-training', spec);
17+
} else {
18+
throw new Error('Unsupported job arguments provided to beginJob.');
19+
}
20+
this.processingJobs.push(newJob);
21+
}
22+
}

0 commit comments

Comments
 (0)