@@ -62,8 +62,11 @@ type GenericLav<Kind extends string, Value> = {
62
62
}
63
63
}
64
64
65
- type NidYek = GenericYek < 'nid' , Nid >
66
- type NidLav = GenericLav < 'nid' , TNodeJson >
65
+ type AllNidsYek = GenericYek < 'all-nids' , undefined >
66
+ type AllNidsLav = GenericLav < 'all-nids' , Nid [ ] >
67
+
68
+ type NidToNodeYek = GenericYek < 'nid->node' , Nid >
69
+ type NidToNodeLav = GenericLav < 'nid->node' , TNodeJson >
67
70
68
71
type OriginToNidYek = GenericYek < 'origin->nid' , OriginId >
69
72
type OriginToNidLav = GenericLav < 'origin->nid' , Nid [ ] >
@@ -74,28 +77,32 @@ type NidToEdgeLav = GenericLav<'nid->edge', TEdgeJson[]>
74
77
type OriginToActivityYek = GenericYek < 'origin->activity' , OriginId >
75
78
type OriginToActivityLav = GenericLav < 'origin->activity' , TotalUserActivity >
76
79
77
- type ExtPipelineYek = GenericYek < 'ext-pipe' , UserExternalPipelineId >
80
+ type ExtPipelineYek = GenericYek < 'ext-pipe->progress ' , UserExternalPipelineId >
78
81
type ExtPipelineLav = GenericLav <
79
- 'ext-pipe' ,
82
+ 'ext-pipe->progress ' ,
80
83
Omit < UserExternalPipelineIngestionProgress , 'epid' >
81
84
>
82
85
83
86
type Yek =
84
- | NidYek
87
+ | AllNidsYek
88
+ | NidToNodeYek
85
89
| OriginToNidYek
86
90
| NidToEdgeYek
87
91
| OriginToActivityYek
88
92
| ExtPipelineYek
89
93
type Lav =
90
- | NidLav
94
+ | AllNidsLav
95
+ | NidToNodeLav
91
96
| OriginToNidLav
92
97
| NidToEdgeLav
93
98
| OriginToActivityLav
94
99
| ExtPipelineLav
95
100
96
101
type YekLav = { yek : Yek ; lav : Lav }
97
102
98
- function isOfArrayKind ( lav : Lav ) : lav is OriginToNidLav | NidToEdgeLav {
103
+ function isOfArrayKind (
104
+ lav : Lav
105
+ ) : lav is OriginToNidLav | NidToEdgeLav | AllNidsLav {
99
106
return Array . isArray ( lav . lav . value )
100
107
}
101
108
@@ -129,10 +136,11 @@ class YekLavStore {
129
136
return this . store . set ( records )
130
137
}
131
138
132
- get ( yek : NidYek ) : Promise < NidLav | undefined >
139
+ get ( yek : AllNidsYek ) : Promise < AllNidsLav | undefined >
140
+ get ( yek : NidToNodeYek ) : Promise < NidToNodeLav | undefined >
133
141
get ( yek : OriginToNidYek ) : Promise < OriginToNidLav | undefined >
134
142
get ( yek : NidToEdgeYek ) : Promise < NidToEdgeLav | undefined >
135
- get ( yek : NidYek [ ] ) : Promise < NidLav [ ] >
143
+ get ( yek : NidToNodeYek [ ] ) : Promise < NidToNodeLav [ ] >
136
144
get ( yek : OriginToActivityYek ) : Promise < OriginToActivityLav | undefined >
137
145
get ( yek : ExtPipelineYek ) : Promise < ExtPipelineLav | undefined >
138
146
get ( yek : Yek ) : Promise < Lav | undefined >
@@ -163,6 +171,13 @@ class YekLavStore {
163
171
164
172
// TODO[snikitin@outlook .com] Explain that this method is a poor man's attempt
165
173
// to increase atomicity of data insertion
174
+ async prepareAppend (
175
+ yek : AllNidsYek ,
176
+ lav : AllNidsLav
177
+ ) : Promise < {
178
+ yek : AllNidsYek
179
+ lav : AllNidsLav
180
+ } >
166
181
async prepareAppend (
167
182
yek : OriginToNidYek ,
168
183
lav : OriginToNidLav
@@ -206,15 +221,17 @@ class YekLavStore {
206
221
207
222
private stringify ( yek : Yek ) : string {
208
223
switch ( yek . yek . kind ) {
209
- case 'nid' :
210
- return 'nid:' + yek . yek . key
224
+ case 'all-nids' :
225
+ return 'all-nids'
226
+ case 'nid->node' :
227
+ return 'nid->node:' + yek . yek . key
211
228
case 'origin->nid' :
212
229
return 'origin->nid:' + yek . yek . key . id
213
230
case 'nid->edge' :
214
231
return 'nid->edge:' + yek . yek . key
215
232
case 'origin->activity' :
216
233
return 'origin->activity:' + yek . yek . key . id
217
- case 'ext-pipe' :
234
+ case 'ext-pipe->progress ' :
218
235
return 'ext-pipe:' + yek . yek . key . pipeline_key
219
236
}
220
237
}
@@ -264,10 +281,14 @@ async function createNode(
264
281
updated_at : createdAt ,
265
282
}
266
283
267
- let records : { yek : Yek ; lav : Lav } [ ] = [
284
+ let records : YekLav [ ] = [
285
+ await store . prepareAppend (
286
+ { yek : { kind : 'all-nids' , key : undefined } } ,
287
+ { lav : { kind : 'all-nids' , value : [ node . nid ] } }
288
+ ) ,
268
289
{
269
- yek : { yek : { kind : 'nid' , key : node . nid } } ,
270
- lav : { lav : { kind : 'nid' , value : node } } ,
290
+ yek : { yek : { kind : 'nid->node ' , key : node . nid } } ,
291
+ lav : { lav : { kind : 'nid->node ' , value : node } } ,
271
292
} ,
272
293
]
273
294
@@ -329,8 +350,8 @@ async function getNode({
329
350
store : YekLavStore
330
351
nid : Nid
331
352
} ) : Promise < TNode > {
332
- const yek : NidYek = { yek : { kind : 'nid' , key : nid } }
333
- const lav : NidLav | undefined = await store . get ( yek )
353
+ const yek : NidToNodeYek = { yek : { kind : 'nid->node ' , key : nid } }
354
+ const lav : NidToNodeLav | undefined = await store . get ( yek )
334
355
if ( lav == null ) {
335
356
throw new Error ( `Failed to get node ${ nid } because it wasn't found` )
336
357
}
@@ -351,30 +372,32 @@ async function getNodesByOrigin({
351
372
return [ ]
352
373
}
353
374
const value : Nid [ ] = lav . lav . value
354
- const nidYeks : NidYek [ ] = value . map ( ( nid : Nid ) : NidYek => {
355
- return { yek : { kind : 'nid' , key : nid } }
375
+ const nidYeks : NidToNodeYek [ ] = value . map ( ( nid : Nid ) : NidToNodeYek => {
376
+ return { yek : { kind : 'nid->node ' , key : nid } }
356
377
} )
357
- const nidLavs : NidLav [ ] = await store . get ( nidYeks )
358
- return nidLavs . map ( ( lav : NidLav ) => NodeUtil . fromJson ( lav . lav . value ) )
378
+ const nidLavs : NidToNodeLav [ ] = await store . get ( nidYeks )
379
+ return nidLavs . map ( ( lav : NidToNodeLav ) => NodeUtil . fromJson ( lav . lav . value ) )
359
380
}
360
381
361
382
async function getNodeBatch (
362
383
store : YekLavStore ,
363
384
req : NodeBatchRequestBody
364
385
) : Promise < NodeBatch > {
365
- const yeks : NidYek [ ] = req . nids . map ( ( nid : Nid ) : NidYek => {
366
- return { yek : { kind : 'nid' , key : nid } }
386
+ const yeks : NidToNodeYek [ ] = req . nids . map ( ( nid : Nid ) : NidToNodeYek => {
387
+ return { yek : { kind : 'nid->node ' , key : nid } }
367
388
} )
368
- const lavs : NidLav [ ] = await store . get ( yeks )
369
- return { nodes : lavs . map ( ( lav : NidLav ) => NodeUtil . fromJson ( lav . lav . value ) ) }
389
+ const lavs : NidToNodeLav [ ] = await store . get ( yeks )
390
+ return {
391
+ nodes : lavs . map ( ( lav : NidToNodeLav ) => NodeUtil . fromJson ( lav . lav . value ) ) ,
392
+ }
370
393
}
371
394
372
395
async function updateNode (
373
396
store : YekLavStore ,
374
397
args : { nid : Nid } & NodePatchRequest
375
398
) : Promise < Ack > {
376
- const yek : NidYek = { yek : { kind : 'nid' , key : args . nid } }
377
- const lav : NidLav | undefined = await store . get ( yek )
399
+ const yek : NidToNodeYek = { yek : { kind : 'nid->node ' , key : args . nid } }
400
+ const lav : NidToNodeLav | undefined = await store . get ( yek )
378
401
if ( lav == null ) {
379
402
throw new Error ( `Failed to update node ${ args . nid } because it wasn't found` )
380
403
}
@@ -389,6 +412,46 @@ async function updateNode(
389
412
return { ack : true }
390
413
}
391
414
415
+ class Iterator implements INodeIterator {
416
+ private store : YekLavStore
417
+ private nids : Promise < Nid [ ] >
418
+ private index : number
419
+
420
+ constructor ( store : YekLavStore ) {
421
+ this . store = store
422
+ this . index = 0
423
+
424
+ const yek : AllNidsYek = {
425
+ yek : { kind : 'all-nids' , key : undefined } ,
426
+ }
427
+ this . nids = store
428
+ . get ( yek )
429
+ . then ( ( lav : AllNidsLav | undefined ) => lav ?. lav . value ?? [ ] )
430
+ }
431
+
432
+ async next ( ) : Promise < TNode | null > {
433
+ const nids = await this . nids
434
+ if ( this . index >= nids . length ) {
435
+ return null
436
+ }
437
+ const nid : Nid = nids [ this . index ]
438
+ const yek : NidToNodeYek = { yek : { kind : 'nid->node' , key : nid } }
439
+ const lav : NidToNodeLav | undefined = await this . store . get ( yek )
440
+ if ( lav == null ) {
441
+ throw new Error ( `Failed to find node for nid ${ nid } ` )
442
+ }
443
+ ++ this . index
444
+ return NodeUtil . fromJson ( lav . lav . value )
445
+ }
446
+ total ( ) : number {
447
+ return this . index
448
+ }
449
+ abort ( ) : void {
450
+ this . index = 0
451
+ this . nids = Promise . resolve ( [ ] )
452
+ }
453
+ }
454
+
392
455
async function createEdge (
393
456
store : YekLavStore ,
394
457
args : CreateEdgeArgs
@@ -507,7 +570,7 @@ async function getUserIngestionProgress(
507
570
epid : UserExternalPipelineId
508
571
) : Promise < UserExternalPipelineIngestionProgress > {
509
572
const yek : ExtPipelineYek = {
510
- yek : { kind : 'ext-pipe' , key : epid } ,
573
+ yek : { kind : 'ext-pipe->progress ' , key : epid } ,
511
574
}
512
575
const lav : ExtPipelineLav | undefined = await store . get ( yek )
513
576
if ( lav == null ) {
@@ -533,10 +596,10 @@ async function advanceUserIngestionProgress(
533
596
progress . ingested_until = new_progress . ingested_until
534
597
535
598
const yek : ExtPipelineYek = {
536
- yek : { kind : 'ext-pipe' , key : epid } ,
599
+ yek : { kind : 'ext-pipe->progress ' , key : epid } ,
537
600
}
538
601
const lav : ExtPipelineLav = {
539
- lav : { kind : 'ext-pipe' , value : progress } ,
602
+ lav : { kind : 'ext-pipe->progress ' , value : progress } ,
540
603
}
541
604
await store . set ( [ { yek, lav } ] )
542
605
return { ack : true }
@@ -567,7 +630,7 @@ export function makeLocalStorageApi(
567
630
) => updateNode ( store , args ) ,
568
631
create : ( args : CreateNodeArgs , _signal ?: AbortSignal ) =>
569
632
createNode ( store , args ) ,
570
- iterate : throwUnimplementedError ( 'node.iterate' ) ,
633
+ iterate : ( ) => new Iterator ( store ) ,
571
634
delete : throwUnimplementedError ( 'node.delete' ) ,
572
635
bulkDelete : throwUnimplementedError ( 'node.bulkdDelete' ) ,
573
636
batch : {
0 commit comments