Skip to content

Commit acb7dee

Browse files
authored
chore: kickoff release
2 parents d792245 + 00aac42 commit acb7dee

File tree

12 files changed

+2008
-1573
lines changed

12 files changed

+2008
-1573
lines changed

AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient+HandleRequest.swift

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ extension AppSyncRealTimeClient {
3333
// listen to response
3434
self.subject
3535
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
36-
.flatMap { Self.filterResponse(request: request, response: $0) }
36+
.flatMap { Self.filterResponse(request: request, result: $0) }
3737
.timeout(.seconds(timeout), scheduler: DispatchQueue.global(qos: .userInitiated), customError: { .timeout })
3838
.first()
3939
.sink(receiveCompletion: { completion in
@@ -65,47 +65,59 @@ extension AppSyncRealTimeClient {
6565

6666
private static func filterResponse(
6767
request: AppSyncRealTimeRequest,
68-
response: AppSyncRealTimeResponse
68+
result: Result<AppSyncRealTimeResponse, Error>
6969
) -> AnyPublisher<AppSyncRealTimeResponse, AppSyncRealTimeRequest.Error> {
70-
let justTheResponse = Just(response)
71-
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
72-
.eraseToAnyPublisher()
73-
74-
switch (request, response.type) {
75-
case (.connectionInit, .connectionAck):
76-
return justTheResponse
77-
78-
case (.start(let startRequest), .startAck) where startRequest.id == response.id:
79-
return justTheResponse
80-
81-
case (.stop(let id), .stopAck) where id == response.id:
82-
return justTheResponse
83-
84-
case (_, .error)
85-
where request.id != nil
86-
&& request.id == response.id
87-
&& response.payload?.errors != nil:
88-
let errorsJson: JSONValue = (response.payload?.errors)!
89-
let errors = errorsJson.asArray ?? [errorsJson]
90-
let reqeustErrors = errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
91-
if reqeustErrors.isEmpty {
70+
71+
switch result {
72+
case .success(let response):
73+
let justTheResponse = Just(response)
74+
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
75+
.eraseToAnyPublisher()
76+
77+
switch (request, response.type) {
78+
case (.connectionInit, .connectionAck):
79+
return justTheResponse
80+
81+
case (.start(let startRequest), .startAck) where startRequest.id == response.id:
82+
return justTheResponse
83+
84+
case (.stop(let id), .stopAck) where id == response.id:
85+
return justTheResponse
86+
87+
case (_, .error)
88+
where request.id != nil
89+
&& request.id == response.id
90+
&& response.payload?.errors != nil:
91+
let errorsJson: JSONValue = (response.payload?.errors)!
92+
let errors = errorsJson.asArray ?? [errorsJson]
93+
let reqeustErrors = errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
94+
if reqeustErrors.isEmpty {
95+
return Empty(
96+
outputType: AppSyncRealTimeResponse.self,
97+
failureType: AppSyncRealTimeRequest.Error.self
98+
).eraseToAnyPublisher()
99+
} else {
100+
return Fail(
101+
outputType: AppSyncRealTimeResponse.self,
102+
failure: reqeustErrors.first!
103+
).eraseToAnyPublisher()
104+
}
105+
106+
default:
92107
return Empty(
93108
outputType: AppSyncRealTimeResponse.self,
94109
failureType: AppSyncRealTimeRequest.Error.self
95110
).eraseToAnyPublisher()
96-
} else {
97-
return Fail(
98-
outputType: AppSyncRealTimeResponse.self,
99-
failure: reqeustErrors.first!
100-
).eraseToAnyPublisher()
111+
101112
}
102113

103-
default:
104-
return Empty(
114+
case .failure:
115+
return Fail(
105116
outputType: AppSyncRealTimeResponse.self,
106-
failureType: AppSyncRealTimeRequest.Error.self
117+
failure: .timeout
107118
).eraseToAnyPublisher()
108-
109119
}
120+
121+
110122
}
111123
}

AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
4848
/// WebSocketClient offering connections at the WebSocket protocol level
4949
internal var webSocketClient: AppSyncWebSocketClientProtocol
5050
/// Writable data stream convert WebSocketEvent to AppSyncRealTimeResponse
51-
internal let subject = PassthroughSubject<AppSyncRealTimeResponse, Never>()
51+
internal let subject = PassthroughSubject<Result<AppSyncRealTimeResponse, Error>, Never>()
5252

5353
var isConnected: Bool {
5454
self.state.value == .connected
@@ -283,15 +283,25 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
283283
private func filterAppSyncSubscriptionEvent(
284284
with id: String
285285
) -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
286-
subject.filter { $0.id == id || $0.type == .connectionError }
287-
.map { response -> AppSyncSubscriptionEvent? in
288-
switch response.type {
289-
case .connectionError, .error:
290-
return .error(Self.decodeAppSyncRealTimeResponseError(response.payload))
291-
case .data:
292-
return response.payload.map { .data($0) }
293-
default:
294-
return nil
286+
subject.filter {
287+
switch $0 {
288+
case .success(let response): return response.id == id || response.type == .connectionError
289+
case .failure(let error): return true
290+
}
291+
}
292+
.map { result -> AppSyncSubscriptionEvent? in
293+
switch result {
294+
case .success(let response):
295+
switch response.type {
296+
case .connectionError, .error:
297+
return .error(Self.decodeAppSyncRealTimeResponseError(response.payload))
298+
case .data:
299+
return response.payload.map { .data($0) }
300+
default:
301+
return nil
302+
}
303+
case .failure(let error):
304+
return .error([error])
295305
}
296306
}
297307
.compactMap { $0 }
@@ -368,9 +378,9 @@ extension AppSyncRealTimeClient {
368378
self.cancellablesBindToConnection = Set()
369379

370380
case .error(let error):
371-
// Since we've activated auto-reconnect functionality in WebSocketClient upon connection failure,
372-
// we only record errors here for debugging purposes.
381+
// Propagate connection error to downstream for Sync engine to restart
373382
log.debug("[AppSyncRealTimeClient] WebSocket error event: \(error)")
383+
self.subject.send(.failure(error))
374384
case .string(let string):
375385
guard let data = string.data(using: .utf8) else {
376386
log.debug("[AppSyncRealTimeClient] Failed to decode string \(string)")
@@ -400,7 +410,7 @@ extension AppSyncRealTimeClient {
400410
switch event.type {
401411
case .connectionAck:
402412
log.debug("[AppSyncRealTimeClient] AppSync connected: \(String(describing: event.payload))")
403-
subject.send(event)
413+
subject.send(.success(event))
404414

405415
self.resumeExistingSubscriptions()
406416
self.state.send(.connected)
@@ -411,7 +421,7 @@ extension AppSyncRealTimeClient {
411421

412422
default:
413423
log.debug("[AppSyncRealTimeClient] AppSync received response: \(event)")
414-
subject.send(event)
424+
subject.send(.success(event))
415425
}
416426
}
417427

AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77

88
import Amplify
99
import Foundation
10-
import AWSPluginsCore
10+
@_spi(WebSocket) import AWSPluginsCore
1111
import InternalAmplifyCredentials
1212
import Combine
1313

14+
1415
public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner, InternalTaskAsyncThrowingSequence, InternalTaskThrowingChannel {
1516
public typealias Request = GraphQLOperationRequest<R>
1617
public typealias InProcess = GraphQLSubscriptionEvent<R>
@@ -387,32 +388,7 @@ fileprivate func toAPIError<R: Decodable>(_ errors: [Error], type: R.Type) -> AP
387388
"Subscription item event failed with error" +
388389
(hasAuthorizationError ? ": \(APIError.UnauthorizedMessageString)" : "")
389390
}
390-
391-
#if swift(<5.8)
392-
if let errors = errors.cast(to: AppSyncRealTimeRequest.Error.self) {
393-
let hasAuthorizationError = errors.contains(where: { $0 == .unauthorized})
394-
return APIError.operationError(
395-
errorDescription(hasAuthorizationError),
396-
"",
397-
errors.first
398-
)
399-
} else if let errors = errors.cast(to: GraphQLError.self) {
400-
let hasAuthorizationError = errors.map(\.extensions)
401-
.compactMap { $0.flatMap { $0["errorType"]?.stringValue } }
402-
.contains(where: { AppSyncErrorType($0) == .unauthorized })
403-
return APIError.operationError(
404-
errorDescription(hasAuthorizationError),
405-
"",
406-
GraphQLResponseError<R>.error(errors)
407-
)
408-
} else {
409-
return APIError.operationError(
410-
errorDescription(),
411-
"",
412-
errors.first
413-
)
414-
}
415-
#else
391+
416392
switch errors {
417393
case let errors as [AppSyncRealTimeRequest.Error]:
418394
let hasAuthorizationError = errors.contains(where: { $0 == .unauthorized})
@@ -430,12 +406,14 @@ fileprivate func toAPIError<R: Decodable>(_ errors: [Error], type: R.Type) -> AP
430406
"",
431407
GraphQLResponseError<R>.error(errors)
432408
)
409+
410+
case let errors as [WebSocketClient.Error]:
411+
return APIError.networkError("WebSocketClient connection aborted", nil, URLError(.networkConnectionLost))
433412
default:
434413
return APIError.operationError(
435414
errorDescription(),
436415
"",
437416
errors.first
438417
)
439418
}
440-
#endif
441419
}

AmplifyPlugins/API/Tests/AWSAPIPluginTests/AppSyncRealTimeClient/AppSyncRealTimeClientTests.swift

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
5959
}
6060
Task {
6161
try await Task.sleep(nanoseconds: 80 * 1000)
62-
await appSyncClient.subject.send(.init(id: nil, payload: nil, type: .connectionAck))
62+
await appSyncClient.subject.send(.success(.init(id: nil, payload: nil, type: .connectionAck)))
6363
}
6464
await fulfillment(of: [finishExpectation], timeout: timeout + 1)
6565
}
@@ -91,7 +91,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
9191
}
9292
Task {
9393
try await Task.sleep(nanoseconds: 80 * 1000)
94-
await appSyncClient.subject.send(.init(
94+
await appSyncClient.subject.send(.success(.init(
9595
id: id,
9696
payload: .object([
9797
"errors": .array([
@@ -101,7 +101,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
101101
])
102102
]),
103103
type: .error
104-
))
104+
)))
105105
}
106106
await fulfillment(of: [limitExceededErrorExpectation], timeout: timeout + 1)
107107
}
@@ -134,7 +134,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
134134

135135
Task {
136136
try await Task.sleep(nanoseconds: 80 * 1000)
137-
await appSyncClient.subject.send(.init(
137+
await appSyncClient.subject.send(.success(.init(
138138
id: id,
139139
payload: .object([
140140
"errors": .array([
@@ -144,7 +144,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
144144
])
145145
]),
146146
type: .error
147-
))
147+
)))
148148
}
149149
await fulfillment(of: [
150150
maxSubscriptionsReachedExpectation
@@ -181,7 +181,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
181181

182182
Task {
183183
try await Task.sleep(nanoseconds: 80 * 1000)
184-
await appSyncClient.subject.send(.init(
184+
await appSyncClient.subject.send(.success(.init(
185185
id: id,
186186
payload: .object([
187187
"errors": .array([
@@ -191,7 +191,7 @@ class AppSyncRealTimeClientTests: XCTestCase {
191191
])
192192
]),
193193
type: .error
194-
))
194+
)))
195195
}
196196
await fulfillment(of: [
197197
triggerUnknownErrorExpectation
@@ -487,4 +487,68 @@ class AppSyncRealTimeClientTests: XCTestCase {
487487
startTriggered
488488
], timeout: 3, enforceOrder: true)
489489
}
490+
491+
func testNetworkInterrupt_withAppSyncRealTimeClientConnected_triggersApiNetworkError() async throws {
492+
var cancellables = Set<AnyCancellable>()
493+
let mockWebSocketClient = MockWebSocketClient()
494+
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
495+
let appSyncClient = AppSyncRealTimeClient(
496+
endpoint: URL(string: "https://example.com")!,
497+
requestInterceptor: mockAppSyncRequestInterceptor,
498+
webSocketClient: mockWebSocketClient
499+
)
500+
let id = UUID().uuidString
501+
let query = UUID().uuidString
502+
503+
let startTriggered = expectation(description: "webSocket writing start event to connection")
504+
let errorReceived = expectation(description: "webSocket connection lost error is received")
505+
506+
await mockWebSocketClient.setStateToConnected()
507+
Task {
508+
try await Task.sleep(nanoseconds: 80 * 1_000_000)
509+
await mockWebSocketClient.subject.send(.connected)
510+
try await Task.sleep(nanoseconds: 80 * 1_000_000)
511+
await mockWebSocketClient.subject.send(.string("""
512+
{"type": "connection_ack", "payload": { "connectionTimeoutMs": 300000 }}
513+
"""))
514+
try await Task.sleep(nanoseconds: 80 * 1_000_000)
515+
await mockWebSocketClient.subject.send(.error(WebSocketClient.Error.connectionLost))
516+
}
517+
try await appSyncClient.subscribe(id: id, query: query).sink { event in
518+
if case .error(let errors) = event,
519+
errors.count == 1,
520+
let error = errors.first,
521+
let connectionLostError = error as? WebSocketClient.Error,
522+
connectionLostError == WebSocketClient.Error.connectionLost
523+
{
524+
errorReceived.fulfill()
525+
}
526+
}.store(in: &cancellables)
527+
await mockWebSocketClient.actionSubject
528+
.sink { action in
529+
switch action {
530+
case .write(let message):
531+
guard let response = try? JSONDecoder().decode(
532+
JSONValue.self,
533+
from: message.data(using: .utf8)!
534+
) else {
535+
XCTFail("Response should be able to decode to AppSyncRealTimeResponse")
536+
return
537+
}
538+
539+
if response.type?.stringValue == "start" {
540+
XCTAssertEqual(response.id?.stringValue, id)
541+
XCTAssertEqual(response.payload?.asObject?["data"]?.stringValue, query)
542+
startTriggered.fulfill()
543+
}
544+
545+
default:
546+
break
547+
}
548+
}
549+
.store(in: &cancellables)
550+
551+
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)
552+
553+
}
490554
}

AmplifyPlugins/Core/AWSPluginsCore/WebSocket/AmplifyNetworkMonitor.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
// SPDX-License-Identifier: Apache-2.0
66
//
77

8-
98
import Network
109
import Combine
1110

@@ -38,6 +37,7 @@ public final class AmplifyNetworkMonitor {
3837
label: "com.amazonaws.amplify.ios.network.websocket.monitor",
3938
qos: .userInitiated
4039
))
40+
4141
}
4242

4343
public func updateState(_ nextState: State) {
@@ -48,4 +48,5 @@ public final class AmplifyNetworkMonitor {
4848
subject.send(completion: .finished)
4949
monitor.cancel()
5050
}
51+
5152
}

AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Retryable.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ extension RemoteSyncEngine {
1616
}
1717

1818
func scheduleRestartOrTerminate(error: AmplifyError) {
19+
Self.log.debug("scheduling restart or terminate on error: \(error)")
1920
let advice = getRetryAdvice(error: error)
2021
if advice.shouldRetry {
2122
scheduleRestart(advice: advice)

0 commit comments

Comments
 (0)