diff --git a/Sources/LiveViewNative/Coordinators/LiveSessionCoordinator.swift b/Sources/LiveViewNative/Coordinators/LiveSessionCoordinator.swift index 2418bc2a4..65fd58341 100644 --- a/Sources/LiveViewNative/Coordinators/LiveSessionCoordinator.swift +++ b/Sources/LiveViewNative/Coordinators/LiveSessionCoordinator.swift @@ -50,9 +50,13 @@ public class LiveSessionCoordinator: ObservableObject { @Published private(set) var rootLayout: LiveViewNativeCore.Document? @Published private(set) var stylesheet: Stylesheet? - // Socket connection - var liveSocket: LiveViewNativeCore.LiveSocket? - var socket: LiveViewNativeCore.Socket? + private var persistence: SimplePersistentStore + private var eventHandler: SimpleEventHandler + private var patchHandler: SimplePatchHandler + private var navHandler: SimpleNavHandler + + private var liveviewClient: LiveViewClient? + private var builder: LiveViewClientBuilder private var liveReloadChannel: LiveViewNativeCore.LiveChannel? private var liveReloadListenerLoop: Task<(), any Error>? @@ -63,7 +67,20 @@ public class LiveSessionCoordinator: ObservableObject { private var eventSubject = PassthroughSubject<(LiveViewCoordinator, (String, Json)), Never>() private var eventHandlers = Set() - private var reconnectAttempts = 0 + deinit { + let client = liveviewClient + let channel = liveReloadChannel + Task { @MainActor in + if let client { + client.shutdown() + } + if let channel { + do { + try await channel.shutdownParentSocket() + } + } + } + } /// Positions for `` elements with an explicit ID. /// @@ -85,6 +102,14 @@ public class LiveSessionCoordinator: ObservableObject { public convenience init(_ host: some LiveViewHost, config: LiveSessionConfiguration = .init(), customRegistryType: R.Type = R.self) { self.init(host.url, config: config, customRegistryType: customRegistryType) } + + public func clientChannel() -> LiveViewClientChannel? { + self.liveviewClient?.channel() + } + + public func status() -> SocketStatus { + (try? self.liveviewClient?.status()) ?? .disconnected + } /// Creates a new coordinator with a custom registry. /// - Parameter url: The URL of the page to establish the connection to. @@ -95,6 +120,29 @@ public class LiveSessionCoordinator: ObservableObject { self.configuration = config + self.patchHandler = SimplePatchHandler() + self.eventHandler = SimpleEventHandler() + self.navHandler = SimpleNavHandler() + self.persistence = SimplePersistentStore() + + self.builder = LiveViewClientBuilder(); + + self.builder.setPatchHandler(patchHandler) + self.builder.setNavigationHandler(navHandler) + self.builder.setPersistenceProvider(persistence) + self.builder.setLiveChannelEventHandler(eventHandler) + self.builder.setLogLevel(.debug) + + self.eventHandler.viewReloadSubject + .receive(on: DispatchQueue.main) + .sink { [weak self] newView in + guard let self else { return } + guard let last = self.navigationPath.last else { return } + if let client = self.liveviewClient { + last.coordinator.join(client, self.eventHandler, self.patchHandler) + } + }.store(in: &cancellables) + // load cookies into core for cookie in HTTPCookieStorage.shared.cookies(for: url) ?? [] { try? LiveViewNativeCore.storeSessionCookie("\(cookie.name)=\(cookie.value)", self.url.absoluteString) @@ -111,38 +159,27 @@ public class LiveSessionCoordinator: ObservableObject { $navigationPath.scan(([LiveNavigationEntry](), [LiveNavigationEntry]()), { ($0.1, $1) }).sink { [weak self] prev, next in guard let self else { return } + guard let client = liveviewClient else { return } Task { - try await prev.last?.coordinator.disconnect() + prev.last?.coordinator.disconnect() if prev.count > next.count { - let targetEntry = self.liveSocket!.getEntries()[next.count - 1] - next.last?.coordinator.join( - try await self.liveSocket!.traverseTo(targetEntry.id, - .some([ - "_format": .str(string: LiveSessionParameters.platform), - "_interface": .object(object: LiveSessionParameters.platformParams) - ]), - nil) - ) + + var opts = NavActionOptions() + opts.joinParams = .some([ "_interface": .object(object: LiveSessionParameters.platformParams)]) + let targetEntry = client.getEntries()[next.count - 1] + let _ = try await client.traverseTo(targetEntry.id, opts) + } else if next.count > prev.count && prev.count > 0 { // forward navigation (from `redirect` or ``) - next.last?.coordinator.join( - try await self.liveSocket!.navigate(next.last!.url.absoluteString, - .some([ - "_format": .str(string: LiveSessionParameters.platform), - "_interface": .object(object: LiveSessionParameters.platformParams) - ]), - NavOptions(action: .push)) - ) + var opts = NavOptions() + opts.joinParams = .some([ "_interface": .object(object: LiveSessionParameters.platformParams)]) + opts.action = .push + let _ = try await client.navigate(next.last!.url.absoluteString, opts) } else if next.count == prev.count { - guard let liveChannel = - try await self.liveSocket?.navigate(next.last!.url.absoluteString, - .some([ - "_format": .str(string: LiveSessionParameters.platform), - "_interface": .object(object: LiveSessionParameters.platformParams) - ]), - NavOptions(action: .replace)) - else { return } - next.last?.coordinator.join(liveChannel) + var opts = NavOptions() + opts.joinParams = .some([ "_interface": .object(object: LiveSessionParameters.platformParams)]) + opts.action = .replace + let _ = try await client.navigate(next.last!.url.absoluteString, opts) } } }.store(in: &cancellables) @@ -171,19 +208,7 @@ public class LiveSessionCoordinator: ObservableObject { self.init(url, config: config, customRegistryType: EmptyRegistry.self) } - deinit { - let socket = socket - let liveReloadChannel = liveReloadChannel - Task { - do { - try await socket?.shutdown() - } - do { - try await liveReloadChannel?.shutdownParentSocket() - } - } - } - + /// Connects this coordinator to the LiveView channel. /// /// You generally do not call this function yourself. It is called automatically when the ``LiveView`` appears. @@ -195,6 +220,7 @@ public class LiveSessionCoordinator: ObservableObject { /// - Parameter httpMethod: The HTTP method to use for the dead render. Defaults to `GET`. /// - Parameter httpBody: The HTTP body to send when requesting the dead render. public func connect(httpMethod: String? = nil, httpBody: Data? = nil, additionalHeaders: [String: String]? = nil) async { + do { switch state { case .setup, .disconnected, .connectionFailed: @@ -212,37 +238,26 @@ public class LiveSessionCoordinator: ObservableObject { let headers = (configuration.headers ?? [:]) .merging(additionalHeaders ?? [:]) { $1 } - if let socket { - try await socket.shutdown() - } - + // TODO: add this interface let adapter = ReconnectStrategyAdapter(self.configuration.reconnectBehavior) - self.liveSocket = try await LiveSocket( - originalURL.absoluteString, - LiveSessionParameters.platform, - ConnectOpts( - headers: headers, - body: httpBody, - method: httpMethod.flatMap(Method.init(_:)), - timeoutMs: 10_000 - ), - adapter - ) - - // save cookies to storage - HTTPCookieStorage.shared.setCookies( - (self.liveSocket!.joinHeaders()["set-cookie"] ?? []).flatMap { - HTTPCookie.cookies(withResponseHeaderFields: ["Set-Cookie": $0], for: URL(string: self.liveSocket!.joinUrl())!) - }, - for: self.url, - mainDocumentURL: nil + let opts = ClientConnectOpts( + joinParams: .some([ "_interface": .object(object: LiveSessionParameters.platformParams)]), + headers: .some(headers), + method: Method.init(httpMethod ?? "Get"), + requestBody: httpBody ) + + if let client = self.liveviewClient { + try await client.reconnect(originalURL.absoluteString, opts) + } else { + self.liveviewClient = try await self.builder.connect(originalURL.absoluteString, opts) + self.navigationPath.last!.coordinator.join(self.liveviewClient!, self.eventHandler, self.patchHandler) + } - self.socket = self.liveSocket?.socket() - self.rootLayout = self.liveSocket!.deadRender() - let styleURLs = self.liveSocket!.styleUrls() + self.rootLayout = try self.liveviewClient!.deadRender() + let styleURLs = try self.liveviewClient!.styleUrls() self.stylesheet = try await withThrowingTaskGroup(of: Stylesheet.self) { @Sendable group in for style in styleURLs { @@ -266,34 +281,15 @@ public class LiveSessionCoordinator: ObservableObject { } } - let liveChannel = try await self.liveSocket!.joinLiveviewChannel( - .some([ - "_format": .str(string: LiveSessionParameters.platform), - "_interface": .object(object: LiveSessionParameters.platformParams) - ]), - nil - ) - - self.navigationPath.last!.coordinator.join(liveChannel) - self.state = .connected - if let liveReloadChannel { - try await liveReloadChannel.shutdownParentSocket() - self.liveReloadChannel = nil - } - - if self.liveSocket!.hasLiveReload() { - self.liveReloadChannel = try await self.liveSocket!.joinLivereloadChannel() - bindLiveReloadListener() - } } catch { self.state = .connectionFailed(error) } } - + + // TODO: move this error handlign into core func overrideLiveReloadChannel(channel: LiveChannel) async throws { - if let liveReloadChannel { try await liveReloadChannel.shutdownParentSocket() self.liveReloadChannel = nil @@ -332,7 +328,7 @@ public class LiveSessionCoordinator: ObservableObject { private func disconnect(preserveNavigationPath: Bool = false) async { do { for entry in self.navigationPath { - try await entry.coordinator.disconnect() + entry.coordinator.disconnect() if !preserveNavigationPath { entry.coordinator.document = nil } @@ -356,9 +352,10 @@ public class LiveSessionCoordinator: ObservableObject { self.liveReloadChannel = nil - try await self.socket?.shutdown() - self.socket = nil - self.liveSocket = nil + if let client = self.liveviewClient { + try await client.disconnect() + } + self.state = .disconnected } catch { self.state = .connectionFailed(error) @@ -377,22 +374,6 @@ public class LiveSessionCoordinator: ObservableObject { self.navigationPath = [.init(url: self.url, coordinator: self.navigationPath.first!.coordinator, navigationTransition: nil, pendingView: nil)] } await self.connect(httpMethod: httpMethod, httpBody: httpBody, additionalHeaders: headers) -// do { -// if let url { -// try await self.disconnect(preserveNavigationPath: false) -// self.url = url -// self.navigationPath = [.init(url: self.url, coordinator: self.navigationPath.first!.coordinator, navigationTransition: nil, pendingView: nil)] -// } else { -// // preserve the navigation path, but still clear the stale documents, since they're being completely replaced. -// try await self.disconnect(preserveNavigationPath: true) -// for entry in self.navigationPath { -// entry.coordinator.document = nil -// } -// } -// try await self.connect(httpMethod: httpMethod, httpBody: httpBody, additionalHeaders: headers) -// } catch { -// self.state = .connectionFailed(error) -// } } /// Creates a publisher that can be used to listen for server-sent LiveView events. @@ -432,6 +413,18 @@ public class LiveSessionCoordinator: ObservableObject { .store(in: &eventHandlers) } + public func postFormData( + url: Url, + formData: [String: String] + ) async throws { + if let client = self.liveviewClient { + try await client.postForm(url.absoluteString, + formData, + .some([ "_interface": .object(object: LiveSessionParameters.platformParams)]), + nil) + } + } + func redirect( _ redirect: LiveRedirect, navigationTransition: Any? = nil, @@ -615,3 +608,5 @@ fileprivate extension URL { extension Socket: @unchecked Sendable {} extension Channel: @unchecked Sendable {} +extension LiveViewClient: @unchecked Sendable {} +extension LiveViewClientBuilder: @unchecked Sendable {} diff --git a/Sources/LiveViewNative/Coordinators/LiveViewCoordinator.swift b/Sources/LiveViewNative/Coordinators/LiveViewCoordinator.swift index bfc4e741b..a3a0776d6 100644 --- a/Sources/LiveViewNative/Coordinators/LiveViewCoordinator.swift +++ b/Sources/LiveViewNative/Coordinators/LiveViewCoordinator.swift @@ -33,10 +33,10 @@ public class LiveViewCoordinator: ObservableObject { @_spi(LiveForm) public private(set) weak var session: LiveSessionCoordinator! - var url: URL + private weak var liveviewClient: LiveViewClient? + private var channel: LiveViewClientChannel? - private(set) var liveChannel: LiveViewNativeCore.LiveChannel? - private var channel: LiveViewNativeCore.Channel? + var url: URL @Published var document: LiveViewNativeCore.Document? { didSet { @@ -58,11 +58,7 @@ public class LiveViewCoordinator: ObservableObject { private(set) internal var eventSubject = PassthroughSubject<(String, Json), Never>() private(set) internal var eventHandlers = Set() -// private var eventListener: Channel.EventStream? - private var eventListenerLoop: Task<(), any Error>? -// private var statusListener: Channel.StatusStream? - private var statusListenerLoop: Task<(), any Error>? - + private var patchHandlerCancellable: AnyCancellable? private(set) internal var liveViewModel = LiveViewModel() @@ -79,26 +75,6 @@ public class LiveViewCoordinator: ObservableObject { self.url = url } - deinit { - let channel = channel - Task { - do { - try await channel?.shutdown() - } - } - - if let eventListenerLoop { - if !eventListenerLoop.isCancelled { - eventListenerLoop.cancel() - } - } - - if let statusListenerLoop { - if !statusListenerLoop.isCancelled { - statusListenerLoop.cancel() - } - } - } /// Pushes a LiveView event with the given name and payload to the server. /// @@ -132,17 +108,37 @@ public class LiveViewCoordinator: ObservableObject { @discardableResult internal func doPushEvent(_ event: String, payload: LiveViewNativeCore.Payload) async throws -> [String:Any]? { - guard let channel = channel else { + + guard case .connected = state else { + throw LiveSocketError.DisconnectionError + } + + if let replyPayload = try await channel?.call(event, payload) { + return try await handleEventReplyPayload(replyPayload) + } else { return nil } - - guard case .joined = channel.status() else { + } + + @discardableResult + public func call(event: String, payload: LiveViewNativeCore.Payload) async throws -> LiveViewNativeCore.Payload? { + guard case .connected = state else { throw LiveSocketError.DisconnectionError } - - let replyPayload = try await channel.call(event: .user(user: event), payload: payload, timeout: PUSH_TIMEOUT) - - return try await handleEventReplyPayload(replyPayload) + + if let replyPayload = try await channel?.call(event, payload) { + return replyPayload + } else { + return nil + } + } + + public func uploadFile(file: LiveViewNativeCore.LiveFile) async throws { + guard case .connected = state else { + throw LiveSocketError.DisconnectionError + } + + try await liveviewClient?.uploadFiles([file]); } /// Creates a publisher that can be used to listen for server-sent LiveView events. @@ -212,10 +208,6 @@ public class LiveViewCoordinator: ObservableObject { .catch({ _ in Empty() }) } - private func handleDiff(payload: LiveViewNativeCore.Json, baseURL: URL) throws { - handleEvents(payload) - try self.document?.mergeFragmentJson(String(data: try JSONEncoder().encode(payload), encoding: .utf8)!) - } func handleEventReplyPayload(_ replyPayload: LiveViewNativeCore.Payload) async throws -> [String:Any]? { switch replyPayload { @@ -223,19 +215,9 @@ public class LiveViewCoordinator: ObservableObject { switch json { case let .object(object): if case let .object(diff) = object["diff"] { - try self.handleDiff(payload: .object(object: diff), baseURL: self.url) if case let .object(reply) = diff["r"] { return reply } - } else if case let .object(redirectObject) = object["live_redirect"], - let redirect = LiveRedirect(from: redirectObject, relativeTo: self.url) - { - try await session.redirect(redirect) - } else if case let .object(redirectObject) = object["redirect"], - case let .str(destinationString) = redirectObject["to"], - let destination = URL(string: destinationString, relativeTo: self.url) - { - try await session.redirect(.init(kind: .push, to: destination, mode: .replaceTop)) } else { return nil } @@ -262,49 +244,7 @@ public class LiveViewCoordinator: ObservableObject { } } - func bindEventListener() { - self.eventListenerLoop = Task { [weak self, unowned channel] in - let eventListener = channel!.eventStream() - for try await event in eventListener { - guard let self else { return } - guard !Task.isCancelled else { return } - do { - switch event.event { - case .user(user: "diff"): - switch event.payload { - case let .jsonPayload(json): - try self.handleDiff(payload: json, baseURL: self.url) - case .binary: - fatalError() - } - case .user(user: "live_redirect"): - guard case let .jsonPayload(json) = event.payload, - case let .object(payload) = json, - let redirect = LiveRedirect(from: payload, relativeTo: self.url) - else { break } - try await self.session.redirect(redirect) - case .user(user: "live_patch"): - guard case let .jsonPayload(json) = event.payload, - case let .object(payload) = json, - let redirect = LiveRedirect(from: payload, relativeTo: self.url, mode: .patch) - else { return } - try await self.session.redirect(redirect) - case .user(user: "redirect"): - guard case let .jsonPayload(json) = event.payload, - case let .object(payload) = json, - let destination = (payload["to"] as? String).flatMap({ URL.init(string: $0, relativeTo: self.url) }) - else { return } - try await self.session.redirect(.init(kind: .push, to: destination, mode: .replaceTop)) - default: - logger.error("Unhandled event: \(String(describing: event))") - } - } catch { - logger.error("Event handling error: \(error.localizedDescription)") - } - } - } - } - + func bindDocumentListener() { let handler = SimplePatchHandler() patchHandlerCancellable = handler.patchEventSubject.sink { [weak self] patch in @@ -340,38 +280,56 @@ public class LiveViewCoordinator: ObservableObject { } self.document?.setEventHandler(handler) } - - func join(_ liveChannel: LiveViewNativeCore.LiveChannel) { - self.liveChannel = liveChannel - let channel = liveChannel.channel() - self.channel = channel + + func join(_ client: LiveViewNativeCore.LiveViewClient, + _ eventListener: SimpleEventHandler, + _ docHandler: SimplePatchHandler + ) { + self.liveviewClient = client + self.channel = client.channel() + self.document = try! client.document() - if statusListenerLoop != nil && !statusListenerLoop!.isCancelled { - statusListenerLoop?.cancel() - } + eventListener.channelStatusSubject + .receive(on: DispatchQueue.main) + .sink { event in + self.internalState = switch event.status { + case .joined: + .connected + case .joining, .waitingForSocketToConnect, .waitingToJoin: + .connecting + case .waitingToRejoin: + .reconnecting + case .leaving, .left, .shuttingDown, .shutDown: + .disconnected + } + }.store(in: &eventHandlers) - statusListenerLoop = Task { @MainActor [weak self, unowned channel] in - let statusListener = channel.statusStream() - for try await status in statusListener { - self?.internalState = switch status { - case .joined: - .connected - case .joining, .waitingForSocketToConnect, .waitingToJoin: - .connecting - case .waitingToRejoin: - .reconnecting - case .leaving, .left, .shuttingDown, .shutDown: - .disconnected + + docHandler.patchEventSubject + .receive(on: DispatchQueue.main) + .sink { event in + switch event.data { + case .root: + // when the root changes, update the `NavStackEntry` itself. + self.objectWillChange.send() + case .leaf: + // text nodes don't have their own views, changes to them need to be handled by the parent Text view + // note: aren't these branches the same? + if event.parent != nil { + self.elementChanged(event.node).send() + } else { + self.elementChanged(event.node).send() } + case .nodeElement: + // when a single element changes, send an update only to that element. + self.elementChanged(event.node).send() } - } - - self.bindEventListener() + }.store(in: &eventHandlers) + - self.document = liveChannel.document() - self.bindDocumentListener() + - switch liveChannel.joinPayload() { + switch try! client.joinPayload() { case let .jsonPayload(.object(payload)): self.handleEvents(payload["rendered"]!) default: @@ -381,27 +339,9 @@ public class LiveViewCoordinator: ObservableObject { self.internalState = .connected } - func disconnect() async throws { - try await self.channel?.leave() - try await self.channel?.shutdown() - - if let eventListenerLoop { - if !eventListenerLoop.isCancelled { - eventListenerLoop.cancel() - } - } - - if let statusListenerLoop { - if !statusListenerLoop.isCancelled { - statusListenerLoop.cancel() - } - } - - self.eventListenerLoop = nil - self.statusListenerLoop = nil - self.liveChannel = nil + func disconnect() { + self.liveviewClient = nil self.channel = nil - self.internalState = .setup } } diff --git a/Sources/LiveViewNative/Live/LiveView.swift b/Sources/LiveViewNative/Live/LiveView.swift index df9601eb9..f5d113f61 100644 --- a/Sources/LiveViewNative/Live/LiveView.swift +++ b/Sources/LiveViewNative/Live/LiveView.swift @@ -247,7 +247,7 @@ public struct LiveView< .onChange(of: scenePhase) { newValue in guard case .active = newValue else { return } - if case .connected = session.socket?.status() { + if case .connected = session.status() { return } Task { diff --git a/Sources/LiveViewNative/ViewModel.swift b/Sources/LiveViewNative/ViewModel.swift index 0e922becb..286efd2f3 100644 --- a/Sources/LiveViewNative/ViewModel.swift +++ b/Sources/LiveViewNative/ViewModel.swift @@ -268,55 +268,54 @@ public class FormModel: ObservableObject, CustomDebugStringConvertible { } public func queueFileUpload( - name: String, - id: String, - contents: Data, - fileType: UTType, - fileName: String, - coordinator: LiveViewCoordinator - ) async throws { - guard let liveChannel = coordinator.liveChannel - else { return } - - let file = LiveFile( - contents, - fileType.preferredMIMEType!, - fileName, - "", - id - ) - if let changeEventName { - let replyPayload = try await coordinator.liveChannel!.channel().call( - event: .user(user: "event"), - payload: .jsonPayload(json: .object(object: [ - "type": .str(string: "form"), - "event": .str(string: changeEventName), - "value": .str(string: "_target=\(name)"), - "uploads": .object(object: [ - id: .array(array: [ - .object(object: [ - "path": .str(string: fileName), - "ref": .str(string: String(coordinator.nextUploadRef())), - "last_modified": .numb(number: .posInt(pos: UInt64(Date().timeIntervalSince1970 * 1000))), // in milliseconds - "name": .str(string: fileName), - "relative_path": .str(string: ""), - "type": .str(string: fileType.preferredMIMEType!), - "size": .numb(number: .posInt(pos: UInt64(contents.count))) - ]) - ]) - ]) - ])), - timeout: 10_000 - ) - try await coordinator.handleEventReplyPayload(replyPayload) - } - self.fileUploads.append(.init( - id: id, - data: contents, - upload: { try await liveChannel.uploadFile(file) } - )) - } -} + name: String, + id: String, + contents: Data, + fileType: UTType, + fileName: String, + coordinator: LiveViewCoordinator + ) async throws { + + let file = LiveFile( + contents, + fileType.preferredMIMEType!, + fileName, + "", + id + ) + if let changeEventName { + let replyPayload = try await coordinator.call( + event: "event", + payload: .jsonPayload(json: .object(object: [ + "type": .str(string: "form"), + "event": .str(string: changeEventName), + "value": .str(string: "_target=\(name)"), + "uploads": .object(object: [ + id: .array(array: [ + .object(object: [ + "path": .str(string: fileName), + "ref": .str(string: String(coordinator.nextUploadRef())), + "last_modified": .numb(number: .posInt(pos: UInt64(Date().timeIntervalSince1970 * 1000))), // in milliseconds + "name": .str(string: fileName), + "relative_path": .str(string: ""), + "type": .str(string: fileType.preferredMIMEType!), + "size": .numb(number: .posInt(pos: UInt64(contents.count))) + ]) + ]) + ]) + ])) + ); + if let payload = replyPayload { + try await coordinator.handleEventReplyPayload(payload) + } + } + self.fileUploads.append(.init( + id: id, + data: contents, + upload: { try await coordinator.uploadFile(file: file) } + )) + } + } private extension URLComponents { var formEncodedQuery: String? {