diff --git a/kDrive/AppRouter.swift b/kDrive/AppRouter.swift index ee817a1a6e..945148014d 100644 --- a/kDrive/AppRouter.swift +++ b/kDrive/AppRouter.swift @@ -660,7 +660,7 @@ public struct AppRouter: AppNavigable { // Resolving an upload queue will restart it if this is the first time @InjectService var uploadService: UploadServiceable - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.rebuildUploadQueue() } // MARK: RouterFileNavigable diff --git a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift index fe91e82ea9..24563fa71c 100644 --- a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift +++ b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift @@ -504,11 +504,11 @@ extension PhotoSyncSettingsViewController: FooterButtonDelegate { sender.setLoading(false) } - DispatchQueue.global(qos: .userInitiated).async { + DispatchQueue.global(qos: .default).async { // Add new pictures to be uploaded and reload upload queue self.photoLibraryUploader.scheduleNewPicturesForUpload() @InjectService var uploadService: UploadServiceable - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.rebuildUploadQueue() } } } diff --git a/kDriveCore/DI/FactoryService.swift b/kDriveCore/DI/FactoryService.swift index 7fa651c67a..ec6bd13e3d 100644 --- a/kDriveCore/DI/FactoryService.swift +++ b/kDriveCore/DI/FactoryService.swift @@ -124,6 +124,9 @@ public enum FactoryService { factoryParameters: nil, resolver: resolver) }, + Factory(type: UploadQueueDelegate.self) { _, _ in + UploadParallelismOrchestrator() + }, Factory(type: BGTaskScheduler.self) { _, _ in BGTaskScheduler.shared }, @@ -232,12 +235,22 @@ public enum FactoryService { } static var uploadQueues: [FactoryWithIdentifier] { - let globalUploadQueue = Factory(type: UploadQueueable.self) { _, _ in - UploadQueue() + let globalUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in + let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self, + forCustomTypeIdentifier: nil, + factoryParameters: nil, + resolver: resolver) + + return UploadQueue(delegate: uploadQueueDelegate) } - let photoUploadQueue = Factory(type: UploadQueueable.self) { _, _ in - PhotoUploadQueue() + let photoUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in + let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self, + forCustomTypeIdentifier: nil, + factoryParameters: nil, + resolver: resolver) + + return PhotoUploadQueue(delegate: uploadQueueDelegate) } let services = [ diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift new file mode 100644 index 0000000000..ac583fffde --- /dev/null +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift @@ -0,0 +1,122 @@ +/* + Infomaniak kDrive - iOS App + Copyright (C) 2025 Infomaniak Network SA + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +import Foundation +import InfomaniakDI + +public final class UploadParallelismOrchestrator { + @LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable + @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable + @LazyInjectService private var appContextService: AppContextServiceable + + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-parallelism-orchestrator.event", + qos: .default + ) + + private var uploadParallelismHeuristic: WorkloadParallelismHeuristic? + private var memoryPressureObserver: DispatchSourceMemoryPressure? + + private var availableParallelism: Int { + guard let uploadParallelismHeuristic else { + return ParallelismDefaults.reducedParallelism + } + return uploadParallelismHeuristic.currentParallelism + } + + private lazy var allQueues = [globalUploadQueue, photoUploadQueue] + + public init() { + setupObservation() + } + + private func setupObservation() { + serialEventQueue.async { + self.observeMemoryWarnings() + self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) + } + } + + func observeMemoryWarnings() { + guard appContextService.context == .fileProviderExtension else { + return + } + + let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main) + memoryPressureObserver = source + source.setEventHandler { [weak self] in + guard let self else { return } + let event: DispatchSource.MemoryPressureEvent = source.data + switch event { + case DispatchSource.MemoryPressureEvent.normal: + Log.uploadQueue("MemoryPressureEvent normal", level: .info) + case DispatchSource.MemoryPressureEvent.warning: + Log.uploadQueue("MemoryPressureEvent warning", level: .info) + case DispatchSource.MemoryPressureEvent.critical: + Log.uploadQueue("MemoryPressureEvent critical", level: .error) + serialEventQueue.async { + @InjectService var uploadService: UploadServiceable + uploadService.rescheduleRunningOperations() + } + default: + break + } + } + source.resume() + } + + private func computeUploadParallelismPerQueueAndApply() { + serialEventQueue.async { + let currentAvailableParallelism = self.availableParallelism + Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") + + let activeQueues = self.allQueues.filter(\.isActive) + let inactiveQueues = self.allQueues.filter { !$0.isActive } + + assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match") + + inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) } + + Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial") + guard !activeQueues.isEmpty else { + Log.uploadQueue("No active queues") + return + } + + let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count) + Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)") + activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } + } + } +} + +extension UploadParallelismOrchestrator: UploadQueueDelegate { + public func operationQueueBecameEmpty(_ queue: UploadQueue) { + computeUploadParallelismPerQueueAndApply() + } + + public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { + computeUploadParallelismPerQueueAndApply() + } +} + +extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate { + public func parallelismShouldChange(value: Int) { + computeUploadParallelismPerQueueAndApply() + } +} diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift similarity index 72% rename from kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift rename to kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift index e8de6e163f..85f8aa52d2 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift @@ -19,8 +19,14 @@ import Foundation import InfomaniakDI +public enum ParallelismDefaults { + static let reducedParallelism = 2 + + static let serial = 1 +} + /// Delegate protocol of UploadParallelismHeuristic -protocol ParallelismHeuristicDelegate: AnyObject { +public protocol ParallelismHeuristicDelegate: AnyObject { /// This method is called with a new parallelism to apply each time to the uploadQueue /// - Parameter value: The new parallelism value to use func parallelismShouldChange(value: Int) @@ -30,21 +36,26 @@ protocol ParallelismHeuristicDelegate: AnyObject { /// /// Value can change depending on many factors, including thermal state battery or extension mode. /// Scaling is achieved given the number of active cores available. -final class WorkloadParallelismHeuristic { - /// With 2 Operations max, and a chuck of 1MiB max, the UploadQueue can spike to max 4MiB memory usage. - private static let reducedParallelism = 2 - +public final class WorkloadParallelismHeuristic { @LazyInjectService private var appContextService: AppContextServiceable private weak var delegate: ParallelismHeuristicDelegate? + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.parallelism-heuristic.event", + qos: .default + ) + init(delegate: ParallelismHeuristicDelegate) { self.delegate = delegate + setupObservation() + } + private func setupObservation() { // Update on thermal change NotificationCenter.default.addObserver( self, - selector: #selector(computeParallelism), + selector: #selector(computeParallelismInQueue), name: ProcessInfo.thermalStateDidChangeNotification, object: nil ) @@ -52,13 +63,12 @@ final class WorkloadParallelismHeuristic { // Update on low power mode NotificationCenter.default.addObserver( self, - selector: #selector(computeParallelism), + selector: #selector(computeParallelismInQueue), name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil ) - // Update the value a first time - computeParallelism() + computeParallelismInQueue() } deinit { @@ -66,25 +76,31 @@ final class WorkloadParallelismHeuristic { NotificationCenter.default.removeObserver(self, name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil) } - @objc private func computeParallelism() { + @objc private func computeParallelismInQueue() { + serialEventQueue.async { + self.computeParallelism() + } + } + + private func computeParallelism() { let processInfo = ProcessInfo.processInfo // If the device is too hot we cool down now let thermalState = processInfo.thermalState guard thermalState != .critical else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } // In low power mode, we reduce parallelism guard !processInfo.isLowPowerModeEnabled else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } // In extension, to reduce memory footprint, we reduce drastically parallelism guard !appContextService.isExtension else { - currentParallelism = Self.reducedParallelism + currentParallelism = ParallelismDefaults.reducedParallelism return } @@ -93,14 +109,14 @@ final class WorkloadParallelismHeuristic { // Beginning with .serious state, we start reducing the load on the system guard thermalState != .serious else { - currentParallelism = max(Self.reducedParallelism, parallelism / 2) + currentParallelism = max(ParallelismDefaults.reducedParallelism, parallelism / 2) return } currentParallelism = parallelism } - public private(set) var currentParallelism = 0 { + public private(set) var currentParallelism = ParallelismDefaults.reducedParallelism { didSet { delegate?.parallelismShouldChange(value: currentParallelism) } diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift index 8746f267c7..4357e293ea 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift @@ -48,7 +48,7 @@ extension UploadService: UploadNotifiable { public func sendFileUploadStateNotificationIfNeeded(with result: UploadCompletionResult) { Log.uploadQueue("sendFileUploadStateNotificationIfNeeded") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } guard let uploadFile = result.uploadFile, uploadFile.error != .taskRescheduled, diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift index 07108d1f73..55f5fb4d93 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift @@ -49,7 +49,7 @@ extension UploadService: UploadObservable { fileId: String? = nil, using closure: @escaping (UploadFile, File?) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didUploadFile[key] = { [weak self, weak observer] uploadFile, driveFile in @@ -81,7 +81,7 @@ extension UploadService: UploadObservable { parentId: Int, using closure: @escaping (Int, Int) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didChangeUploadCountInParent[key] = { [weak self, weak observer] updatedParentId, count in @@ -111,7 +111,7 @@ extension UploadService: UploadObservable { driveId: Int, using closure: @escaping (Int, Int) -> Void) -> ObservationToken { var token: ObservationToken! - serialQueue.sync { [weak self] in + serialEventQueue.sync { [weak self] in guard let self else { return } let key = UUID() observations.didChangeUploadCountInDrive[key] = { [weak self, weak observer] updatedDriveId, count in diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift index 5d49b098e8..75cec7fec9 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift @@ -43,7 +43,7 @@ extension UploadService: UploadPublishable { userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCount") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } publishUploadCountInParent(parentId: parentId, userId: userId, driveId: driveId) publishUploadCountInDrive(userId: userId, driveId: driveId) @@ -54,7 +54,7 @@ extension UploadService: UploadPublishable { userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCountInParent") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } let uploadCount = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId).count @@ -69,7 +69,7 @@ extension UploadService: UploadPublishable { public func publishUploadCountInDrive(userId: Int, driveId: Int) { Log.uploadQueue("publishUploadCountInDrive") - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } let uploadCount = getUploadingFiles(userId: userId, driveId: driveId).count for closure in observations.didChangeUploadCountInDrive.values { @@ -84,7 +84,7 @@ extension UploadService: UploadPublishable { Log.uploadQueue("publishFileUploaded") logFileUploadedWithSuccess(for: result.uploadFile) sendFileUploadStateNotificationIfNeeded(with: result) - serialQueue.async { [weak self] in + serialEventQueue.async { [weak self] in guard let self else { return } for closure in observations.didUploadFile.values { guard let uploadFile = result.uploadFile, !uploadFile.isInvalidated else { diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift index 282998b50e..9b9e8466de 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift @@ -26,19 +26,26 @@ public enum UploadServiceBackgroundIdentifier { } public final class UploadService { - @LazyInjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable - @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable + @InjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable + @InjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable + @LazyInjectService(customTypeIdentifier: kDriveDBID.uploads) var uploadsDatabase: Transactionable @LazyInjectService var notificationHelper: NotificationsHelpable @LazyInjectService var appContextService: AppContextServiceable - let serialQueue: DispatchQueue = { - @LazyInjectService var appContextService: AppContextServiceable + private let serialTransactionQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-service.rebuild-uploads", + qos: .default, + autoreleaseFrequency: .workItem + ) + + let serialEventQueue: DispatchQueue = { + @InjectService var appContextService: AppContextServiceable let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit return DispatchQueue( - label: "com.infomaniak.drive.upload-service", - qos: .userInitiated, + label: "com.infomaniak.drive.upload-service.event", + qos: .default, autoreleaseFrequency: autoreleaseFrequency ) }() @@ -59,9 +66,7 @@ public final class UploadService { public var pausedNotificationSent = false public init() { - Task { - rebuildUploadQueueFromObjectsInRealm() - } + rebuildUploadQueue() } } @@ -85,43 +90,52 @@ extension UploadService: UploadServiceable { allQueues.allSatisfy(\.isSuspended) } - public func rebuildUploadQueueFromObjectsInRealm() { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm") - serialQueue.sync { - // Clean cache if necessary before we try to restart the uploads. - @InjectService var freeSpaceService: FreeSpaceService - freeSpaceService.cleanCacheIfAlmostFull() + public func blockingRebuildUploadQueue() { + serialTransactionQueue.sync { + self.rebuildUploadQueueFromObjectsInRealm() + } + } - guard let uploadFileQuery else { - Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error) - return - } + public func rebuildUploadQueue() { + serialTransactionQueue.async { + self.rebuildUploadQueueFromObjectsInRealm() + } + } - let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in - return lazyCollection.filter(uploadFileQuery) - .sorted(byKeyPath: "taskCreationDate") - } + private func rebuildUploadQueueFromObjectsInRealm() { + Log.uploadQueue("rebuildUploadQueue") + @InjectService var freeSpaceService: FreeSpaceService + freeSpaceService.cleanCacheIfAlmostFull() - let uploadingFileIds = Array(uploadingFiles.map(\.id)) - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)") + guard let uploadFileQuery else { + Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error) + return + } - let batches = uploadingFileIds.chunks(ofCount: 100) - Log.uploadQueue("batched count:\(batches.count)") - for batch in batches { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch") - let batchArray = Array(batch) - let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in - lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded() - } - for file in matchedFrozenFiles { - let uploadQueue = uploadQueue(for: file) - uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil) - } - resumeAllOperations() - } + let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in + return lazyCollection.filter(uploadFileQuery) + .sorted(byKeyPath: "taskCreationDate") + } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit") + let uploadingFileIds = Array(uploadingFiles.map(\.id)) + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)") + + let batches = uploadingFileIds.chunks(ofCount: 100) + Log.uploadQueue("batched count:\(batches.count)") + for batch in batches { + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch") + let batchArray = Array(batch) + let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in + lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded() + } + for file in matchedFrozenFiles { + let uploadQueue = uploadQueue(for: file) + uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil) + } + resumeAllOperations() } + + Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit") } public func suspendAllOperations() { @@ -147,13 +161,13 @@ extension UploadService: UploadServiceable { return } - Task { + serialTransactionQueue.async { guard let frozenFile = self.uploadsDatabase.fetchObject(ofType: UploadFile.self, forPrimaryKey: uploadFileId)? .freeze() else { return } - let specificQueue = uploadQueue(for: frozenFile) + let specificQueue = self.uploadQueue(for: frozenFile) try? self.uploadsDatabase.writeTransaction { writableRealm in guard let file = writableRealm.object(ofType: UploadFile.self, forPrimaryKey: uploadFileId), @@ -168,7 +182,7 @@ extension UploadService: UploadServiceable { } specificQueue.addToQueue(uploadFile: frozenFile, itemIdentifier: nil) - resumeAllOperations() + self.resumeAllOperations() } } @@ -179,16 +193,16 @@ extension UploadService: UploadServiceable { return } - Task { - let failedFileIds = getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId) + serialTransactionQueue.async { + let failedFileIds = self.getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId) let batches = failedFileIds.chunks(ofCount: 100) Log.uploadQueue("batches:\(batches.count)") - resumeAllOperations() + self.resumeAllOperations() for batch in batches { - cancelAnyInBatch(batch) - enqueueAnyInBatch(batch) + self.cancelAnyInBatch(batch) + self.enqueueAnyInBatch(batch) } } } @@ -252,14 +266,14 @@ extension UploadService: UploadServiceable { return } - Task { - suspendAllOperations() + serialTransactionQueue.async { + self.suspendAllOperations() defer { - resumeAllOperations() + self.resumeAllOperations() Log.uploadQueue("cancelAllOperations finished") } - let uploadingFiles = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId) + let uploadingFiles = self.getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId) let allUploadingFilesIds = Array(uploadingFiles.map(\.id)) let photoSyncUploadingFilesIds: [String] = uploadingFiles.compactMap { uploadFile in guard uploadFile.isPhotoSyncUpload else { return nil } @@ -277,7 +291,7 @@ extension UploadService: UploadServiceable { Log.uploadQueue("cancelAllOperations IDS count:\(allUploadingFilesIds.count) parentId:\(parentId)") - try? uploadsDatabase.writeTransaction { writableRealm in + try? self.uploadsDatabase.writeTransaction { writableRealm in // Delete all the linked UploadFiles from Realm. This is fast. Log.uploadQueue("delete all matching files count:\(uploadingFiles.count) parentId:\(parentId)") let objectsToDelete = writableRealm.objects(UploadFile.self).filter("id IN %@", allUploadingFilesIds) @@ -286,10 +300,10 @@ extension UploadService: UploadServiceable { Log.uploadQueue("Done deleting all matching files for parentId:\(parentId)") } - globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds) - photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds) + self.globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds) + self.photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds) - publishUploadCount(withParent: parentId, userId: userId, driveId: driveId) + self.publishUploadCount(withParent: parentId, userId: userId, driveId: driveId) } } diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift index f733e9f32b..bb671d741a 100644 --- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift +++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift @@ -28,7 +28,9 @@ public protocol UploadServiceable { var operationCount: Int { get } - func rebuildUploadQueueFromObjectsInRealm() + func blockingRebuildUploadQueue() + + func rebuildUploadQueue() func suspendAllOperations() diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift index 4c6e6a1694..6421f77b48 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift @@ -34,6 +34,10 @@ extension UploadQueue: UploadQueueable { operationQueue.isSuspended } + public var isActive: Bool { + operationQueue.operationCount > 0 && !operationQueue.isSuspended + } + public func waitForCompletion(_ completionHandler: @escaping () -> Void) { Log.uploadQueue("waitForCompletion") DispatchQueue.global(qos: .default).async { @@ -163,14 +167,14 @@ extension UploadQueue: UploadQueueable { return } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary ufid:\(uploadFile.id)") guard operation(uploadFileId: uploadFile.id) != nil else { - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ADD ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary ADD ufid:\(uploadFile.id)") addToQueue(uploadFile: uploadFile, itemIdentifier: nil) return } - Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm NOOP ufid:\(uploadFile.id)") + Log.uploadQueue("addToQueueIfNecessary NOOP ufid:\(uploadFile.id)") } @discardableResult diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift index 8f50c19920..69990d7580 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift @@ -23,18 +23,22 @@ import InfomaniakDI import RealmSwift import Sentry -public class UploadQueue: ParallelismHeuristicDelegate { - private var memoryPressure: DispatchSourceMemoryPressure? +public protocol UploadQueueDelegate: AnyObject { + func operationQueueBecameEmpty(_ queue: UploadQueue) + func operationQueueNoLongerEmpty(_ queue: UploadQueue) +} +public class UploadQueue: ParallelismHeuristicDelegate { @LazyInjectService var appContextService: AppContextServiceable @LazyInjectService var uploadPublisher: UploadPublishable + private var queueObserver: UploadQueueObserver? + + weak var delegate: UploadQueueDelegate? + /// Something to track an operation for a File ID let keyedUploadOperations = KeyedUploadOperationable() - /// Something to adapt the upload parallelism live - var uploadParallelismHeuristic: WorkloadParallelismHeuristic? - public lazy var operationQueue: OperationQueue = { let queue = OperationQueue() queue.name = "kDrive upload queue" @@ -69,62 +73,28 @@ public class UploadQueue: ParallelismHeuristicDelegate { /// Should suspend operation queue based on explicit `suspendAllOperations()` call var forceSuspendQueue = false - public init() { + public init(delegate: UploadQueueDelegate?) { guard appContextService.context != .shareExtension else { Log.uploadQueue("UploadQueue disabled in ShareExtension", level: .error) return } - Log.uploadQueue("Starting up") + self.delegate = delegate - uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) - - // Observe network state change ReachabilityListener.instance.observeNetworkChange(self) { [weak self] _ in - guard let self else { - return - } + guard let self else { return } let isSuspended = (shouldSuspendQueue || forceSuspendQueue) operationQueue.isSuspended = isSuspended Log.uploadQueue("observeNetworkChange :\(isSuspended)") } - observeMemoryWarnings() - - Log.uploadQueue("UploadQueue parallelism is:\(operationQueue.maxConcurrentOperationCount)") - } - - // MARK: - Memory warnings - - /// A critical memory warning in `FileProvider` context will reschedule, in order to transition uploads to Main App. - private func observeMemoryWarnings() { - guard appContextService.context == .fileProviderExtension else { - return - } - - let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main) - memoryPressure = source - source.setEventHandler { - let event: DispatchSource.MemoryPressureEvent = source.data - switch event { - case DispatchSource.MemoryPressureEvent.normal: - Log.uploadQueue("MemoryPressureEvent normal", level: .info) - case DispatchSource.MemoryPressureEvent.warning: - Log.uploadQueue("MemoryPressureEvent warning", level: .info) - case DispatchSource.MemoryPressureEvent.critical: - Log.uploadQueue("MemoryPressureEvent critical", level: .error) - self.rescheduleRunningOperations() - default: - break - } - } - source.resume() + queueObserver = UploadQueueObserver(uploadQueue: self, delegate: delegate) } // MARK: - ParallelismHeuristicDelegate - func parallelismShouldChange(value: Int) { + public func parallelismShouldChange(value: Int) { Log.uploadQueue("Upload queue new parallelism: \(value)", level: .info) operationQueue.maxConcurrentOperationCount = value } diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift new file mode 100644 index 0000000000..d977f15c87 --- /dev/null +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift @@ -0,0 +1,68 @@ +/* + Infomaniak kDrive - iOS App + Copyright (C) 2025 Infomaniak Network SA + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +import Foundation + +public final class UploadQueueObserver { + private var observation: NSKeyValueObservation? + + private let serialEventQueue = DispatchQueue( + label: "com.infomaniak.drive.upload-queue-observer.event.\(UUID().uuidString)", + qos: .default + ) + + var uploadQueue: UploadQueue + weak var delegate: UploadQueueDelegate? + + init(uploadQueue: UploadQueue, delegate: UploadQueueDelegate?) { + self.uploadQueue = uploadQueue + self.delegate = delegate + + setupObservation() + } + + private func setupObservation() { + observation = uploadQueue.operationQueue.observe(\.operationCount, options: [ + .new, + .old + ]) { [weak self] _, change in + guard let self else { return } + self.serialEventQueue.async { + self.operationCountDidChange(previousCount: change.oldValue, newCount: change.newValue) + } + } + } + + private func operationCountDidChange(previousCount: Int?, newCount: Int?) { + guard let newCount else { + delegate?.operationQueueBecameEmpty(uploadQueue) + return + } + + guard let previousCount else { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + return + } + + if newCount == 0 { + delegate?.operationQueueBecameEmpty(uploadQueue) + } else if previousCount == 0 && newCount > 0 { + delegate?.operationQueueNoLongerEmpty(uploadQueue) + } + } +} diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift index 1bab7aa131..79a8eef15f 100644 --- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift +++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift @@ -38,9 +38,13 @@ public protocol UploadQueueable { func rescheduleRunningOperations() + func parallelismShouldChange(value: Int) + func cancel(uploadFileId: String) var operationCount: Int { get } var isSuspended: Bool { get } + + var isActive: Bool { get } } diff --git a/kDriveCore/Services/BackgroundTasksService.swift b/kDriveCore/Services/BackgroundTasksService.swift index fd7dfe1e48..3df93fd41a 100644 --- a/kDriveCore/Services/BackgroundTasksService.swift +++ b/kDriveCore/Services/BackgroundTasksService.swift @@ -121,7 +121,7 @@ struct BackgroundTasksService: BackgroundTasksServiceable { } Log.backgroundTaskScheduling("Reload operations in queue") - uploadService.rebuildUploadQueueFromObjectsInRealm() + uploadService.blockingRebuildUploadQueue() guard !expiringActivity.shouldTerminate else { Log.backgroundTaskScheduling(Self.activityShouldTerminateMessage, level: .error)