-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[Do not merge] Support AsyncStream in realtime query #14924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cherylEnkidu
wants to merge
3
commits into
main
Choose a base branch
from
cheryllin/AsyncFlowInRealtime
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2025 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#if SWIFT_PACKAGE | ||
@_exported import FirebaseFirestoreInternalWrapper | ||
#else | ||
@_exported import FirebaseFirestoreInternal | ||
#endif // SWIFT_PACKAGE | ||
import Foundation | ||
|
||
@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) | ||
public extension Query { | ||
func asyncThrowingStream() -> AsyncThrowingStream<QuerySnapshot, Error> { | ||
AsyncThrowingStream { continuation in | ||
let listener = self.addSnapshotListener { snapshot, error in | ||
if let snapshot = snapshot { | ||
continuation.yield(snapshot) | ||
} else if let error = error { | ||
continuation.finish(throwing: error) | ||
} | ||
} | ||
|
||
continuation.onTermination = { _ in | ||
listener.remove() | ||
} | ||
} | ||
} | ||
|
||
func asyncThrowingStream(options: SnapshotListenOptions) -> AsyncThrowingStream<QuerySnapshot, Error> { | ||
AsyncThrowingStream { continuation in | ||
let listener = self.addSnapshotListener(options: options) { snapshot, error in | ||
if let snapshot = snapshot { | ||
continuation.yield(snapshot) | ||
} else if let error = error { | ||
continuation.finish(throwing: error) | ||
} | ||
} | ||
|
||
continuation.onTermination = { _ in | ||
listener.remove() | ||
} | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,41 @@ let emptyBundle = """ | |
#if swift(>=5.5.2) | ||
@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *) | ||
class AsyncAwaitIntegrationTests: FSTIntegrationTestCase { | ||
func assertQuerySnapshotDataEquals(_ snapshot: Any, | ||
_ expectedData: [[String: Any]]) throws { | ||
let extractedData = FIRQuerySnapshotGetData(snapshot as! QuerySnapshot) | ||
guard extractedData.count == expectedData.count else { | ||
XCTFail( | ||
"Result count mismatch: Expected \(expectedData.count), got \(extractedData.count)" | ||
) | ||
return | ||
} | ||
for index in 0 ..< extractedData.count { | ||
XCTAssertTrue(areDictionariesEqual(extractedData[index], expectedData[index])) | ||
} | ||
} | ||
|
||
// TODO(swift testing): update the function to be able to check other value types as well. | ||
func areDictionariesEqual(_ dict1: [String: Any], _ dict2: [String: Any]) -> Bool { | ||
guard dict1.count == dict2.count | ||
else { return false } // Check if the number of elements matches | ||
|
||
for (key, value1) in dict1 { | ||
guard let value2 = dict2[key] else { return false } | ||
|
||
// Value Checks (Assuming consistent types after the type check) | ||
if let str1 = value1 as? String, let str2 = value2 as? String { | ||
if str1 != str2 { return false } | ||
} else if let int1 = value1 as? Int, let int2 = value2 as? Int { | ||
if int1 != int2 { return false } | ||
} else { | ||
// Handle other potential types or return false for mismatch | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
|
||
func testAddData() async throws { | ||
let collection = collectionRef() | ||
let document = try await collection.addDocument(data: [:]) | ||
|
@@ -120,5 +155,48 @@ let emptyBundle = """ | |
} | ||
XCTAssertThrowsError(try deleteAllIndexes(), "The client has already been terminated.") | ||
} | ||
|
||
func testCanListenToDefaultSourceFirstAndThenCacheAsyncStream() async throws { | ||
let collRef = collectionRef(withDocuments: [ | ||
"a": ["k": "a", "sort": 0], | ||
"b": ["k": "b", "sort": 1], | ||
]) | ||
|
||
let query = collRef.whereField("sort", isGreaterThanOrEqualTo: 1).order(by: "sort") | ||
|
||
// 1. Create a signal stream. The test will wait on this stream. | ||
// The Task will write to it after receiving the first snapshot. | ||
let (signalStream, signalContinuation) = AsyncStream.makeStream(of: Void.self) | ||
|
||
let stream = query.asyncThrowingStream() | ||
var iterator = stream.makeAsyncIterator() | ||
|
||
let task = Task { | ||
// This task will now run and eventually signal its progress. | ||
let firstSnapshot = try await iterator.next() | ||
|
||
// Assertions for the first snapshot | ||
XCTAssertNotNil(firstSnapshot, "Expected an initial snapshot.") | ||
try assertQuerySnapshotDataEquals(firstSnapshot!, [["k": "b", "sort": 1]]) | ||
XCTAssertEqual(firstSnapshot!.metadata.isFromCache, false) | ||
|
||
// 2. Send the signal to the test function now that we have the first snapshot. | ||
signalContinuation.yield(()) | ||
signalContinuation.finish() // We only need to signal once. | ||
|
||
// This next await will be suspended until it's cancelled. | ||
let second = try await iterator.next() | ||
|
||
// After cancellation, the iterator should terminate and return nil. | ||
XCTAssertNil(second, "iterator.next() should have returned nil after cancellation.") | ||
} | ||
|
||
// 3. Instead of sleeping, await the signal from the Task. | ||
// This line will pause execution until `signalContinuation.yield()` is called. | ||
await signalStream.first { _ in true } | ||
|
||
// 4. As soon as we receive the signal, we know it's safe to cancel. | ||
task.cancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to verify the stream has been closed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, after the stream closed, the listener (iterator) will received |
||
} | ||
} | ||
#endif |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a Task here really needed?
I found doing it without Task and signalStream much easier to read:
https://github.com/firebase/firebase-ios-sdk/pull/14931/files#diff-c9933475816c94708983249c37c7ee449116ab4815fa96fb928113552f83c1eaR121
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the task is mainly used for testing the stream behaviour after the stream closed.