Skip to content

fix(datastore): propagate remote mutationEvents to Hub for sync received #3697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}

enum ApplyRemoteModelResult {
case applied(RemoteModel)
case applied(RemoteModel, AppliedModel)
case dropped
}

Expand All @@ -363,7 +363,7 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
promise(.failure(dataStoreError))
}
case .success:
promise(.success(.applied(remoteModel)))
promise(.success(.applied(remoteModel, remoteModel)))
}
}
}
Expand All @@ -387,14 +387,13 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
let anyModel: AnyModel
do {
anyModel = try savedModel.eraseToAnyModel()
let appliedModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata)
promise(.success(.applied(remoteModel, appliedModel)))
} catch {
let dataStoreError = DataStoreError(error: error)
self.notifyDropped(error: dataStoreError)
promise(.failure(dataStoreError))
return
}
let inProcessModel = MutationSync(model: anyModel, syncMetadata: remoteModel.syncMetadata)
promise(.success(.applied(inProcessModel)))
}
}
}
Expand All @@ -417,21 +416,15 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
result: ApplyRemoteModelResult,
mutationType: MutationEvent.MutationType
) -> AnyPublisher<Void, DataStoreError> {
if case let .applied(inProcessModel) = result {
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: inProcessModel, mutationType: mutationType)
.handleEvents( receiveOutput: { syncMetadata in
let appliedModel = MutationSync(model: inProcessModel.model, syncMetadata: syncMetadata)
self.notify(savedModel: appliedModel, mutationType: mutationType)
}, receiveCompletion: { completion in
if case .failure(let error) = completion {
self.notifyDropped(error: error)
}
})
.map { _ in () }
switch result {
case .applied(let remoteModel, let appliedModel):
return self.saveMetadata(storageAdapter: storageAdapter, remoteModel: remoteModel, mutationType: mutationType)
.map { MutationSync(model: appliedModel.model, syncMetadata: $0) }
.map { [weak self] in self?.notify(appliedModel: $0, mutationType: mutationType) }
.eraseToAnyPublisher()

case .dropped:
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}
return Just(()).setFailureType(to: DataStoreError.self).eraseToAnyPublisher()
}

private func saveMetadata(
Expand All @@ -440,9 +433,17 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
mutationType: MutationEvent.MutationType
) -> Future<MutationSyncMetadata, DataStoreError> {
Future { promise in
storageAdapter.save(remoteModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad) { result in
storageAdapter.save(
remoteModel.syncMetadata,
condition: nil,
eagerLoad: self.isEagerLoad
) { result in
switch result {
case .failure(let error):
self.notifyDropped(error: error)
case .success:
self.notifyHub(remoteModel: remoteModel, mutationType: mutationType)
}
promise(result)
}
}
Expand All @@ -454,28 +455,46 @@ class ReconcileAndLocalSaveOperation: AsynchronousOperation {
}
}

private func notify(savedModel: AppliedModel,
mutationType: MutationEvent.MutationType) {
let version = savedModel.syncMetadata.version
/// Inform the mutationEvents subscribers about the updated model,
/// which incorporates lazy loading information if applicable.
private func notify(appliedModel: AppliedModel, mutationType: MutationEvent.MutationType) {
guard let json = try? appliedModel.model.instance.toJSON() else {
log.error("Could not notify mutation event")
return
}

let modelIdentifier = appliedModel.model.instance.identifier(schema: modelSchema).stringValue
let mutationEvent = MutationEvent(modelId: modelIdentifier,
modelName: modelSchema.name,
json: json,
mutationType: mutationType,
version: appliedModel.syncMetadata.version)
mutationEventPublisher.send(.mutationEvent(mutationEvent))
}

/// Inform the remote mutationEvents to Hub event subscribers,
/// which only contains information received from AppSync server.
private func notifyHub(
remoteModel: RemoteModel,
mutationType: MutationEvent.MutationType
) {
// TODO: Dispatch/notify error if we can't erase to any model? Would imply an error in JSON decoding,
// which shouldn't be possible this late in the process. Possibly notify global conflict/error handler?
guard let json = try? savedModel.model.instance.toJSON() else {
log.error("Could not notify mutation event")
guard let json = try? remoteModel.model.instance.toJSON() else {
log.error("Could not notify Hub mutation event")
return
}
let modelIdentifier = savedModel.model.instance.identifier(schema: modelSchema).stringValue

let modelIdentifier = remoteModel.model.instance.identifier(schema: modelSchema).stringValue
let mutationEvent = MutationEvent(modelId: modelIdentifier,
modelName: modelSchema.name,
json: json,
mutationType: mutationType,
version: version)
version: remoteModel.syncMetadata.version)

let payload = HubPayload(eventName: HubPayload.EventName.DataStore.syncReceived,
data: mutationEvent)
Amplify.Hub.dispatch(to: .dataStore, payload: payload)

mutationEventPublisher.send(.mutationEvent(mutationEvent))
}

private func notifyFinished() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,16 +705,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
waitForExpectations(timeout: 1)
}

func testApplyRemoteModels_deleteDisposition() {
func testApplyRemoteModels_deleteDisposition() async {
let expect = expectation(description: "operation should send value and complete successfully")
expect.expectedFulfillmentCount = 2
let stoargeExpect = expectation(description: "storage delete should be called")
let storageExpect = expectation(description: "storage delete should be called")
let storageMetadataExpect = expectation(description: "storage save metadata should be called")
let notifyExpect = expectation(description: "mutation event should be emitted")
let hubExpect = expectation(description: "Hub is notified")
let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in
XCTAssertEqual(id, self.anyPostMutationSync.model.id)
stoargeExpect.fulfill()
storageExpect.fulfill()
return .emptyResult
}
storageAdapter.responders[.deleteUntypedModel] = deleteResponder
Expand Down Expand Up @@ -758,24 +758,33 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in
expect.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 1)


await fulfillment(of: [
expect,
storageExpect,
storageMetadataExpect,
notifyExpect,
hubExpect
], timeout: 1)
}

