From dbaeaf40812b58a6ca80b6131cb9fbd82d1a46b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 09:04:47 +0100 Subject: [PATCH 01/29] refactor(UploadService): Moved parallelism observation to uploadService --- .../UploadService+Parallelism.swift | 56 +++++++++++++++++++ .../UploadService/UploadService.swift | 5 ++ .../WorkloadParallelismHeuristic.swift | 0 .../UploadQueue/Queue/UploadQueue.swift | 42 +------------- .../UploadQueue/Queue/UploadQueueable.swift | 2 + 5 files changed, 64 insertions(+), 41 deletions(-) create mode 100644 kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift rename kDriveCore/Data/Upload/{UploadQueue/Queue => Servicies/UploadService}/WorkloadParallelismHeuristic.swift (100%) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift new file mode 100644 index 0000000000..2aabef868f --- /dev/null +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift @@ -0,0 +1,56 @@ +/* + Infomaniak kDrive - iOS App + Copyright (C) 2025 Infomaniak Network SA + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +import Foundation + +extension UploadService: ParallelismHeuristicDelegate { + // MARK: - Memory warnings + + /// A critical memory warning in `FileProvider` context will reschedule, in order to transition uploads to Main App. + func observeMemoryWarnings() { + guard appContextService.context == .fileProviderExtension else { + return + } + + let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main) + memoryPressureObserver = source + source.setEventHandler { [weak self] in + guard let self else { return } + let event: DispatchSource.MemoryPressureEvent = source.data + switch event { + case DispatchSource.MemoryPressureEvent.normal: + Log.uploadQueue("MemoryPressureEvent normal", level: .info) + case DispatchSource.MemoryPressureEvent.warning: + Log.uploadQueue("MemoryPressureEvent warning", level: .info) + case DispatchSource.MemoryPressureEvent.critical: + Log.uploadQueue("MemoryPressureEvent critical", level: .error) + self.rescheduleRunningOperations() + default: + break + } + } + source.resume() + } + + // MARK: - ParallelismHeuristicDelegate + + func parallelismShouldChange(value: Int) { + Log.uploadQueue("UploadQueue parallelism is:\(value)") + allQueues.forEach { $0.parallelismShouldChange(value: value) } + } +} diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 282998b50e..24967319aa 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -45,6 +45,8 @@ public final class UploadService { lazy var allQueues = [globalUploadQueue, photoUploadQueue] + var uploadParallelismHeuristic: WorkloadParallelismHeuristic? + var memoryPressureObserver: DispatchSourceMemoryPressure? var fileUploadedCount = 0 var observations = ( didUploadFile: [UUID: (UploadFile, File?) -> Void](), @@ -59,6 +61,9 @@ public final class UploadService { public var pausedNotificationSent = false public init() { + observeMemoryWarnings() + uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) + Task { rebuildUploadQueueFromObjectsInRealm() } diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/WorkloadParallelismHeuristic.swift similarity index 100% rename from kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift rename to kDriveCore/Data/Upload/Servicies/UploadService/WorkloadParallelismHeuristic.swift diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift index 8f50c19920..ce6942cb75 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift @@ -24,17 +24,12 @@ import RealmSwift import Sentry public class UploadQueue: ParallelismHeuristicDelegate { - private var memoryPressure: DispatchSourceMemoryPressure? - @LazyInjectService var appContextService: AppContextServiceable @LazyInjectService var uploadPublisher: UploadPublishable /// Something to track an operation for a File ID let keyedUploadOperations = KeyedUploadOperationable() - /// Something to adapt the upload parallelism live - var uploadParallelismHeuristic: WorkloadParallelismHeuristic? - public lazy var operationQueue: OperationQueue = { let queue = OperationQueue() queue.name = "kDrive upload queue" @@ -76,10 +71,6 @@ public class UploadQueue: ParallelismHeuristicDelegate { } Log.uploadQueue("Starting up") - - uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) - - // Observe network state change ReachabilityListener.instance.observeNetworkChange(self) { [weak self] _ in guard let self else { return @@ -89,42 +80,11 @@ public class UploadQueue: ParallelismHeuristicDelegate { operationQueue.isSuspended = isSuspended Log.uploadQueue("observeNetworkChange :\(isSuspended)") } - - observeMemoryWarnings() - - Log.uploadQueue("UploadQueue parallelism is:\(operationQueue.maxConcurrentOperationCount)") - } - - // MARK: - Memory warnings - - /// A critical memory warning in `FileProvider` context will reschedule, in order to transition uploads to Main App. - private func observeMemoryWarnings() { - guard appContextService.context == .fileProviderExtension else { - return - } - - let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main) - memoryPressure = source - source.setEventHandler { - let event: DispatchSource.MemoryPressureEvent = source.data - switch event { - case DispatchSource.MemoryPressureEvent.normal: - Log.uploadQueue("MemoryPressureEvent normal", level: .info) - case DispatchSource.MemoryPressureEvent.warning: - Log.uploadQueue("MemoryPressureEvent warning", level: .info) - case DispatchSource.MemoryPressureEvent.critical: - Log.uploadQueue("MemoryPressureEvent critical", level: .error) - self.rescheduleRunningOperations() - default: - break - } - } - source.resume() } // MARK: - ParallelismHeuristicDelegate - func parallelismShouldChange(value: Int) { + public func parallelismShouldChange(value: Int) { Log.uploadQueue("Upload queue new parallelism: \(value)", level: .info) operationQueue.maxConcurrentOperationCount = value } diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift index 1bab7aa131..5d68e2519e 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift @@ -38,6 +38,8 @@ public protocol UploadQueueable { func rescheduleRunningOperations() + func parallelismShouldChange(value: Int) + func cancel(uploadFileId: String) var operationCount: Int { get } From acf338798be5a5f1d5dcc788d2eba16e33a9d2ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 10:14:19 +0100 Subject: [PATCH 02/29] feat(UploadQueueObserver): New mechanism to observe if a an upload queue is empty or not --- kDriveCore/DI/FactoryService.swift | 21 +++++-- .../UploadService+Parallelism.swift | 10 ++++ .../UploadQueue/Queue/UploadQueue.swift | 15 ++++- .../Queue/UploadQueueObserver.swift | 55 +++++++++++++++++++ 4 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift diff --git a/kDriveCore/DI/FactoryService.swift b/kDriveCore/DI/FactoryService.swift index 7fa651c67a..ec6bd13e3d 100644 --- a/kDriveCore/DI/FactoryService.swift +++ b/kDriveCore/DI/FactoryService.swift @@ -124,6 +124,9 @@ public enum FactoryService { factoryParameters: nil, resolver: resolver) }, + Factory(type: UploadQueueDelegate.self) { _, _ in + UploadParallelismOrchestrator() + }, Factory(type: BGTaskScheduler.self) { _, _ in BGTaskScheduler.shared }, @@ -232,12 +235,22 @@ public enum FactoryService { } static var uploadQueues: [FactoryWithIdentifier] { - let globalUploadQueue = Factory(type: UploadQueueable.self) { _, _ in - UploadQueue() + let globalUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in + let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self, + forCustomTypeIdentifier: nil, + factoryParameters: nil, + resolver: resolver) + + return UploadQueue(delegate: uploadQueueDelegate) } - let photoUploadQueue = Factory(type: UploadQueueable.self) { _, _ in - PhotoUploadQueue() + let photoUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in + let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self, + forCustomTypeIdentifier: nil, + factoryParameters: nil, + resolver: resolver) + + return PhotoUploadQueue(delegate: uploadQueueDelegate) } let services = [ diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift index 2aabef868f..fae4cfc288 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift @@ -18,6 +18,16 @@ import Foundation +public final class UploadParallelismOrchestrator: UploadQueueDelegate { + public func operationQueueBecameEmpty(_ queue: UploadQueue) { + print("queue empty") + } + + public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { + print("queue not empty") + } +} + extension UploadService: ParallelismHeuristicDelegate { // MARK: - Memory warnings diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift index ce6942cb75..7e2a940e8d 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift @@ -23,10 +23,19 @@ import InfomaniakDI import RealmSwift import Sentry +public protocol UploadQueueDelegate: AnyObject { + func operationQueueBecameEmpty(_ queue: UploadQueue) + func operationQueueNoLongerEmpty(_ queue: UploadQueue) +} + public class UploadQueue: ParallelismHeuristicDelegate { @LazyInjectService var appContextService: AppContextServiceable @LazyInjectService var uploadPublisher: UploadPublishable + private var queueObserver: UploadQueueObserver? + + weak var delegate: UploadQueueDelegate? + /// Something to track an operation for a File ID let keyedUploadOperations = KeyedUploadOperationable() @@ -64,12 +73,14 @@ public class UploadQueue: ParallelismHeuristicDelegate { /// Should suspend operation queue based on explicit `suspendAllOperations()` call var forceSuspendQueue = false - public init() { + public init(delegate: UploadQueueDelegate?) { guard appContextService.context != .shareExtension else { Log.uploadQueue("UploadQueue disabled in ShareExtension", level: .error) return } + self.delegate = delegate + Log.uploadQueue("Starting up") ReachabilityListener.instance.observeNetworkChange(self) { [weak self] _ in guard let self else { @@ -80,6 +91,8 @@ public class UploadQueue: ParallelismHeuristicDelegate { operationQueue.isSuspended = isSuspended Log.uploadQueue("observeNetworkChange :\(isSuspended)") } + + queueObserver = UploadQueueObserver(uploadQueue: self, delegate: delegate) } // MARK: - ParallelismHeuristicDelegate diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift new file mode 100644 index 0000000000..44ef150b41 --- /dev/null +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -0,0 +1,55 @@ +/* + Infomaniak kDrive - iOS App + Copyright (C) 2025 Infomaniak Network SA + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +import Foundation + +public class UploadQueueObserver: NSObject { + private var previousCount: Int? + private var observation: NSKeyValueObservation? + + var uploadQueue: UploadQueue + weak var delegate: UploadQueueDelegate? + + init(uploadQueue: UploadQueue, delegate: UploadQueueDelegate?) { + self.uploadQueue = uploadQueue + self.delegate = delegate + super.init() + + observation = uploadQueue.operationQueue.observe(\.operationCount, options: [.new, .old]) { [weak self] _, change in + guard let self else { return } + guard let newCount = change.newValue else { return } + + defer { previousCount = newCount } + + guard let previousCount else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + return + } + + guard previousCount != newCount else { + return + } + + if newCount == 0 { + delegate?.operationQueueBecameEmpty(uploadQueue) + } else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + } + } + } +} From d7681c9d268ad2bb72ed172b13fa2ebcde203c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 10:49:25 +0100 Subject: [PATCH 03/29] feat(UploadParallelismOrchestrator): New object managing dynamically the parallelism of all upload queues --- .../UploadParallelismOrchestrator.swift} | 52 +++++++++++++------ .../WorkloadParallelismHeuristic.swift | 2 +- .../UploadService/UploadService.swift | 5 -- 3 files changed, 37 insertions(+), 22 deletions(-) rename kDriveCore/Data/Upload/Servicies/UploadService/{UploadService+Parallelism.swift => Parallelism/UploadParallelismOrchestrator.swift} (58%) rename kDriveCore/Data/Upload/Servicies/UploadService/{ => Parallelism}/WorkloadParallelismHeuristic.swift (98%) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift similarity index 58% rename from kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift rename to kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index fae4cfc288..284d3eec40 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Parallelism.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -17,21 +17,22 @@ */ import Foundation +import InfomaniakDI -public final class UploadParallelismOrchestrator: UploadQueueDelegate { - public func operationQueueBecameEmpty(_ queue: UploadQueue) { - print("queue empty") - } +public final class UploadParallelismOrchestrator { + @LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable + @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable + @LazyInjectService private var uploadService: UploadServiceable + @LazyInjectService private var appContextService: AppContextServiceable - public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { - print("queue not empty") - } -} + private var uploadParallelismHeuristic: WorkloadParallelismHeuristic + private var memoryPressureObserver: DispatchSourceMemoryPressure? -extension UploadService: ParallelismHeuristicDelegate { - // MARK: - Memory warnings + public init() { + observeMemoryWarnings() + uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) + } - /// A critical memory warning in `FileProvider` context will reschedule, in order to transition uploads to Main App. func observeMemoryWarnings() { guard appContextService.context == .fileProviderExtension else { return @@ -49,7 +50,7 @@ extension UploadService: ParallelismHeuristicDelegate { Log.uploadQueue("MemoryPressureEvent warning", level: .info) case DispatchSource.MemoryPressureEvent.critical: Log.uploadQueue("MemoryPressureEvent critical", level: .error) - self.rescheduleRunningOperations() + uploadService.rescheduleRunningOperations() default: break } @@ -57,10 +58,29 @@ extension UploadService: ParallelismHeuristicDelegate { source.resume() } - // MARK: - ParallelismHeuristicDelegate + func computeUploadParallelismPerQueueAndApply() { + Log.uploadQueue("Current total upload parallelism :\(uploadParallelismHeuristic.currentParallelism)") + // let quota = … + +// globalUploadQueue +// photoUploadQueue + } +} + +extension UploadParallelismOrchestrator: UploadQueueDelegate { + public func operationQueueBecameEmpty(_ queue: UploadQueue) { + print("queue empty") + computeUploadParallelismPerQueueAndApply() + } + + public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { + print("queue not empty") + computeUploadParallelismPerQueueAndApply() + } +} - func parallelismShouldChange(value: Int) { - Log.uploadQueue("UploadQueue parallelism is:\(value)") - allQueues.forEach { $0.parallelismShouldChange(value: value) } +extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate { + public func parallelismShouldChange(value: Int) { + computeUploadParallelismPerQueueAndApply() } } diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift similarity index 98% rename from kDriveCore/Data/Upload/Servicies/UploadService/WorkloadParallelismHeuristic.swift rename to kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index e8de6e163f..4e8af043fd 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -20,7 +20,7 @@ import Foundation import InfomaniakDI /// Delegate protocol of UploadParallelismHeuristic -protocol ParallelismHeuristicDelegate: AnyObject { +public protocol ParallelismHeuristicDelegate: AnyObject { /// This method is called with a new parallelism to apply each time to the uploadQueue /// - Parameter value: The new parallelism value to use func parallelismShouldChange(value: Int) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 24967319aa..282998b50e 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -45,8 +45,6 @@ public final class UploadService { lazy var allQueues = [globalUploadQueue, photoUploadQueue] - var uploadParallelismHeuristic: WorkloadParallelismHeuristic? - var memoryPressureObserver: DispatchSourceMemoryPressure? var fileUploadedCount = 0 var observations = ( didUploadFile: [UUID: (UploadFile, File?) -> Void](), @@ -61,9 +59,6 @@ public final class UploadService { public var pausedNotificationSent = false public init() { - observeMemoryWarnings() - uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) - Task { rebuildUploadQueueFromObjectsInRealm() } From 7c293e9411a217d91ef1810690850757733508b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 14:56:25 +0100 Subject: [PATCH 04/29] refactor(WorkloadParallelismHeuristic): Better default depth --- .../Parallelism/WorkloadParallelismHeuristic.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index 4e8af043fd..dbd907290e 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -100,7 +100,7 @@ final class WorkloadParallelismHeuristic { currentParallelism = parallelism } - public private(set) var currentParallelism = 0 { + public private(set) var currentParallelism = WorkloadParallelismHeuristic.reducedParallelism { didSet { delegate?.parallelismShouldChange(value: currentParallelism) } From cb16c562737544807f914fb08f7ffadb0aa69033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 14:57:12 +0100 Subject: [PATCH 05/29] fix(WorkloadParallelismHeuristic): Remove side effect at init that would break all the current code --- .../Parallelism/WorkloadParallelismHeuristic.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index dbd907290e..f80584ec3d 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -56,9 +56,6 @@ final class WorkloadParallelismHeuristic { name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil ) - - // Update the value a first time - computeParallelism() } deinit { From 04f1cec8174c46e48ecc9bb852f4d202a130a04d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 14:57:50 +0100 Subject: [PATCH 06/29] refactor(UploadQueue): New isActive property --- .../Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift index 4c6e6a1694..ccdf2a8132 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift @@ -34,6 +34,10 @@ extension UploadQueue: UploadQueueable { operationQueue.isSuspended } + public var isActive: Bool { + operationQueue.operationCount > 0 && !operationQueue.isSuspended + } + public func waitForCompletion(_ completionHandler: @escaping () -> Void) { Log.uploadQueue("waitForCompletion") DispatchQueue.global(qos: .default).async { From 7927909c972332fda7db39fd8d9a87ff3766eb69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 15:01:40 +0100 Subject: [PATCH 07/29] feat(UploadParallelismOrchestrator): Implementation of the dynamic upload parallelism algo per queue --- .../UploadParallelismOrchestrator.swift | 31 ++++++++++++++----- .../UploadQueue/Queue/UploadQueue.swift | 5 +-- .../UploadQueue/Queue/UploadQueueable.swift | 4 ++- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index 284d3eec40..d857a8199e 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -25,9 +25,11 @@ public final class UploadParallelismOrchestrator { @LazyInjectService private var uploadService: UploadServiceable @LazyInjectService private var appContextService: AppContextServiceable - private var uploadParallelismHeuristic: WorkloadParallelismHeuristic + private var uploadParallelismHeuristic: WorkloadParallelismHeuristic? private var memoryPressureObserver: DispatchSourceMemoryPressure? + private lazy var allQueues = [globalUploadQueue, photoUploadQueue] + public init() { observeMemoryWarnings() uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) @@ -59,22 +61,37 @@ public final class UploadParallelismOrchestrator { } func computeUploadParallelismPerQueueAndApply() { - Log.uploadQueue("Current total upload parallelism :\(uploadParallelismHeuristic.currentParallelism)") - // let quota = … + let parallelismAvailable = uploadParallelismHeuristic?.currentParallelism ?? 2 + Log.uploadQueue("Current total upload parallelism :\(parallelismAvailable)") + + let activeQueues = allQueues.filter(\.isActive) + let inactiveQueues = allQueues.filter { lhs in + !activeQueues.contains { rhs in + lhs === rhs + } + } + + assert(activeQueues.count + inactiveQueues.count == allQueues.count, "expecting to match") + + inactiveQueues.forEach { $0.parallelismShouldChange(value: 1) } + + Log.uploadQueue("Updating parallelism in inactiveQueues:\(inactiveQueues.count) activeQueues:\(activeQueues.count)") + guard !activeQueues.isEmpty else { + return + } -// globalUploadQueue -// photoUploadQueue + let parallelismPerActiveQueue = max(1, parallelismAvailable / activeQueues.count) + Log.uploadQueue("Parallelism per active queue :\(parallelismPerActiveQueue)") + activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } } } extension UploadParallelismOrchestrator: UploadQueueDelegate { public func operationQueueBecameEmpty(_ queue: UploadQueue) { - print("queue empty") computeUploadParallelismPerQueueAndApply() } public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { - print("queue not empty") computeUploadParallelismPerQueueAndApply() } } diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift index 7e2a940e8d..69990d7580 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift @@ -81,11 +81,8 @@ public class UploadQueue: ParallelismHeuristicDelegate { self.delegate = delegate - Log.uploadQueue("Starting up") ReachabilityListener.instance.observeNetworkChange(self) { [weak self] _ in - guard let self else { - return - } + guard let self else { return } let isSuspended = (shouldSuspendQueue || forceSuspendQueue) operationQueue.isSuspended = isSuspended diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift index 5d68e2519e..9f83bb3524 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift @@ -20,7 +20,7 @@ import FileProvider import Foundation import RealmSwift -public protocol UploadQueueable { +public protocol UploadQueueable: AnyObject { func getOperation(forUploadFileId uploadFileId: String) -> UploadOperationable? @discardableResult @@ -45,4 +45,6 @@ public protocol UploadQueueable { var operationCount: Int { get } var isSuspended: Bool { get } + + var isActive: Bool { get } } From 480262f23e58e74ca74eefec6066da83f8226330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 14 Mar 2025 15:26:59 +0100 Subject: [PATCH 08/29] feat(ParallelismDefaults): Enum to clarify default values --- .../UploadParallelismOrchestrator.swift | 19 +++++++++++++------ .../WorkloadParallelismHeuristic.swift | 19 +++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index d857a8199e..beccc171b4 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -28,6 +28,13 @@ public final class UploadParallelismOrchestrator { private var uploadParallelismHeuristic: WorkloadParallelismHeuristic? private var memoryPressureObserver: DispatchSourceMemoryPressure? + private var availableParallelism: Int { + guard let uploadParallelismHeuristic else { + return ParallelismDefaults.reducedParallelism + } + return uploadParallelismHeuristic.currentParallelism + } + private lazy var allQueues = [globalUploadQueue, photoUploadQueue] public init() { @@ -61,8 +68,8 @@ public final class UploadParallelismOrchestrator { } func computeUploadParallelismPerQueueAndApply() { - let parallelismAvailable = uploadParallelismHeuristic?.currentParallelism ?? 2 - Log.uploadQueue("Current total upload parallelism :\(parallelismAvailable)") + let currentAvailableParallelism = availableParallelism + Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") let activeQueues = allQueues.filter(\.isActive) let inactiveQueues = allQueues.filter { lhs in @@ -71,17 +78,17 @@ public final class UploadParallelismOrchestrator { } } - assert(activeQueues.count + inactiveQueues.count == allQueues.count, "expecting to match") + assert(activeQueues.count + inactiveQueues.count == allQueues.count, "expecting to not miss a queue") - inactiveQueues.forEach { $0.parallelismShouldChange(value: 1) } + inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) } Log.uploadQueue("Updating parallelism in inactiveQueues:\(inactiveQueues.count) activeQueues:\(activeQueues.count)") guard !activeQueues.isEmpty else { return } - let parallelismPerActiveQueue = max(1, parallelismAvailable / activeQueues.count) - Log.uploadQueue("Parallelism per active queue :\(parallelismPerActiveQueue)") + let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count) + Log.uploadQueue("New parallelism per active queue :\(parallelismPerActiveQueue)") activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } } } diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index f80584ec3d..65e1987110 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -19,6 +19,12 @@ import Foundation import InfomaniakDI +public enum ParallelismDefaults { + static let reducedParallelism = 2 + + static let serial = 1 +} + /// Delegate protocol of UploadParallelismHeuristic public protocol ParallelismHeuristicDelegate: AnyObject { /// This method is called with a new parallelism to apply each time to the uploadQueue @@ -31,9 +37,6 @@ public protocol ParallelismHeuristicDelegate: AnyObject { /// Value can change depending on many factors, including thermal state battery or extension mode. /// Scaling is achieved given the number of active cores available. final class WorkloadParallelismHeuristic { - /// With 2 Operations max, and a chuck of 1MiB max, the UploadQueue can spike to max 4MiB memory usage. - private static let reducedParallelism = 2 - @LazyInjectService private var appContextService: AppContextServiceable private weak var delegate: ParallelismHeuristicDelegate? @@ -69,19 +72,19 @@ final class WorkloadParallelismHeuristic { // If the device is too hot we cool down now let thermalState = processInfo.thermalState guard thermalState != .critical else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } // In low power mode, we reduce parallelism guard !processInfo.isLowPowerModeEnabled else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } // In extension, to reduce memory footprint, we reduce drastically parallelism guard !appContextService.isExtension else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } @@ -90,14 +93,14 @@ final class WorkloadParallelismHeuristic { // Beginning with .serious state, we start reducing the load on the system guard thermalState != .serious else { - currentParallelism = max(Self.reducedParallelism, parallelism / 2) + currentParallelism = max(ParallelismDefaults.reducedParallelism, parallelism / 2) return } currentParallelism = parallelism } - public private(set) var currentParallelism = WorkloadParallelismHeuristic.reducedParallelism { + public private(set) var currentParallelism = ParallelismDefaults.reducedParallelism { didSet { delegate?.parallelismShouldChange(value: currentParallelism) } From eab10468afc9db482850b8864cd7621306819460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 18 Mar 2025 14:55:22 +0100 Subject: [PATCH 09/29] fix(WorkloadParallelismHeuristic): Make sure to compute a first time a parallelism value outside of the DI queue --- .../Parallelism/WorkloadParallelismHeuristic.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index 65e1987110..ef1a43a730 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -59,6 +59,10 @@ final class WorkloadParallelismHeuristic { name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil ) + + DispatchQueue.global(qos: .default).async { + self.computeParallelism() + } } deinit { From f77b276072e575b57ab8ceeea1b28fce034f1c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 18 Mar 2025 14:55:56 +0100 Subject: [PATCH 10/29] fix(UploadService): Make sure to rebuild the upload queues on init outside the DI queue --- .../Data/Upload/Servicies/UploadService/UploadService.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 282998b50e..43897285ca 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -59,8 +59,8 @@ public final class UploadService { public var pausedNotificationSent = false public init() { - Task { - rebuildUploadQueueFromObjectsInRealm() + DispatchQueue.global(qos: .default).async { + self.rebuildUploadQueueFromObjectsInRealm() } } } From 499b265a025dc2f1403e46acb141c5ccbc43e9d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 18 Mar 2025 17:11:06 +0100 Subject: [PATCH 11/29] fix(UploadQueueObserver): Only notify operationQueueNoLongerEmpty when needed --- .../Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift index 44ef150b41..303cad9784 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -47,7 +47,7 @@ public class UploadQueueObserver: NSObject { if newCount == 0 { delegate?.operationQueueBecameEmpty(uploadQueue) - } else { + } else if previousCount == 0 && newCount > 0 { delegate?.operationQueueNoLongerEmpty(uploadQueue) } } From a1e93444bcd65938ed2bd40e032ee342081b9989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Thu, 20 Mar 2025 11:27:02 +0100 Subject: [PATCH 12/29] refactor(UploadParallelismOrchestrator): Switched set logic for simpler filter function --- .../Parallelism/UploadParallelismOrchestrator.swift | 6 +----- .../Data/Upload/UploadQueue/Queue/UploadQueueable.swift | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index beccc171b4..3b173c787e 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -72,11 +72,7 @@ public final class UploadParallelismOrchestrator { Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") let activeQueues = allQueues.filter(\.isActive) - let inactiveQueues = allQueues.filter { lhs in - !activeQueues.contains { rhs in - lhs === rhs - } - } + let inactiveQueues = allQueues.filter { !$0.isActive } assert(activeQueues.count + inactiveQueues.count == allQueues.count, "expecting to not miss a queue") diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift index 9f83bb3524..79a8eef15f 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift @@ -20,7 +20,7 @@ import FileProvider import Foundation import RealmSwift -public protocol UploadQueueable: AnyObject { +public protocol UploadQueueable { func getOperation(forUploadFileId uploadFileId: String) -> UploadOperationable? @discardableResult From 19e83c99b70ddf455ceda510fc0adf09cb1705aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Thu, 20 Mar 2025 16:59:37 +0100 Subject: [PATCH 13/29] refactor(UploadService): Dedicated queues for events and rebuilding the upload queues --- .../UploadService+Notifications.swift | 2 +- .../UploadService+Observation.swift | 6 +++--- .../UploadService/UploadService+Publish.swift | 8 ++++---- .../UploadService/UploadService.swift | 19 +++++++++++++++---- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift index 8746f267c7..4357e293ea 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift @@ -48,7 +48,7 @@ extension UploadService: UploadNotifiable { public func sendFileUploadStateNotificationIfNeeded(with result: UploadCompletionResult) { Log.uploadQueue("sendFileUploadStateNotificationIfNeeded") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } guard let uploadFile = result.uploadFile, uploadFile.error != .taskRescheduled, diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift index 07108d1f73..55f5fb4d93 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift @@ -49,7 +49,7 @@ extension UploadService: UploadObservable { fileId: String? = nil, using closure: @escaping (UploadFile, File?) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didUploadFile[key] = { [weak self, weak observer] uploadFile, driveFile in @@ -81,7 +81,7 @@ extension UploadService: UploadObservable { parentId: Int, using closure: @escaping (Int, Int) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didChangeUploadCountInParent[key] = { [weak self, weak observer] updatedParentId, count in @@ -111,7 +111,7 @@ extension UploadService: UploadObservable { driveId: Int, using closure: @escaping (Int, Int) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didChangeUploadCountInDrive[key] = { [weak self, weak observer] updatedDriveId, count in diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift index 5d49b098e8..75cec7fec9 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift @@ -43,7 +43,7 @@ extension UploadService: UploadPublishable { userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCount") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } publishUploadCountInParent(parentId: parentId, userId: userId, driveId: driveId) publishUploadCountInDrive(userId: userId, driveId: driveId) @@ -54,7 +54,7 @@ extension UploadService: UploadPublishable { userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCountInParent") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } let uploadCount = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId).count @@ -69,7 +69,7 @@ extension UploadService: UploadPublishable { public func publishUploadCountInDrive(userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCountInDrive") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } let uploadCount = getUploadingFiles(userId: userId, driveId: driveId).count for closure in observations.didChangeUploadCountInDrive.values { @@ -84,7 +84,7 @@ extension UploadService: UploadPublishable { Log.uploadQueue("publishFileUploaded") logFileUploadedWithSuccess(for: result.uploadFile) sendFileUploadStateNotificationIfNeeded(with: result) - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } for closure in observations.didUploadFile.values { guard let uploadFile = result.uploadFile, !uploadFile.isInvalidated else { diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 43897285ca..b14d03d838 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -32,13 +32,24 @@ public final class UploadService { @LazyInjectService var notificationHelper: NotificationsHelpable @LazyInjectService var appContextService: AppContextServiceable - let serialQueue: DispatchQueue = { + private let serialRebuildUploadsQueue: DispatchQueue = { @LazyInjectService var appContextService: AppContextServiceable let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit return DispatchQueue( - label: "com.infomaniak.drive.upload-service", - qos: .userInitiated, + label: "com.infomaniak.drive.upload-service.rebuild-uploads", + qos: .default, + autoreleaseFrequency: autoreleaseFrequency + ) + }() + + let serialEventQueue: DispatchQueue = { + @LazyInjectService var appContextService: AppContextServiceable + let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit + + return DispatchQueue( + label: "com.infomaniak.drive.upload-service.event", + qos: .default, autoreleaseFrequency: autoreleaseFrequency ) }() @@ -87,7 +98,7 @@ extension UploadService: UploadServiceable { public func rebuildUploadQueueFromObjectsInRealm() { Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm") - serialQueue.sync { + serialRebuildUploadsQueue.sync { // Clean cache if necessary before we try to restart the uploads. @InjectService var freeSpaceService: FreeSpaceService freeSpaceService.cleanCacheIfAlmostFull() From 420fd57eef4845cacf0e2eb1238966c645b47889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 08:07:38 +0100 Subject: [PATCH 14/29] refactor(UploadService): Explicit blocking and non blocking rebuildUploadQueue func --- kDrive/AppRouter.swift | 2 +- .../PhotoSyncSettingsViewController.swift | 4 +- .../UploadService/UploadService.swift | 92 ++++++++++--------- .../UploadService/UploadServiceable.swift | 4 +- .../Services/BackgroundTasksService.swift | 2 +- 5 files changed, 56 insertions(+), 48 deletions(-) diff --git a/kDrive/AppRouter.swift b/kDrive/AppRouter.swift index ee817a1a6e..945148014d 100644 --- a/kDrive/AppRouter.swift +++ b/kDrive/AppRouter.swift @@ -660,7 +660,7 @@ public struct AppRouter: AppNavigable { // Resolving an upload queue will restart it if this is the first time @InjectService var uploadService: UploadServiceable - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.rebuildUploadQueue() } // MARK: RouterFileNavigable diff --git a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift index fe91e82ea9..24563fa71c 100644 --- a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift +++ b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift @@ -504,11 +504,11 @@ extension PhotoSyncSettingsViewController: FooterButtonDelegate { sender.setLoading(false) } - DispatchQueue.global(qos: .userInitiated).async { + DispatchQueue.global(qos: .default).async { // Add new pictures to be uploaded and reload upload queue self.photoLibraryUploader.scheduleNewPicturesForUpload() @InjectService var uploadService: UploadServiceable - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.rebuildUploadQueue() } } } diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index b14d03d838..204dde800b 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -26,22 +26,18 @@ public enum UploadServiceBackgroundIdentifier { } public final class UploadService { - @LazyInjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable - @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable + @InjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable + @InjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable + @LazyInjectService(customTypeIdentifier: kDriveDBID.uploads) var uploadsDatabase: Transactionable @LazyInjectService var notificationHelper: NotificationsHelpable @LazyInjectService var appContextService: AppContextServiceable - private let serialRebuildUploadsQueue: DispatchQueue = { - @LazyInjectService var appContextService: AppContextServiceable - let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit - - return DispatchQueue( - label: "com.infomaniak.drive.upload-service.rebuild-uploads", - qos: .default, - autoreleaseFrequency: autoreleaseFrequency - ) - }() + private let serialRebuildUploadsQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-service.rebuild-uploads", + qos: .default, + autoreleaseFrequency: .workItem + ) let serialEventQueue: DispatchQueue = { @LazyInjectService var appContextService: AppContextServiceable @@ -71,7 +67,7 @@ public final class UploadService { public init() { DispatchQueue.global(qos: .default).async { - self.rebuildUploadQueueFromObjectsInRealm() + self.rebuildUploadQueue() } } } @@ -96,43 +92,53 @@ extension UploadService: UploadServiceable { allQueues.allSatisfy(\.isSuspended) } - public func rebuildUploadQueueFromObjectsInRealm() { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm") + public func blockingRebuildUploadQueue() { serialRebuildUploadsQueue.sync { - // Clean cache if necessary before we try to restart the uploads. - @InjectService var freeSpaceService: FreeSpaceService - freeSpaceService.cleanCacheIfAlmostFull() + self.rebuildUploadQueueFromObjectsInRealm() + } + } - guard let uploadFileQuery else { - Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error) - return - } + public func rebuildUploadQueue() { + serialRebuildUploadsQueue.async { + self.rebuildUploadQueueFromObjectsInRealm() + } + } - let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in - return lazyCollection.filter(uploadFileQuery) - .sorted(byKeyPath: "taskCreationDate") - } + private func rebuildUploadQueueFromObjectsInRealm() { + Log.uploadQueue("rebuildUploadQueue") + // Clean cache if necessary before we try to restart the uploads. + @InjectService var freeSpaceService: FreeSpaceService + freeSpaceService.cleanCacheIfAlmostFull() - let uploadingFileIds = Array(uploadingFiles.map(\.id)) - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)") + guard let uploadFileQuery else { + Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error) + return + } - let batches = uploadingFileIds.chunks(ofCount: 100) - Log.uploadQueue("batched count:\(batches.count)") - for batch in batches { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch") - let batchArray = Array(batch) - let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in - lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded() - } - for file in matchedFrozenFiles { - let uploadQueue = uploadQueue(for: file) - uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil) - } - resumeAllOperations() - } + let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in + return lazyCollection.filter(uploadFileQuery) + .sorted(byKeyPath: "taskCreationDate") + } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit") + let uploadingFileIds = Array(uploadingFiles.map(\.id)) + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)") + + let batches = uploadingFileIds.chunks(ofCount: 100) + Log.uploadQueue("batched count:\(batches.count)") + for batch in batches { + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch") + let batchArray = Array(batch) + let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in + lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded() + } + for file in matchedFrozenFiles { + let uploadQueue = uploadQueue(for: file) + uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil) + } + resumeAllOperations() } + + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit") } public func suspendAllOperations() { diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift index f733e9f32b..bb671d741a 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift @@ -28,7 +28,9 @@ public protocol UploadServiceable { var operationCount: Int { get } - func rebuildUploadQueueFromObjectsInRealm() + func blockingRebuildUploadQueue() + + func rebuildUploadQueue() func suspendAllOperations() diff --git a/kDriveCore/Services/BackgroundTasksService.swift b/kDriveCore/Services/BackgroundTasksService.swift index fd7dfe1e48..3df93fd41a 100644 --- a/kDriveCore/Services/BackgroundTasksService.swift +++ b/kDriveCore/Services/BackgroundTasksService.swift @@ -121,7 +121,7 @@ struct BackgroundTasksService: BackgroundTasksServiceable { } Log.backgroundTaskScheduling("Reload operations in queue") - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.blockingRebuildUploadQueue() guard !expiringActivity.shouldTerminate else { Log.backgroundTaskScheduling(Self.activityShouldTerminateMessage, level: .error) From f816a62074414bfc08ff9d262e5d07aa7b4c2277 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 08:26:32 +0100 Subject: [PATCH 15/29] refactor(WorkloadParallelismHeuristic): Internal serial event queue --- .../WorkloadParallelismHeuristic.swift | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index ef1a43a730..cb93cced4c 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -36,18 +36,23 @@ public protocol ParallelismHeuristicDelegate: AnyObject { /// /// Value can change depending on many factors, including thermal state battery or extension mode. /// Scaling is achieved given the number of active cores available. -final class WorkloadParallelismHeuristic { +public final class WorkloadParallelismHeuristic { @LazyInjectService private var appContextService: AppContextServiceable private weak var delegate: ParallelismHeuristicDelegate? + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.parallelism-heuristic.event", + qos: .default + ) + init(delegate: ParallelismHeuristicDelegate) { self.delegate = delegate // Update on thermal change NotificationCenter.default.addObserver( self, - selector: #selector(computeParallelism), + selector: #selector(computeParallelismInQueue), name: ProcessInfo.thermalStateDidChangeNotification, object: nil ) @@ -55,7 +60,7 @@ final class WorkloadParallelismHeuristic { // Update on low power mode NotificationCenter.default.addObserver( self, - selector: #selector(computeParallelism), + selector: #selector(computeParallelismInQueue), name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil ) @@ -63,6 +68,7 @@ final class WorkloadParallelismHeuristic { DispatchQueue.global(qos: .default).async { self.computeParallelism() } + computeParallelismInQueue() } deinit { @@ -70,7 +76,13 @@ final class WorkloadParallelismHeuristic { NotificationCenter.default.removeObserver(self, name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil) } - @objc private func computeParallelism() { + @objc private func computeParallelismInQueue() { + serialEventQueue.async { + self.computeParallelism() + } + } + + private func computeParallelism() { let processInfo = ProcessInfo.processInfo // If the device is too hot we cool down now From a08c87eb5283bd5b155b265d08b34ac870fac1c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 08:30:25 +0100 Subject: [PATCH 16/29] refactor(UploadParallelismOrchestrator): Internal serial event queue --- .../UploadParallelismOrchestrator.swift | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index 3b173c787e..70ff8c614b 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -25,6 +25,11 @@ public final class UploadParallelismOrchestrator { @LazyInjectService private var uploadService: UploadServiceable @LazyInjectService private var appContextService: AppContextServiceable + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-parallelism-orchestrator.event", + qos: .default + ) + private var uploadParallelismHeuristic: WorkloadParallelismHeuristic? private var memoryPressureObserver: DispatchSourceMemoryPressure? @@ -38,8 +43,10 @@ public final class UploadParallelismOrchestrator { private lazy var allQueues = [globalUploadQueue, photoUploadQueue] public init() { - observeMemoryWarnings() - uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) + serialEventQueue.async { + self.observeMemoryWarnings() + self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) + } } func observeMemoryWarnings() { @@ -67,25 +74,28 @@ public final class UploadParallelismOrchestrator { source.resume() } - func computeUploadParallelismPerQueueAndApply() { - let currentAvailableParallelism = availableParallelism - Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") + private func computeUploadParallelismPerQueueAndApply() { + serialEventQueue.async { + let currentAvailableParallelism = self.availableParallelism + Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") - let activeQueues = allQueues.filter(\.isActive) - let inactiveQueues = allQueues.filter { !$0.isActive } + let activeQueues = self.allQueues.filter(\.isActive) + let inactiveQueues = self.allQueues.filter { !$0.isActive } - assert(activeQueues.count + inactiveQueues.count == allQueues.count, "expecting to not miss a queue") + assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match") - inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) } + inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) } - Log.uploadQueue("Updating parallelism in inactiveQueues:\(inactiveQueues.count) activeQueues:\(activeQueues.count)") - guard !activeQueues.isEmpty else { - return - } + Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial") + guard !activeQueues.isEmpty else { + Log.uploadQueue("No active queues") + return + } - let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count) - Log.uploadQueue("New parallelism per active queue :\(parallelismPerActiveQueue)") - activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } + let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count) + Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)") + activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } + } } } From 43016697b84a8daf15af69ffd079f457cb28da3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 08:33:51 +0100 Subject: [PATCH 17/29] fix(WorkloadParallelismHeuristic): Double call to computeParallelism --- .../Parallelism/WorkloadParallelismHeuristic.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index cb93cced4c..9a628e39b5 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -65,9 +65,6 @@ public final class WorkloadParallelismHeuristic { object: nil ) - DispatchQueue.global(qos: .default).async { - self.computeParallelism() - } computeParallelismInQueue() } From 354178aae94d9c5dd9a2f9eed8339e23db9a7789 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 13:45:16 +0100 Subject: [PATCH 18/29] refactor(UploadParallelismOrchestrator): Notify memory pressure on an internal queue --- .../Parallelism/UploadParallelismOrchestrator.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index 70ff8c614b..556e20ed25 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -22,7 +22,6 @@ import InfomaniakDI public final class UploadParallelismOrchestrator { @LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable - @LazyInjectService private var uploadService: UploadServiceable @LazyInjectService private var appContextService: AppContextServiceable private let serialEventQueue = DispatchQueue( @@ -66,7 +65,10 @@ public final class UploadParallelismOrchestrator { Log.uploadQueue("MemoryPressureEvent warning", level: .info) case DispatchSource.MemoryPressureEvent.critical: Log.uploadQueue("MemoryPressureEvent critical", level: .error) - uploadService.rescheduleRunningOperations() + serialEventQueue.async { + @InjectService var uploadService: UploadServiceable + uploadService.rescheduleRunningOperations() + } default: break } From 9e2540ca81b8fe123d121aeef86d6c7a7d7704a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 13:48:04 +0100 Subject: [PATCH 19/29] refactor(UploadQueueObserver): Notify observation on an internal queue --- .../Queue/UploadQueueObserver.swift | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift index 303cad9784..0704cc7705 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -22,6 +22,11 @@ public class UploadQueueObserver: NSObject { private var previousCount: Int? private var observation: NSKeyValueObservation? + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-queue-observer.event.\(UUID().uuidString)", + qos: .default + ) + var uploadQueue: UploadQueue weak var delegate: UploadQueueDelegate? @@ -30,25 +35,30 @@ public class UploadQueueObserver: NSObject { self.delegate = delegate super.init() - observation = uploadQueue.operationQueue.observe(\.operationCount, options: [.new, .old]) { [weak self] _, change in + observation = uploadQueue.operationQueue.observe(\.operationCount, options: [ + .new, + .old + ]) { [weak self] _, change in guard let self else { return } - guard let newCount = change.newValue else { return } + self.serialEventQueue.async { + guard let newCount = change.newValue else { return } - defer { previousCount = newCount } + defer { self.previousCount = newCount } - guard let previousCount else { - delegate?.operationQueueNoLongerEmpty(uploadQueue) - return - } + guard let previousCount = self.previousCount else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + return + } - guard previousCount != newCount else { - return - } + guard previousCount != newCount else { + return + } - if newCount == 0 { - delegate?.operationQueueBecameEmpty(uploadQueue) - } else if previousCount == 0 && newCount > 0 { - delegate?.operationQueueNoLongerEmpty(uploadQueue) + if newCount == 0 { + delegate?.operationQueueBecameEmpty(uploadQueue) + } else if previousCount == 0 && newCount > 0 { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + } } } } From eabe82daec586c0125c3b27a5a1390ecafb053f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 14:01:06 +0100 Subject: [PATCH 20/29] refactor(UploadQueueObserver): Observation setup in a dedicated method to clean a Sonar warning --- .../Upload/UploadQueue/Queue/UploadQueueObserver.swift | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift index 0704cc7705..fb0aeb9644 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -35,6 +35,10 @@ public class UploadQueueObserver: NSObject { self.delegate = delegate super.init() + setupObservation() + } + + private func setupObservation() { observation = uploadQueue.operationQueue.observe(\.operationCount, options: [ .new, .old @@ -46,7 +50,7 @@ public class UploadQueueObserver: NSObject { defer { self.previousCount = newCount } guard let previousCount = self.previousCount else { - delegate?.operationQueueNoLongerEmpty(uploadQueue) + self.delegate?.operationQueueNoLongerEmpty(self.uploadQueue) return } @@ -55,9 +59,9 @@ public class UploadQueueObserver: NSObject { } if newCount == 0 { - delegate?.operationQueueBecameEmpty(uploadQueue) + self.delegate?.operationQueueBecameEmpty(self.uploadQueue) } else if previousCount == 0 && newCount > 0 { - delegate?.operationQueueNoLongerEmpty(uploadQueue) + self.delegate?.operationQueueNoLongerEmpty(self.uploadQueue) } } } From c95f7366273cbc9b2bd1cb998b283c1d53a87393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Fri, 21 Mar 2025 14:16:33 +0100 Subject: [PATCH 21/29] refactor(UploadService): Removed extraneous lazy in DI call --- .../Data/Upload/Servicies/UploadService/UploadService.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 204dde800b..7de986f72c 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -40,7 +40,7 @@ public final class UploadService { ) let serialEventQueue: DispatchQueue = { - @LazyInjectService var appContextService: AppContextServiceable + @InjectService var appContextService: AppContextServiceable let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit return DispatchQueue( From 9cdd583569cb697b904406d3a8b6cc725132aed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Mon, 24 Mar 2025 13:56:19 +0100 Subject: [PATCH 22/29] refactor(UploadQueueObserver): Split some existing code to reduce complexity and a Sonar warning --- .../Queue/UploadQueueObserver.swift | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift index fb0aeb9644..999d16ebae 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -46,24 +46,27 @@ public class UploadQueueObserver: NSObject { guard let self else { return } self.serialEventQueue.async { guard let newCount = change.newValue else { return } + self.operationCountDidChange(newCount: newCount) + } + } + } - defer { self.previousCount = newCount } + private func operationCountDidChange(newCount: Int) { + defer { previousCount = newCount } - guard let previousCount = self.previousCount else { - self.delegate?.operationQueueNoLongerEmpty(self.uploadQueue) - return - } + guard let previousCount = previousCount else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + return + } - guard previousCount != newCount else { - return - } + guard previousCount != newCount else { + return + } - if newCount == 0 { - self.delegate?.operationQueueBecameEmpty(self.uploadQueue) - } else if previousCount == 0 && newCount > 0 { - self.delegate?.operationQueueNoLongerEmpty(self.uploadQueue) - } - } + if newCount == 0 { + delegate?.operationQueueBecameEmpty(uploadQueue) + } else if previousCount == 0 && newCount > 0 { + delegate?.operationQueueNoLongerEmpty(uploadQueue) } } } From d84dd5447d8be58172d94a496af2eb114b5da180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 11:15:15 +0100 Subject: [PATCH 23/29] refactor(UploadQueueObserver): Stateless implementation --- .../Queue/UploadQueueObserver.swift | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift index 999d16ebae..d977f15c87 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -18,8 +18,7 @@ import Foundation -public class UploadQueueObserver: NSObject { - private var previousCount: Int? +public final class UploadQueueObserver { private var observation: NSKeyValueObservation? private let serialEventQueue = DispatchQueue( @@ -33,7 +32,6 @@ public class UploadQueueObserver: NSObject { init(uploadQueue: UploadQueue, delegate: UploadQueueDelegate?) { self.uploadQueue = uploadQueue self.delegate = delegate - super.init() setupObservation() } @@ -45,21 +43,19 @@ public class UploadQueueObserver: NSObject { ]) { [weak self] _, change in guard let self else { return } self.serialEventQueue.async { - guard let newCount = change.newValue else { return } - self.operationCountDidChange(newCount: newCount) + self.operationCountDidChange(previousCount: change.oldValue, newCount: change.newValue) } } } - private func operationCountDidChange(newCount: Int) { - defer { previousCount = newCount } - - guard let previousCount = previousCount else { - delegate?.operationQueueNoLongerEmpty(uploadQueue) + private func operationCountDidChange(previousCount: Int?, newCount: Int?) { + guard let newCount else { + delegate?.operationQueueBecameEmpty(uploadQueue) return } - guard previousCount != newCount else { + guard let previousCount else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) return } From 0acebfbdd64e0f3b718a3f3264c0bfdbcf26417a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 11:50:10 +0100 Subject: [PATCH 24/29] chore: Fix name of function in log event --- .../Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift index ccdf2a8132..6421f77b48 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift @@ -167,14 +167,14 @@ extension UploadQueue: UploadQueueable { return } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary ufid:\(uploadFile.id)") guard operation(uploadFileId: uploadFile.id) != nil else { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ADD ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary ADD ufid:\(uploadFile.id)") addToQueue(uploadFile: uploadFile, itemIdentifier: nil) return } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm NOOP ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary NOOP ufid:\(uploadFile.id)") } @discardableResult From a3faa7fde0fa076fc548f82f1c82a637ab49f8bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 11:50:59 +0100 Subject: [PATCH 25/29] refactor(UploadService): Using internal serial queue for complex transactions --- .../UploadService/UploadService.swift | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 7de986f72c..15d7dc556a 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -33,7 +33,7 @@ public final class UploadService { @LazyInjectService var notificationHelper: NotificationsHelpable @LazyInjectService var appContextService: AppContextServiceable - private let serialRebuildUploadsQueue = DispatchQueue( + private let serialTransactionQueue = DispatchQueue( label: "com.infomaniak.drive.upload-service.rebuild-uploads", qos: .default, autoreleaseFrequency: .workItem @@ -93,13 +93,13 @@ extension UploadService: UploadServiceable { } public func blockingRebuildUploadQueue() { - serialRebuildUploadsQueue.sync { + serialTransactionQueue.sync { self.rebuildUploadQueueFromObjectsInRealm() } } public func rebuildUploadQueue() { - serialRebuildUploadsQueue.async { + serialTransactionQueue.async { self.rebuildUploadQueueFromObjectsInRealm() } } @@ -164,13 +164,13 @@ extension UploadService: UploadServiceable { return } - Task { + serialTransactionQueue.async { guard let frozenFile = self.uploadsDatabase.fetchObject(ofType: UploadFile.self, forPrimaryKey: uploadFileId)? .freeze() else { return } - let specificQueue = uploadQueue(for: frozenFile) + let specificQueue = self.uploadQueue(for: frozenFile) try? self.uploadsDatabase.writeTransaction { writableRealm in guard let file = writableRealm.object(ofType: UploadFile.self, forPrimaryKey: uploadFileId), @@ -185,7 +185,7 @@ extension UploadService: UploadServiceable { } specificQueue.addToQueue(uploadFile: frozenFile, itemIdentifier: nil) - resumeAllOperations() + self.resumeAllOperations() } } @@ -196,16 +196,16 @@ extension UploadService: UploadServiceable { return } - Task { - let failedFileIds = getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId) + serialTransactionQueue.async { + let failedFileIds = self.getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId) let batches = failedFileIds.chunks(ofCount: 100) Log.uploadQueue("batches:\(batches.count)") - resumeAllOperations() + self.resumeAllOperations() for batch in batches { - cancelAnyInBatch(batch) - enqueueAnyInBatch(batch) + self.cancelAnyInBatch(batch) + self.enqueueAnyInBatch(batch) } } } @@ -269,14 +269,14 @@ extension UploadService: UploadServiceable { return } - Task { - suspendAllOperations() + serialTransactionQueue.async { + self.suspendAllOperations() defer { - resumeAllOperations() + self.resumeAllOperations() Log.uploadQueue("cancelAllOperations finished") } - let uploadingFiles = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId) + let uploadingFiles = self.getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId) let allUploadingFilesIds = Array(uploadingFiles.map(\.id)) let photoSyncUploadingFilesIds: [String] = uploadingFiles.compactMap { uploadFile in guard uploadFile.isPhotoSyncUpload else { return nil } @@ -294,7 +294,7 @@ extension UploadService: UploadServiceable { Log.uploadQueue("cancelAllOperations IDS count:\(allUploadingFilesIds.count) parentId:\(parentId)") - try? uploadsDatabase.writeTransaction { writableRealm in + try? self.uploadsDatabase.writeTransaction { writableRealm in // Delete all the linked UploadFiles from Realm. This is fast. Log.uploadQueue("delete all matching files count:\(uploadingFiles.count) parentId:\(parentId)") let objectsToDelete = writableRealm.objects(UploadFile.self).filter("id IN %@", allUploadingFilesIds) @@ -303,10 +303,10 @@ extension UploadService: UploadServiceable { Log.uploadQueue("Done deleting all matching files for parentId:\(parentId)") } - globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds) - photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds) + self.globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds) + self.photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds) - publishUploadCount(withParent: parentId, userId: userId, driveId: driveId) + self.publishUploadCount(withParent: parentId, userId: userId, driveId: driveId) } } From 1910c76e50c5d10cc17d01f68ef99dc39bfeda7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 14:56:25 +0100 Subject: [PATCH 26/29] refactor(UploadService): Simpler init since the called method runs on a dedicated queue --- .../Data/Upload/Servicies/UploadService/UploadService.swift | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 15d7dc556a..9e48d02032 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -66,9 +66,7 @@ public final class UploadService { public var pausedNotificationSent = false public init() { - DispatchQueue.global(qos: .default).async { - self.rebuildUploadQueue() - } + rebuildUploadQueue() } } From fe72c1551a50b860a413c1baa14120037bc5576c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 15:10:48 +0100 Subject: [PATCH 27/29] refactor(UploadService): New setupObservation function --- .../Parallelism/WorkloadParallelismHeuristic.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index 9a628e39b5..85f8aa52d2 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -48,7 +48,10 @@ public final class WorkloadParallelismHeuristic { init(delegate: ParallelismHeuristicDelegate) { self.delegate = delegate + setupObservation() + } + private func setupObservation() { // Update on thermal change NotificationCenter.default.addObserver( self, From f0fae91f7722fab1d09d047227b503ab44ea7dcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 15:11:00 +0100 Subject: [PATCH 28/29] refactor(UploadParallelismOrchestrator): New setupObservation function --- .../Parallelism/UploadParallelismOrchestrator.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift index 556e20ed25..ac583fffde 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -42,6 +42,10 @@ public final class UploadParallelismOrchestrator { private lazy var allQueues = [globalUploadQueue, photoUploadQueue] public init() { + setupObservation() + } + + private func setupObservation() { serialEventQueue.async { self.observeMemoryWarnings() self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) From ba9d70a0336c5aa7d5614f079a74891f02b4d05a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Coye=20de=20Brune=CC=81lis?= Date: Tue, 25 Mar 2025 15:11:16 +0100 Subject: [PATCH 29/29] chore(UploadService): Removed comment --- .../Data/Upload/Servicies/UploadService/UploadService.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 9e48d02032..9b9e8466de 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -104,7 +104,6 @@ extension UploadService: UploadServiceable { private func rebuildUploadQueueFromObjectsInRealm() { Log.uploadQueue("rebuildUploadQueue") - // Clean cache if necessary before we try to restart the uploads. @InjectService var freeSpaceService: FreeSpaceService freeSpaceService.cleanCacheIfAlmostFull()