Skip to content

Commit 82b8303

Browse files
committed
Setting up QueryOrPipeline to replace Query
1 parent 23b5135 commit 82b8303

File tree

6 files changed

+104
-144
lines changed

6 files changed

+104
-144
lines changed

packages/firestore/src/api/pipeline.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
firestoreClientExecutePipeline,
3-
firestoreClientListenPipeline
3+
firestoreClientListen
44
} from '../core/firestore_client';
55
import { Pipeline as LitePipeline } from '../lite-api/pipeline';
66
import { PipelineResult } from '../lite-api/pipeline-result';
@@ -140,7 +140,8 @@ export class Pipeline<
140140
this.stages.push(new Sort([Field.of('__name__').ascending()]));
141141

142142
const client = ensureFirestoreConfigured(this.db);
143-
firestoreClientListenPipeline(client, this, { next, error, complete });
143+
// TODO(pipeline) hook up options
144+
firestoreClientListen(client, this, {}, observer);
144145

145146
return () => {};
146147
}

packages/firestore/src/core/event_manager.ts

Lines changed: 66 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
2727
import { Pipeline } from '../api/pipeline';
2828
import { PipelineSnapshot } from '../api/snapshot';
2929
import { PipelineResultView } from './sync_engine_impl';
30+
import { canonifyPipeline, pipelineEq } from './pipeline-util';
3031

3132
/**
3233
* Holds the listeners and the last received ViewSnapshot for a query being
@@ -50,6 +51,12 @@ export interface Observer<T> {
5051
error: EventHandler<FirestoreError>;
5152
}
5253

54+
export type QueryOrPipeline = Query | Pipeline;
55+
56+
export function isPipeline(q: QueryOrPipeline): q is Pipeline {
57+
return q instanceof Pipeline;
58+
}
59+
5360
/**
5461
* EventManager is responsible for mapping queries to query event emitters.
5562
* It handles "fan-out". -- Identical queries will re-use the same watch on the
@@ -61,14 +68,15 @@ export interface Observer<T> {
6168
*/
6269
export interface EventManager {
6370
onListen?: (
64-
query: Query,
71+
query: QueryOrPipeline,
6572
enableRemoteListen: boolean
6673
) => Promise<ViewSnapshot>;
67-
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
68-
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
69-
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
70-
// TODO(pipeline): consolidate query and pipeline
71-
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
74+
onUnlisten?: (
75+
query: QueryOrPipeline,
76+
disableRemoteListen: boolean
77+
) => Promise<void>;
78+
onFirstRemoteStoreListen?: (query: QueryOrPipeline) => Promise<void>;
79+
onLastRemoteStoreUnlisten?: (query: QueryOrPipeline) => Promise<void>;
7280
terminate(): void;
7381
}
7482

@@ -77,31 +85,34 @@ export function newEventManager(): EventManager {
7785
}
7886

