Skip to content

Commit ad3abbb

Browse files
committed
Support asyncStream in realtime
1 parent 6d27b1c commit ad3abbb

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if SWIFT_PACKAGE
18+
@_exported import FirebaseFirestoreInternalWrapper
19+
#else
20+
@_exported import FirebaseFirestoreInternal
21+
#endif // SWIFT_PACKAGE
22+
import Foundation
23+
24+
@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
25+
public extension Query {
26+
func snapshots() -> AsyncThrowingStream<QuerySnapshot, Error> {
27+
AsyncThrowingStream { continuation in
28+
let listener = self.addSnapshotListener { snapshot, error in
29+
if let snapshot = snapshot {
30+
continuation.yield(snapshot)
31+
} else if let error = error {
32+
continuation.finish(throwing: error)
33+
}
34+
}
35+
36+
continuation.onTermination = { _ in
37+
listener.remove()
38+
}
39+
}
40+
}
41+
42+
func snapshots(options: SnapshotListenOptions) -> AsyncThrowingStream<QuerySnapshot, Error> {
43+
AsyncThrowingStream { continuation in
44+
let listener = self.addSnapshotListener(options: options) { snapshot, error in
45+
if let snapshot = snapshot {
46+
continuation.yield(snapshot)
47+
} else if let error = error {
48+
continuation.finish(throwing: error)
49+
}
50+
}
51+
52+
continuation.onTermination = { _ in
53+
listener.remove()
54+
}
55+
}
56+
}
57+
}

Firestore/Swift/Tests/Integration/AsyncAwaitIntegrationTests.swift

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,41 @@ let emptyBundle = """
3535
#if swift(>=5.5.2)
3636
@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
3737
class AsyncAwaitIntegrationTests: FSTIntegrationTestCase {
38+
func assertQuerySnapshotDataEquals(_ snapshot: Any,
39+
_ expectedData: [[String: Any]]) throws {
40+
let extractedData = FIRQuerySnapshotGetData(snapshot as! QuerySnapshot)
41+
guard extractedData.count == expectedData.count else {
42+
XCTFail(
43+
"Result count mismatch: Expected \(expectedData.count), got \(extractedData.count)"
44+
)
45+
return
46+
}
47+
for index in 0 ..< extractedData.count {
48+
XCTAssertTrue(areDictionariesEqual(extractedData[index], expectedData[index]))
49+
}
50+
}
51+
52+
// TODO(swift testing): update the function to be able to check other value types as well.
53+
func areDictionariesEqual(_ dict1: [String: Any], _ dict2: [String: Any]) -> Bool {
54+
guard dict1.count == dict2.count
55+
else { return false } // Check if the number of elements matches
56+
57+
for (key, value1) in dict1 {
58+
guard let value2 = dict2[key] else { return false }
59+
60+
// Value Checks (Assuming consistent types after the type check)
61+
if let str1 = value1 as? String, let str2 = value2 as? String {
62+
if str1 != str2 { return false }
63+
} else if let int1 = value1 as? Int, let int2 = value2 as? Int {
64+
if int1 != int2 { return false }
65+
} else {
66+
// Handle other potential types or return false for mismatch
67+
return false
68+
}
69+
}
70+
return true
71+
}
72+
3873
func testAddData() async throws {
3974
let collection = collectionRef()
4075
let document = try await collection.addDocument(data: [:])
@@ -120,5 +155,48 @@ let emptyBundle = """
120155
}
121156
XCTAssertThrowsError(try deleteAllIndexes(), "The client has already been terminated.")
122157
}
158+
159+
func testCanListenToDefaultSourceFirstAndThenCacheAsyncStream() async throws {
160+
let collRef = collectionRef(withDocuments: [
161+
"a": ["k": "a", "sort": 0],
162+
"b": ["k": "b", "sort": 1],
163+
])
164+
165+
let query = collRef.whereField("sort", isGreaterThanOrEqualTo: 1).order(by: "sort")
166+
167+
// 1. Create a signal stream. The test will wait on this stream.
168+
// The Task will write to it after receiving the first snapshot.
169+
let (signalStream, signalContinuation) = AsyncStream.makeStream(of: Void.self)
170+
171+
let stream = query.snapshots()
172+
var iterator = stream.makeAsyncIterator()
173+
174+
let task = Task {
175+
// This task will now run and eventually signal its progress.
176+
let firstSnapshot = try await iterator.next()
177+
178+
// Assertions for the first snapshot
179+
XCTAssertNotNil(firstSnapshot, "Expected an initial snapshot.")
180+
try assertQuerySnapshotDataEquals(firstSnapshot!, [["k": "b", "sort": 1]])
181+
XCTAssertEqual(firstSnapshot!.metadata.isFromCache, false)
182+
183+
// 2. Send the signal to the test function now that we have the first snapshot.
184+
signalContinuation.yield(())
185+
signalContinuation.finish() // We only need to signal once.
186+
187+
// This next await will be suspended until it's cancelled.
188+
let second = try await iterator.next()
189+
190+
// After cancellation, the iterator should terminate and return nil.
191+
XCTAssertNil(second, "iterator.next() should have returned nil after cancellation.")
192+
}
193+
194+
// 3. Instead of sleeping, await the signal from the Task.
195+
// This line will pause execution until `signalContinuation.yield()` is called.
196+
await signalStream.first { _ in true }
197+
198+
// 4. As soon as we receive the signal, we know it's safe to cancel.
199+
task.cancel()
200+
}
123201
}
124202
#endif

0 commit comments

Comments
 (0)