Skip to content
This repository was archived by the owner on Oct 25, 2023. It is now read-only.

Commit 6a0759d

Browse files
committed
implement external.ingestion
1 parent 65d7085 commit 6a0759d

File tree

1 file changed

+78
-7
lines changed

1 file changed

+78
-7
lines changed

archaeologist/src/storage_api_local.ts

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import {
1111
Ack,
1212
AddUserActivityRequest,
13+
AdvanceExternalPipelineIngestionProgress,
1314
CreateNodeArgs,
1415
Eid,
1516
GetNodeSliceArgs,
@@ -27,6 +28,8 @@ import {
2728
TNodeJson,
2829
TNodeSliceIterator,
2930
TotalUserActivity,
31+
UserExternalPipelineId,
32+
UserExternalPipelineIngestionProgress,
3033
} from 'smuggler-api'
3134
import { NodeType } from 'smuggler-api'
3235
import { v4 as uuidv4 } from 'uuid'
@@ -67,8 +70,24 @@ type NidToEdgeLav = GenericLav<'nid->edge', TEdgeJson[]>
6770
type OriginToActivityYek = GenericYek<'origin->activity', OriginId>
6871
type OriginToActivityLav = GenericLav<'origin->activity', TotalUserActivity>
6972

70-
type Yek = NidYek | OriginToNidYek | NidToEdgeYek | OriginToActivityYek
71-
type Lav = NidLav | OriginToNidLav | NidToEdgeLav | OriginToActivityLav
73+
type ExtPipelineYek = GenericYek<'ext-pipe', UserExternalPipelineId>
74+
type ExtPipelineLav = GenericLav<
75+
'ext-pipe',
76+
Omit<UserExternalPipelineIngestionProgress, 'epid'>
77+
>
78+
79+
type Yek =
80+
| NidYek
81+
| OriginToNidYek
82+
| NidToEdgeYek
83+
| OriginToActivityYek
84+
| ExtPipelineYek
85+
type Lav =
86+
| NidLav
87+
| OriginToNidLav
88+
| NidToEdgeLav
89+
| OriginToActivityLav
90+
| ExtPipelineLav
7291

7392
// TODO[snikitin@outlook.com] Describe that the purpose of this wrapper is to
7493
// add a bit of ORM-like typesafety to browser.Storage.StorageArea.
@@ -105,6 +124,7 @@ class YekLavStore {
105124
get(yek: NidToEdgeYek): Promise<NidToEdgeLav | undefined>
106125
get(yek: NidYek[]): Promise<NidLav[]>
107126
get(yek: OriginToActivityYek): Promise<OriginToActivityLav | undefined>
127+
get(yek: ExtPipelineYek): Promise<ExtPipelineLav | undefined>
108128
get(yek: Yek | Yek[]): Promise<Lav | Lav[] | undefined> {
109129
if (Array.isArray(yek)) {
110130
const keys: string[] = yek.map((value: Yek) => this.stringify(value))
@@ -135,11 +155,13 @@ class YekLavStore {
135155
case 'nid':
136156
return 'nid:' + yek.yek.key
137157
case 'origin->nid':
138-
return 'origin->nid:' + yek.yek.key
158+
return 'origin->nid:' + yek.yek.key.id
139159
case 'nid->edge':
140160
return 'nid->edge:' + yek.yek.key
141161
case 'origin->activity':
142-
return 'origin->activity:' + yek.yek.key
162+
return 'origin->activity:' + yek.yek.key.id
163+
case 'ext-pipe':
164+
return 'ext-pipe:' + yek.yek.key.pipeline_key
143165
}
144166
}
145167
}
@@ -346,6 +368,46 @@ async function getExternalUserActivity(
346368
return value
347369
}
348370

371+
async function getUserIngestionProgress(
372+
store: YekLavStore,
373+
epid: UserExternalPipelineId
374+
): Promise<UserExternalPipelineIngestionProgress> {
375+
const yek: ExtPipelineYek = {
376+
yek: { kind: 'ext-pipe', key: epid },
377+
}
378+
const lav: ExtPipelineLav | undefined = await store.get(yek)
379+
if (lav == null) {
380+
return {
381+
epid,
382+
ingested_until: 0,
383+
}
384+
}
385+
const value: UserExternalPipelineIngestionProgress = {
386+
epid,
387+
...lav.lav.value,
388+
}
389+
return value
390+
}
391+
392+
async function advanceUserIngestionProgress(
393+
store: YekLavStore,
394+
epid: UserExternalPipelineId,
395+
new_progress: AdvanceExternalPipelineIngestionProgress
396+
): Promise<Ack> {
397+
const progress: UserExternalPipelineIngestionProgress =
398+
await getUserIngestionProgress(store, epid)
399+
progress.ingested_until = new_progress.ingested_until
400+
401+
const yek: ExtPipelineYek = {
402+
yek: { kind: 'ext-pipe', key: epid },
403+
}
404+
const lav: ExtPipelineLav = {
405+
lav: { kind: 'ext-pipe', value: progress },
406+
}
407+
await store.set([{ yek, lav }])
408+
return { ack: true }
409+
}
410+
349411
export function makeLocalStorageApi(
350412
browserStore: browser.Storage.StorageArea
351413
): StorageApi {
@@ -406,7 +468,11 @@ export function makeLocalStorageApi(
406468
},
407469
activity: {
408470
external: {
409-
add: () => addExternalUserActivity,
471+
add: (
472+
origin: OriginId,
473+
activity: AddUserActivityRequest,
474+
_signal?: AbortSignal
475+
) => addExternalUserActivity(store, origin, activity),
410476
get: (origin: OriginId, _signal?: AbortSignal) =>
411477
getExternalUserActivity(store, origin),
412478
},
@@ -417,8 +483,13 @@ export function makeLocalStorageApi(
417483
},
418484
external: {
419485
ingestion: {
420-
// get: getUserIngestionProgress,
421-
// advance: advanceUserIngestionProgress,
486+
get: (epid: UserExternalPipelineId, _signal?: AbortSignal) =>
487+
getUserIngestionProgress(store, epid),
488+
advance: (
489+
epid: UserExternalPipelineId,
490+
new_progress: AdvanceExternalPipelineIngestionProgress,
491+
_signal?: AbortSignal
492+
) => advanceUserIngestionProgress(store, epid, new_progress),
422493
},
423494
},
424495
}

0 commit comments

Comments
 (0)