7987
export class EventManagerImpl implements EventManager {
80-
queries: ObjectMap<Query, QueryListenersInfo> = newQueriesObjectMap();
88+
queries: ObjectMap<QueryOrPipeline, QueryListenersInfo> =
89+
newQueriesObjectMap();
8190

8291
onlineState: OnlineState = OnlineState.Unknown;
8392

8493
snapshotsInSyncListeners: Set<Observer<void>> = new Set();
8594

8695
/** Callback invoked when a Query is first listen to. */
8796
onListen?: (
88-
query: Query,
97+
query: QueryOrPipeline,
8998
enableRemoteListen: boolean
9099
) => Promise<ViewSnapshot>;
91100
/** Callback invoked once all listeners to a Query are removed. */
92-
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
93-
onListenPipeline?: (pipeline: PipelineListener) => Promise<void>;
101+
onUnlisten?: (
102+
query: QueryOrPipeline,
103+
disableRemoteListen: boolean
104+
) => Promise<void>;
94105

95106
/**
96107
* Callback invoked when a Query starts listening to the remote store, while
97108
* already listening to the cache.
98109
*/
99-
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
110+
onFirstRemoteStoreListen?: (query: QueryOrPipeline) => Promise<void>;
100111
/**
101112
* Callback invoked when a Query stops listening to the remote store, while
102113
* still listening to the cache.
103114
*/
104-
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
115+
onLastRemoteStoreUnlisten?: (query: QueryOrPipeline) => Promise<void>;
105116

106117
terminate(): void {
107118
errorAllTargets(
@@ -111,10 +122,43 @@ export class EventManagerImpl implements EventManager {
111122
}
112123
}
113124

114-
function newQueriesObjectMap(): ObjectMap<Query, QueryListenersInfo> {
115-
return new ObjectMap<Query, QueryListenersInfo>(
116-
q => canonifyQuery(q),
117-
queryEquals
125+
export function stringifyQueryOrPipeline(q: QueryOrPipeline): string {
126+
if (isPipeline(q)) {
127+
return canonifyPipeline(q);
128+
}
129+
130+
return stringifyQuery(q);
131+
}
132+
133+
export function canonifyQueryOrPipeline(q: QueryOrPipeline): string {
134+
if (isPipeline(q)) {
135+
return canonifyPipeline(q);
136+
}
137+
138+
return canonifyQuery(q);
139+
}
140+
141+
export function queryOrPipelineEqual(
142+
left: QueryOrPipeline,
143+
right: QueryOrPipeline
144+
): boolean {
145+
if (left instanceof Pipeline && right instanceof Pipeline) {
146+
return pipelineEq(left, right);
147+
}
148+
if (
149+
(left instanceof Pipeline && !(right instanceof Pipeline)) ||
150+
(!(left instanceof Pipeline) && right instanceof Pipeline)
151+
) {
152+
return false;
153+
}
154+
155+
return queryEquals(left as Query, right as Query);
156+
}
157+
158+
function newQueriesObjectMap(): ObjectMap<QueryOrPipeline, QueryListenersInfo> {
159+
return new ObjectMap<QueryOrPipeline, QueryListenersInfo>(
160+
q => canonifyQueryOrPipeline(q),
161+
queryOrPipelineEqual
118162
);
119163
}
120164

@@ -129,7 +173,6 @@ function validateEventManager(eventManagerImpl: EventManagerImpl): void {
129173
!!eventManagerImpl.onLastRemoteStoreUnlisten,
130174
'onLastRemoteStoreUnlisten not set'
131175
);
132-
debugAssert(!!eventManagerImpl.onListenPipeline, 'onListenPipeline not set');
133176
}
134177

135178
const enum ListenerSetupAction {
@@ -194,7 +237,11 @@ export async function eventManagerListen(
194237
} catch (e) {
195238
const firestoreError = wrapInUserErrorIfRecoverable(
196239
e as Error,
197-
`Initialization of query '${stringifyQuery(listener.query)}' failed`
240+
`Initialization of query '${
241+
isPipeline(listener.query)
242+
? canonifyPipeline(listener.query)
243+
: stringifyQuery(listener.query)
244+
}' failed`
198245
);
199246
listener.onError(firestoreError);
200247
return;
@@ -220,25 +267,6 @@ export async function eventManagerListen(
220267
}
221268
}
222269

223-
export async function eventManagerListenPipeline(
224-
eventManager: EventManager,
225-
listener: PipelineListener
226-
): Promise<void> {
227-
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
228-
validateEventManager(eventManagerImpl);
229-
230-
try {
231-
await eventManagerImpl.onListenPipeline!(listener);
232-
} catch (e) {
233-
const firestoreError = wrapInUserErrorIfRecoverable(
234-
e as Error,
235-
`Initialization of query '${listener.pipeline}' failed`
236-
);
237-
listener.onError(firestoreError);
238-
return;
239-
}
240-
}
241-
242270
export async function eventManagerUnlisten(
243271
eventManager: EventManager,
244272
listener: QueryListener
@@ -312,13 +340,6 @@ export function eventManagerOnWatchChange(
312340
}
313341
}
314342

315-
export function eventManagerOnPipelineWatchChange(
316-
eventManager: EventManager,
317-
viewSnaps: PipelineResultView[]
318-
): void {
319-
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
320-
}
321-
322343
export function eventManagerOnWatchError(
323344
eventManager: EventManager,
324345
query: Query,
@@ -445,7 +466,7 @@ export class QueryListener {
445466
private onlineState = OnlineState.Unknown;
446467

447468
constructor(
448-
readonly query: Query,
469+
readonly query: QueryOrPipeline,
449470
private queryObserver: Observer<ViewSnapshot>,
450471
options?: ListenOptions
451472
) {

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,11 @@ import {
8080
addSnapshotsInSyncListener,
8181
EventManager,
8282
eventManagerListen,
83-
eventManagerListenPipeline,
8483
eventManagerUnlisten,
8584
ListenOptions,
8685
Observer,
87-
PipelineListener,
8886
QueryListener,
87+
QueryOrPipeline,
8988
removeSnapshotsInSyncListener
9089
} from './event_manager';
9190
import { newQueryForPath, Query } from './query';
@@ -410,10 +409,6 @@ export async function getEventManager(
410409
null,
411410
onlineComponentProvider.syncEngine
412411
);
413-
eventManager.onListenPipeline = syncEngineListenPipeline.bind(
414-
null,
415-
onlineComponentProvider.syncEngine
416-
);
417412
return eventManager;
418413
}
419414

@@ -459,7 +454,7 @@ export function firestoreClientWaitForPendingWrites(
459454

460455
export function firestoreClientListen(
461456
client: FirestoreClient,
462-
query: Query,
457+
query: QueryOrPipeline,
463458
options: ListenOptions,
464459
observer: Partial<Observer<ViewSnapshot>>
465460
): () => void {
@@ -581,27 +576,6 @@ export function firestoreClientExecutePipeline(
581576
return deferred.promise;
582577
}
583578

584-
export function firestoreClientListenPipeline(
585-
client: FirestoreClient,
586-
pipeline: Pipeline,
587-
observer: {
588-
next?: (snapshot: PipelineSnapshot) => void;
589-
error?: (error: FirestoreError) => void;
590-
complete?: () => void;
591-
}
592-
): Unsubscribe {
593-
const wrappedObserver = new AsyncObserver(observer);
594-
const listener = new PipelineListener(pipeline, wrappedObserver);
595-
client.asyncQueue.enqueueAndForget(async () => {
596-
const eventManager = await getEventManager(client);
597-
return eventManagerListenPipeline(eventManager, listener);
598-
});
599-
return () => {
600-
wrappedObserver.mute();
601-
// TODO(pipeline): actually unlisten
602-
};
603-
}
604-
605579
export function firestoreClientWrite(
606580
client: FirestoreClient,
607581
mutations: Mutation[]

0 commit comments

Comments
 (0)