Skip to content

[Do not merge] Support AsyncStream in realtime query #14924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions Firestore/Swift/Source/AsyncAwait/Query+AsyncAwait.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if SWIFT_PACKAGE
@_exported import FirebaseFirestoreInternalWrapper
#else
@_exported import FirebaseFirestoreInternal
#endif // SWIFT_PACKAGE
import Foundation

@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
public extension Query {
func asyncThrowingStream() -> AsyncThrowingStream<QuerySnapshot, Error> {
AsyncThrowingStream { continuation in
let listener = self.addSnapshotListener { snapshot, error in
if let snapshot = snapshot {
continuation.yield(snapshot)
} else if let error = error {
continuation.finish(throwing: error)
}
}

continuation.onTermination = { _ in
listener.remove()
}
}
}

func asyncThrowingStream(options: SnapshotListenOptions) -> AsyncThrowingStream<QuerySnapshot, Error> {
AsyncThrowingStream { continuation in
let listener = self.addSnapshotListener(options: options) { snapshot, error in
if let snapshot = snapshot {
continuation.yield(snapshot)
} else if let error = error {
continuation.finish(throwing: error)
}
}

continuation.onTermination = { _ in
listener.remove()
}
}
}
}
78 changes: 78 additions & 0 deletions Firestore/Swift/Tests/Integration/AsyncAwaitIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@ let emptyBundle = """
#if swift(>=5.5.2)
@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
class AsyncAwaitIntegrationTests: FSTIntegrationTestCase {
func assertQuerySnapshotDataEquals(_ snapshot: Any,
_ expectedData: [[String: Any]]) throws {
let extractedData = FIRQuerySnapshotGetData(snapshot as! QuerySnapshot)
guard extractedData.count == expectedData.count else {
XCTFail(
"Result count mismatch: Expected \(expectedData.count), got \(extractedData.count)"
)
return
}
for index in 0 ..< extractedData.count {
XCTAssertTrue(areDictionariesEqual(extractedData[index], expectedData[index]))
}
}

// TODO(swift testing): update the function to be able to check other value types as well.
func areDictionariesEqual(_ dict1: [String: Any], _ dict2: [String: Any]) -> Bool {
guard dict1.count == dict2.count
else { return false } // Check if the number of elements matches

for (key, value1) in dict1 {
guard let value2 = dict2[key] else { return false }

// Value Checks (Assuming consistent types after the type check)
if let str1 = value1 as? String, let str2 = value2 as? String {
if str1 != str2 { return false }
} else if let int1 = value1 as? Int, let int2 = value2 as? Int {
if int1 != int2 { return false }
} else {
// Handle other potential types or return false for mismatch
return false
}
}
return true
}

func testAddData() async throws {
let collection = collectionRef()
let document = try await collection.addDocument(data: [:])
Expand Down Expand Up @@ -120,5 +155,48 @@ let emptyBundle = """
}
XCTAssertThrowsError(try deleteAllIndexes(), "The client has already been terminated.")
}

func testCanListenToDefaultSourceFirstAndThenCacheAsyncStream() async throws {
let collRef = collectionRef(withDocuments: [
"a": ["k": "a", "sort": 0],
"b": ["k": "b", "sort": 1],
])

let query = collRef.whereField("sort", isGreaterThanOrEqualTo: 1).order(by: "sort")

// 1. Create a signal stream. The test will wait on this stream.
// The Task will write to it after receiving the first snapshot.
let (signalStream, signalContinuation) = AsyncStream.makeStream(of: Void.self)

let stream = query.asyncThrowingStream()
var iterator = stream.makeAsyncIterator()

let task = Task {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a Task here really needed?

I found doing it without Task and signalStream much easier to read:

https://github.com/firebase/firebase-ios-sdk/pull/14931/files#diff-c9933475816c94708983249c37c7ee449116ab4815fa96fb928113552f83c1eaR121

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the task is mainly used for testing the stream behaviour after the stream closed.

// This task will now run and eventually signal its progress.
let firstSnapshot = try await iterator.next()

// Assertions for the first snapshot
XCTAssertNotNil(firstSnapshot, "Expected an initial snapshot.")
try assertQuerySnapshotDataEquals(firstSnapshot!, [["k": "b", "sort": 1]])
XCTAssertEqual(firstSnapshot!.metadata.isFromCache, false)

// 2. Send the signal to the test function now that we have the first snapshot.
signalContinuation.yield(())
signalContinuation.finish() // We only need to signal once.

// This next await will be suspended until it's cancelled.
let second = try await iterator.next()

// After cancellation, the iterator should terminate and return nil.
XCTAssertNil(second, "iterator.next() should have returned nil after cancellation.")
}

// 3. Instead of sleeping, await the signal from the Task.
// This line will pause execution until `signalContinuation.yield()` is called.
await signalStream.first { _ in true }

// 4. As soon as we receive the signal, we know it's safe to cancel.
task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to verify the stream has been closed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, after the stream closed, the listener (iterator) will received nil

}
}
#endif
Loading