func testApplyRemoteModels_multipleDispositions() {
let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync),
.delete(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync)]
func testApplyRemoteModels_multipleDispositions() async {
let dispositions: [RemoteSyncReconciler.Disposition] = [
.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync),
.delete(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
.delete(anyPostMutationSync)
]

let expect = expectation(description: "should complete successfully")
expect.expectedFulfillmentCount = 2
let stoargeExpect = expectation(description: "storage save/delete should be called")
stoargeExpect.expectedFulfillmentCount = dispositions.count
let storageExpect = expectation(description: "storage save/delete should be called")
storageExpect.expectedFulfillmentCount = dispositions.count
let storageMetadataExpect = expectation(description: "storage save metadata should be called")
storageMetadataExpect.expectedFulfillmentCount = dispositions.count
let notifyExpect = expectation(description: "mutation event should be emitted")
Expand All @@ -784,14 +793,14 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
hubExpect.expectedFulfillmentCount = dispositions.count

let saveResponder = SaveUntypedModelResponder { _, completion in
stoargeExpect.fulfill()
storageExpect.fulfill()
completion(.success(self.anyPostMutationSync.model))
}
storageAdapter.responders[.saveUntypedModel] = saveResponder

let deleteResponder = DeleteUntypedModelCompletionResponder { _, id in
XCTAssertEqual(id, self.anyPostMutationSync.model.id)
stoargeExpect.fulfill()
storageExpect.fulfill()
return .emptyResult
}
storageAdapter.responders[.deleteUntypedModel] = deleteResponder
Expand Down Expand Up @@ -835,10 +844,16 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in
expect.fulfill()
}).store(in: &cancellables)
waitForExpectations(timeout: 1)
await fulfillment(of: [
expect,
storageExpect,
storageMetadataExpect,
notifyExpect,
hubExpect
], timeout: 1)
}

func testApplyRemoteModels_skipFailedOperations() throws {
func testApplyRemoteModels_skipFailedOperations() async throws {
let dispositions: [RemoteSyncReconciler.Disposition] = [.create(anyPostMutationSync),
.create(anyPostMutationSync),
.update(anyPostMutationSync),
Expand Down Expand Up @@ -890,7 +905,12 @@ class ReconcileAndLocalSaveOperationTests: XCTestCase {
}, receiveValue: { _ in

}).store(in: &cancellables)
waitForExpectations(timeout: 1)

await fulfillment(of: [
expect,
expectedDropped,
expectedDeleteSuccess
], timeout: 1)
}

func testApplyRemoteModels_failWithConstraintViolationShouldBeSuccessful() {
Expand Down