@@ -45,6 +45,15 @@ import {
45
45
import { BloomFilter , BloomFilterError } from './bloom_filter' ;
46
46
import { ExistenceFilter } from './existence_filter' ;
47
47
import { RemoteEvent , TargetChange } from './remote_event' ;
48
+ import {
49
+ getPipelineDocuments ,
50
+ getPipelineFlavor ,
51
+ getPipelineSourceType ,
52
+ isPipeline ,
53
+ TargetOrPipeline
54
+ } from '../core/pipeline-util' ;
55
+ import { Pipeline } from '../lite-api/pipeline' ;
56
+ import { ResourcePath } from '../model/path' ;
48
57
49
58
/**
50
59
* Internal representation of the watcher API protocol buffers.
@@ -405,6 +414,17 @@ export class WatchChangeAggregator {
405
414
}
406
415
}
407
416
417
+ isSingleDocumentTarget ( target : TargetOrPipeline ) : boolean {
418
+ if ( targetIsPipelineTarget ( target ) ) {
419
+ return (
420
+ getPipelineSourceType ( target ) === 'documents' &&
421
+ getPipelineDocuments ( target ) ?. length === 1
422
+ ) ;
423
+ }
424
+
425
+ return targetIsDocumentTarget ( target ) ;
426
+ }
427
+
408
428
/**
409
429
* Handles existence filters and synthesizes deletes for filter mismatches.
410
430
* Targets that are invalidated by filter mismatches are added to
@@ -417,29 +437,7 @@ export class WatchChangeAggregator {
417
437
const targetData = this . targetDataForActiveTarget ( targetId ) ;
418
438
if ( targetData ) {
419
439
const target = targetData . target ;
420
- if ( targetIsPipelineTarget ( target ) ) {
421
- //TODO(pipeline): handle existence filter correctly for pipelines
422
- } else if ( targetIsDocumentTarget ( target ) ) {
423
- if ( expectedCount === 0 ) {
424
- // The existence filter told us the document does not exist. We deduce
425
- // that this document does not exist and apply a deleted document to
426
- // our updates. Without applying this deleted document there might be
427
- // another query that will raise this document as part of a snapshot
428
- // until it is resolved, essentially exposing inconsistency between
429
- // queries.
430
- const key = new DocumentKey ( target . path ) ;
431
- this . removeDocumentFromTarget (
432
- targetId ,
433
- key ,
434
- MutableDocument . newNoDocument ( key , SnapshotVersion . min ( ) )
435
- ) ;
436
- } else {
437
- hardAssert (
438
- expectedCount === 1 ,
439
- 'Single document existence filter with count: ' + expectedCount
440
- ) ;
441
- }
442
- } else {
440
+ if ( ! this . isSingleDocumentTarget ( target ) ) {
443
441
const currentSize = this . getCurrentDocumentCountForTarget ( targetId ) ;
444
442
// Existence filter mismatch. Mark the documents as being in limbo, and
445
443
// raise a snapshot with `isFromCache:true`.
@@ -474,6 +472,30 @@ export class WatchChangeAggregator {
474
472
)
475
473
) ;
476
474
}
475
+ } else {
476
+ if ( expectedCount === 0 ) {
477
+ // The existence filter told us the document does not exist. We deduce
478
+ // that this document does not exist and apply a deleted document to
479
+ // our updates. Without applying this deleted document there might be
480
+ // another query that will raise this document as part of a snapshot
481
+ // until it is resolved, essentially exposing inconsistency between
482
+ // queries.
483
+ const key = new DocumentKey (
484
+ targetIsPipelineTarget ( target )
485
+ ? ResourcePath . fromString ( getPipelineDocuments ( target ) ! [ 0 ] )
486
+ : target . path
487
+ ) ;
488
+ this . removeDocumentFromTarget (
489
+ targetId ,
490
+ key ,
491
+ MutableDocument . newNoDocument ( key , SnapshotVersion . min ( ) )
492
+ ) ;
493
+ } else {
494
+ hardAssert (
495
+ expectedCount === 1 ,
496
+ 'Single document existence filter with count: ' + expectedCount
497
+ ) ;
498
+ }
477
499
}
478
500
}
479
501
}
@@ -591,8 +613,7 @@ export class WatchChangeAggregator {
591
613
if ( targetData ) {
592
614
if (
593
615
targetState . current &&
594
- ! targetIsPipelineTarget ( targetData . target ) &&
595
- targetIsDocumentTarget ( targetData . target )
616
+ this . isSingleDocumentTarget ( targetData . target )
596
617
) {
597
618
// Document queries for document that don't exist can produce an empty
598
619
// result set. To update our local cache, we synthesize a document
@@ -603,7 +624,12 @@ export class WatchChangeAggregator {
603
624
// TODO(dimond): Ideally we would have an explicit lookup target
604
625
// instead resulting in an explicit delete message and we could
605
626
// remove this special logic.
606
- const key = new DocumentKey ( targetData . target . path ) ;
627
+ const path = targetIsPipelineTarget ( targetData . target )
628
+ ? ResourcePath . fromString (
629
+ getPipelineDocuments ( targetData . target ) ! [ 0 ]
630
+ )
631
+ : targetData . target . path ;
632
+ const key = new DocumentKey ( path ) ;
607
633
if (
608
634
this . pendingDocumentUpdates . get ( key ) === null &&
609
635
! this . targetContainsDocument ( targetId , key )
@@ -695,7 +721,12 @@ export class WatchChangeAggregator {
695
721
targetState . addDocumentChange ( document . key , changeType ) ;
696
722
697
723
if (
698
- targetIsPipelineTarget ( this . targetDataForActiveTarget ( targetId ) ! . target )
724
+ targetIsPipelineTarget (
725
+ this . targetDataForActiveTarget ( targetId ) ! . target
726
+ ) &&
727
+ getPipelineFlavor (
728
+ this . targetDataForActiveTarget ( targetId ) ! . target as Pipeline
729
+ ) !== 'exact'
699
730
) {
700
731
this . pendingAugmentedDocumentUpdates =
701
732
this . pendingAugmentedDocumentUpdates . insert ( document . key , document ) ;
@@ -747,7 +778,12 @@ export class WatchChangeAggregator {
747
778
748
779
if ( updatedDocument ) {
749
780
if (
750
- targetIsPipelineTarget ( this . targetDataForActiveTarget ( targetId ) ! . target )
781
+ targetIsPipelineTarget (
782
+ this . targetDataForActiveTarget ( targetId ) ! . target
783
+ ) &&
784
+ getPipelineFlavor (
785
+ this . targetDataForActiveTarget ( targetId ) ! . target as Pipeline
786
+ ) !== 'exact'
751
787
) {
752
788
this . pendingAugmentedDocumentUpdates =
753
789
this . pendingAugmentedDocumentUpdates . insert ( key , updatedDocument ) ;
0 commit comments