diff --git a/kDrive/AppRouter.swift b/kDrive/AppRouter.swift
index ee817a1a6e..945148014d 100644
--- a/kDrive/AppRouter.swift
+++ b/kDrive/AppRouter.swift
@@ -660,7 +660,7 @@ public struct AppRouter: AppNavigable {
// Resolving an upload queue will restart it if this is the first time
@InjectService var uploadService: UploadServiceable
- uploadService.rebuildUploadQueueFromObjectsInRealm()
+ uploadService.rebuildUploadQueue()
}
// MARK: RouterFileNavigable
diff --git a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift
index fe91e82ea9..24563fa71c 100644
--- a/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift
+++ b/kDrive/UI/Controller/Menu/PhotoSyncSettingsViewController.swift
@@ -504,11 +504,11 @@ extension PhotoSyncSettingsViewController: FooterButtonDelegate {
sender.setLoading(false)
}
- DispatchQueue.global(qos: .userInitiated).async {
+ DispatchQueue.global(qos: .default).async {
// Add new pictures to be uploaded and reload upload queue
self.photoLibraryUploader.scheduleNewPicturesForUpload()
@InjectService var uploadService: UploadServiceable
- uploadService.rebuildUploadQueueFromObjectsInRealm()
+ uploadService.rebuildUploadQueue()
}
}
}
diff --git a/kDriveCore/DI/FactoryService.swift b/kDriveCore/DI/FactoryService.swift
index 7fa651c67a..ec6bd13e3d 100644
--- a/kDriveCore/DI/FactoryService.swift
+++ b/kDriveCore/DI/FactoryService.swift
@@ -124,6 +124,9 @@ public enum FactoryService {
factoryParameters: nil,
resolver: resolver)
},
+ Factory(type: UploadQueueDelegate.self) { _, _ in
+ UploadParallelismOrchestrator()
+ },
Factory(type: BGTaskScheduler.self) { _, _ in
BGTaskScheduler.shared
},
@@ -232,12 +235,22 @@ public enum FactoryService {
}
static var uploadQueues: [FactoryWithIdentifier] {
- let globalUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
- UploadQueue()
+ let globalUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
+ let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
+ forCustomTypeIdentifier: nil,
+ factoryParameters: nil,
+ resolver: resolver)
+
+ return UploadQueue(delegate: uploadQueueDelegate)
}
- let photoUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
- PhotoUploadQueue()
+ let photoUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
+ let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
+ forCustomTypeIdentifier: nil,
+ factoryParameters: nil,
+ resolver: resolver)
+
+ return PhotoUploadQueue(delegate: uploadQueueDelegate)
}
let services = [
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift
new file mode 100644
index 0000000000..ac583fffde
--- /dev/null
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift
@@ -0,0 +1,122 @@
+/*
+ Infomaniak kDrive - iOS App
+ Copyright (C) 2025 Infomaniak Network SA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see .
+ */
+
+import Foundation
+import InfomaniakDI
+
+public final class UploadParallelismOrchestrator {
+ @LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable
+ @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable
+ @LazyInjectService private var appContextService: AppContextServiceable
+
+ private let serialEventQueue = DispatchQueue(
+ label: "com.infomaniak.drive.upload-parallelism-orchestrator.event",
+ qos: .default
+ )
+
+ private var uploadParallelismHeuristic: WorkloadParallelismHeuristic?
+ private var memoryPressureObserver: DispatchSourceMemoryPressure?
+
+ private var availableParallelism: Int {
+ guard let uploadParallelismHeuristic else {
+ return ParallelismDefaults.reducedParallelism
+ }
+ return uploadParallelismHeuristic.currentParallelism
+ }
+
+ private lazy var allQueues = [globalUploadQueue, photoUploadQueue]
+
+ public init() {
+ setupObservation()
+ }
+
+ private func setupObservation() {
+ serialEventQueue.async {
+ self.observeMemoryWarnings()
+ self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self)
+ }
+ }
+
+ func observeMemoryWarnings() {
+ guard appContextService.context == .fileProviderExtension else {
+ return
+ }
+
+ let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main)
+ memoryPressureObserver = source
+ source.setEventHandler { [weak self] in
+ guard let self else { return }
+ let event: DispatchSource.MemoryPressureEvent = source.data
+ switch event {
+ case DispatchSource.MemoryPressureEvent.normal:
+ Log.uploadQueue("MemoryPressureEvent normal", level: .info)
+ case DispatchSource.MemoryPressureEvent.warning:
+ Log.uploadQueue("MemoryPressureEvent warning", level: .info)
+ case DispatchSource.MemoryPressureEvent.critical:
+ Log.uploadQueue("MemoryPressureEvent critical", level: .error)
+ serialEventQueue.async {
+ @InjectService var uploadService: UploadServiceable
+ uploadService.rescheduleRunningOperations()
+ }
+ default:
+ break
+ }
+ }
+ source.resume()
+ }
+
+ private func computeUploadParallelismPerQueueAndApply() {
+ serialEventQueue.async {
+ let currentAvailableParallelism = self.availableParallelism
+ Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)")
+
+ let activeQueues = self.allQueues.filter(\.isActive)
+ let inactiveQueues = self.allQueues.filter { !$0.isActive }
+
+ assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match")
+
+ inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) }
+
+ Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial")
+ guard !activeQueues.isEmpty else {
+ Log.uploadQueue("No active queues")
+ return
+ }
+
+ let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count)
+ Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)")
+ activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) }
+ }
+ }
+}
+
+extension UploadParallelismOrchestrator: UploadQueueDelegate {
+ public func operationQueueBecameEmpty(_ queue: UploadQueue) {
+ computeUploadParallelismPerQueueAndApply()
+ }
+
+ public func operationQueueNoLongerEmpty(_ queue: UploadQueue) {
+ computeUploadParallelismPerQueueAndApply()
+ }
+}
+
+extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate {
+ public func parallelismShouldChange(value: Int) {
+ computeUploadParallelismPerQueueAndApply()
+ }
+}
diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift
similarity index 72%
rename from kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift
rename to kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift
index e8de6e163f..85f8aa52d2 100644
--- a/kDriveCore/Data/Upload/UploadQueue/Queue/WorkloadParallelismHeuristic.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/Parallelism/WorkloadParallelismHeuristic.swift
@@ -19,8 +19,14 @@
import Foundation
import InfomaniakDI
+public enum ParallelismDefaults {
+ static let reducedParallelism = 2
+
+ static let serial = 1
+}
+
/// Delegate protocol of UploadParallelismHeuristic
-protocol ParallelismHeuristicDelegate: AnyObject {
+public protocol ParallelismHeuristicDelegate: AnyObject {
/// This method is called with a new parallelism to apply each time to the uploadQueue
/// - Parameter value: The new parallelism value to use
func parallelismShouldChange(value: Int)
@@ -30,21 +36,26 @@ protocol ParallelismHeuristicDelegate: AnyObject {
///
/// Value can change depending on many factors, including thermal state battery or extension mode.
/// Scaling is achieved given the number of active cores available.
-final class WorkloadParallelismHeuristic {
- /// With 2 Operations max, and a chuck of 1MiB max, the UploadQueue can spike to max 4MiB memory usage.
- private static let reducedParallelism = 2
-
+public final class WorkloadParallelismHeuristic {
@LazyInjectService private var appContextService: AppContextServiceable
private weak var delegate: ParallelismHeuristicDelegate?
+ private let serialEventQueue = DispatchQueue(
+ label: "com.infomaniak.drive.parallelism-heuristic.event",
+ qos: .default
+ )
+
init(delegate: ParallelismHeuristicDelegate) {
self.delegate = delegate
+ setupObservation()
+ }
+ private func setupObservation() {
// Update on thermal change
NotificationCenter.default.addObserver(
self,
- selector: #selector(computeParallelism),
+ selector: #selector(computeParallelismInQueue),
name: ProcessInfo.thermalStateDidChangeNotification,
object: nil
)
@@ -52,13 +63,12 @@ final class WorkloadParallelismHeuristic {
// Update on low power mode
NotificationCenter.default.addObserver(
self,
- selector: #selector(computeParallelism),
+ selector: #selector(computeParallelismInQueue),
name: NSNotification.Name.NSProcessInfoPowerStateDidChange,
object: nil
)
- // Update the value a first time
- computeParallelism()
+ computeParallelismInQueue()
}
deinit {
@@ -66,25 +76,31 @@ final class WorkloadParallelismHeuristic {
NotificationCenter.default.removeObserver(self, name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil)
}
- @objc private func computeParallelism() {
+ @objc private func computeParallelismInQueue() {
+ serialEventQueue.async {
+ self.computeParallelism()
+ }
+ }
+
+ private func computeParallelism() {
let processInfo = ProcessInfo.processInfo
// If the device is too hot we cool down now
let thermalState = processInfo.thermalState
guard thermalState != .critical else {
- currentParallelism = Self.reducedParallelism
+ currentParallelism = ParallelismDefaults.reducedParallelism
return
}
// In low power mode, we reduce parallelism
guard !processInfo.isLowPowerModeEnabled else {
- currentParallelism = Self.reducedParallelism
+ currentParallelism = ParallelismDefaults.reducedParallelism
return
}
// In extension, to reduce memory footprint, we reduce drastically parallelism
guard !appContextService.isExtension else {
- currentParallelism = Self.reducedParallelism
+ currentParallelism = ParallelismDefaults.reducedParallelism
return
}
@@ -93,14 +109,14 @@ final class WorkloadParallelismHeuristic {
// Beginning with .serious state, we start reducing the load on the system
guard thermalState != .serious else {
- currentParallelism = max(Self.reducedParallelism, parallelism / 2)
+ currentParallelism = max(ParallelismDefaults.reducedParallelism, parallelism / 2)
return
}
currentParallelism = parallelism
}
- public private(set) var currentParallelism = 0 {
+ public private(set) var currentParallelism = ParallelismDefaults.reducedParallelism {
didSet {
delegate?.parallelismShouldChange(value: currentParallelism)
}
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift
index 8746f267c7..4357e293ea 100644
--- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Notifications.swift
@@ -48,7 +48,7 @@ extension UploadService: UploadNotifiable {
public func sendFileUploadStateNotificationIfNeeded(with result: UploadCompletionResult) {
Log.uploadQueue("sendFileUploadStateNotificationIfNeeded")
- serialQueue.async { [weak self] in
+ serialEventQueue.async { [weak self] in
guard let self else { return }
guard let uploadFile = result.uploadFile,
uploadFile.error != .taskRescheduled,
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift
index 07108d1f73..55f5fb4d93 100644
--- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Observation.swift
@@ -49,7 +49,7 @@ extension UploadService: UploadObservable {
fileId: String? = nil,
using closure: @escaping (UploadFile, File?) -> Void) -> ObservationToken {
var token: ObservationToken!
- serialQueue.sync { [weak self] in
+ serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didUploadFile[key] = { [weak self, weak observer] uploadFile, driveFile in
@@ -81,7 +81,7 @@ extension UploadService: UploadObservable {
parentId: Int,
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
var token: ObservationToken!
- serialQueue.sync { [weak self] in
+ serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didChangeUploadCountInParent[key] = { [weak self, weak observer] updatedParentId, count in
@@ -111,7 +111,7 @@ extension UploadService: UploadObservable {
driveId: Int,
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
var token: ObservationToken!
- serialQueue.sync { [weak self] in
+ serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didChangeUploadCountInDrive[key] = { [weak self, weak observer] updatedDriveId, count in
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift
index 5d49b098e8..75cec7fec9 100644
--- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService+Publish.swift
@@ -43,7 +43,7 @@ extension UploadService: UploadPublishable {
userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCount")
- serialQueue.async { [weak self] in
+ serialEventQueue.async { [weak self] in
guard let self else { return }
publishUploadCountInParent(parentId: parentId, userId: userId, driveId: driveId)
publishUploadCountInDrive(userId: userId, driveId: driveId)
@@ -54,7 +54,7 @@ extension UploadService: UploadPublishable {
userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCountInParent")
- serialQueue.async { [weak self] in
+ serialEventQueue.async { [weak self] in
guard let self else { return }
let uploadCount = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId).count
@@ -69,7 +69,7 @@ extension UploadService: UploadPublishable {
public func publishUploadCountInDrive(userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCountInDrive")
- serialQueue.async { [weak self] in
+ serialEventQueue.async { [weak self] in
guard let self else { return }
let uploadCount = getUploadingFiles(userId: userId, driveId: driveId).count
for closure in observations.didChangeUploadCountInDrive.values {
@@ -84,7 +84,7 @@ extension UploadService: UploadPublishable {
Log.uploadQueue("publishFileUploaded")
logFileUploadedWithSuccess(for: result.uploadFile)
sendFileUploadStateNotificationIfNeeded(with: result)
- serialQueue.async { [weak self] in
+ serialEventQueue.async { [weak self] in
guard let self else { return }
for closure in observations.didUploadFile.values {
guard let uploadFile = result.uploadFile, !uploadFile.isInvalidated else {
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift
index 282998b50e..9b9e8466de 100644
--- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadService.swift
@@ -26,19 +26,26 @@ public enum UploadServiceBackgroundIdentifier {
}
public final class UploadService {
- @LazyInjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable
- @LazyInjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable
+ @InjectService(customTypeIdentifier: UploadQueueID.global) var globalUploadQueue: UploadQueueable
+ @InjectService(customTypeIdentifier: UploadQueueID.photo) var photoUploadQueue: UploadQueueable
+
@LazyInjectService(customTypeIdentifier: kDriveDBID.uploads) var uploadsDatabase: Transactionable
@LazyInjectService var notificationHelper: NotificationsHelpable
@LazyInjectService var appContextService: AppContextServiceable
- let serialQueue: DispatchQueue = {
- @LazyInjectService var appContextService: AppContextServiceable
+ private let serialTransactionQueue = DispatchQueue(
+ label: "com.infomaniak.drive.upload-service.rebuild-uploads",
+ qos: .default,
+ autoreleaseFrequency: .workItem
+ )
+
+ let serialEventQueue: DispatchQueue = {
+ @InjectService var appContextService: AppContextServiceable
let autoreleaseFrequency: DispatchQueue.AutoreleaseFrequency = appContextService.isExtension ? .workItem : .inherit
return DispatchQueue(
- label: "com.infomaniak.drive.upload-service",
- qos: .userInitiated,
+ label: "com.infomaniak.drive.upload-service.event",
+ qos: .default,
autoreleaseFrequency: autoreleaseFrequency
)
}()
@@ -59,9 +66,7 @@ public final class UploadService {
public var pausedNotificationSent = false
public init() {
- Task {
- rebuildUploadQueueFromObjectsInRealm()
- }
+ rebuildUploadQueue()
}
}
@@ -85,43 +90,52 @@ extension UploadService: UploadServiceable {
allQueues.allSatisfy(\.isSuspended)
}
- public func rebuildUploadQueueFromObjectsInRealm() {
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm")
- serialQueue.sync {
- // Clean cache if necessary before we try to restart the uploads.
- @InjectService var freeSpaceService: FreeSpaceService
- freeSpaceService.cleanCacheIfAlmostFull()
+ public func blockingRebuildUploadQueue() {
+ serialTransactionQueue.sync {
+ self.rebuildUploadQueueFromObjectsInRealm()
+ }
+ }
- guard let uploadFileQuery else {
- Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error)
- return
- }
+ public func rebuildUploadQueue() {
+ serialTransactionQueue.async {
+ self.rebuildUploadQueueFromObjectsInRealm()
+ }
+ }
- let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in
- return lazyCollection.filter(uploadFileQuery)
- .sorted(byKeyPath: "taskCreationDate")
- }
+ private func rebuildUploadQueueFromObjectsInRealm() {
+ Log.uploadQueue("rebuildUploadQueue")
+ @InjectService var freeSpaceService: FreeSpaceService
+ freeSpaceService.cleanCacheIfAlmostFull()
- let uploadingFileIds = Array(uploadingFiles.map(\.id))
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)")
+ guard let uploadFileQuery else {
+ Log.uploadQueue("\(#function) disabled in \(appContextService.context.rawValue)", level: .error)
+ return
+ }
- let batches = uploadingFileIds.chunks(ofCount: 100)
- Log.uploadQueue("batched count:\(batches.count)")
- for batch in batches {
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch")
- let batchArray = Array(batch)
- let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in
- lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded()
- }
- for file in matchedFrozenFiles {
- let uploadQueue = uploadQueue(for: file)
- uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil)
- }
- resumeAllOperations()
- }
+ let uploadingFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in
+ return lazyCollection.filter(uploadFileQuery)
+ .sorted(byKeyPath: "taskCreationDate")
+ }
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit")
+ let uploadingFileIds = Array(uploadingFiles.map(\.id))
+ Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm uploads to restart:\(uploadingFileIds.count)")
+
+ let batches = uploadingFileIds.chunks(ofCount: 100)
+ Log.uploadQueue("batched count:\(batches.count)")
+ for batch in batches {
+ Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm in batch")
+ let batchArray = Array(batch)
+ let matchedFrozenFiles = uploadsDatabase.fetchResults(ofType: UploadFile.self) { lazyCollection in
+ lazyCollection.filter("id IN %@", batchArray).freezeIfNeeded()
+ }
+ for file in matchedFrozenFiles {
+ let uploadQueue = uploadQueue(for: file)
+ uploadQueue.addToQueueIfNecessary(uploadFile: file, itemIdentifier: nil)
+ }
+ resumeAllOperations()
}
+
+ Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm exit")
}
public func suspendAllOperations() {
@@ -147,13 +161,13 @@ extension UploadService: UploadServiceable {
return
}
- Task {
+ serialTransactionQueue.async {
guard let frozenFile = self.uploadsDatabase.fetchObject(ofType: UploadFile.self, forPrimaryKey: uploadFileId)?
.freeze() else {
return
}
- let specificQueue = uploadQueue(for: frozenFile)
+ let specificQueue = self.uploadQueue(for: frozenFile)
try? self.uploadsDatabase.writeTransaction { writableRealm in
guard let file = writableRealm.object(ofType: UploadFile.self, forPrimaryKey: uploadFileId),
@@ -168,7 +182,7 @@ extension UploadService: UploadServiceable {
}
specificQueue.addToQueue(uploadFile: frozenFile, itemIdentifier: nil)
- resumeAllOperations()
+ self.resumeAllOperations()
}
}
@@ -179,16 +193,16 @@ extension UploadService: UploadServiceable {
return
}
- Task {
- let failedFileIds = getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId)
+ serialTransactionQueue.async {
+ let failedFileIds = self.getFailedFileIds(parentId: parentId, userId: userId, driveId: driveId)
let batches = failedFileIds.chunks(ofCount: 100)
Log.uploadQueue("batches:\(batches.count)")
- resumeAllOperations()
+ self.resumeAllOperations()
for batch in batches {
- cancelAnyInBatch(batch)
- enqueueAnyInBatch(batch)
+ self.cancelAnyInBatch(batch)
+ self.enqueueAnyInBatch(batch)
}
}
}
@@ -252,14 +266,14 @@ extension UploadService: UploadServiceable {
return
}
- Task {
- suspendAllOperations()
+ serialTransactionQueue.async {
+ self.suspendAllOperations()
defer {
- resumeAllOperations()
+ self.resumeAllOperations()
Log.uploadQueue("cancelAllOperations finished")
}
- let uploadingFiles = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId)
+ let uploadingFiles = self.getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId)
let allUploadingFilesIds = Array(uploadingFiles.map(\.id))
let photoSyncUploadingFilesIds: [String] = uploadingFiles.compactMap { uploadFile in
guard uploadFile.isPhotoSyncUpload else { return nil }
@@ -277,7 +291,7 @@ extension UploadService: UploadServiceable {
Log.uploadQueue("cancelAllOperations IDS count:\(allUploadingFilesIds.count) parentId:\(parentId)")
- try? uploadsDatabase.writeTransaction { writableRealm in
+ try? self.uploadsDatabase.writeTransaction { writableRealm in
// Delete all the linked UploadFiles from Realm. This is fast.
Log.uploadQueue("delete all matching files count:\(uploadingFiles.count) parentId:\(parentId)")
let objectsToDelete = writableRealm.objects(UploadFile.self).filter("id IN %@", allUploadingFilesIds)
@@ -286,10 +300,10 @@ extension UploadService: UploadServiceable {
Log.uploadQueue("Done deleting all matching files for parentId:\(parentId)")
}
- globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds)
- photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds)
+ self.globalUploadQueue.cancelAllOperations(uploadingFilesIds: globalUploadingFilesIds)
+ self.photoUploadQueue.cancelAllOperations(uploadingFilesIds: photoSyncUploadingFilesIds)
- publishUploadCount(withParent: parentId, userId: userId, driveId: driveId)
+ self.publishUploadCount(withParent: parentId, userId: userId, driveId: driveId)
}
}
diff --git a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift
index f733e9f32b..bb671d741a 100644
--- a/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift
+++ b/kDriveCore/Data/Upload/Servicies/UploadService/UploadServiceable.swift
@@ -28,7 +28,9 @@ public protocol UploadServiceable {
var operationCount: Int { get }
- func rebuildUploadQueueFromObjectsInRealm()
+ func blockingRebuildUploadQueue()
+
+ func rebuildUploadQueue()
func suspendAllOperations()
diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift
index 4c6e6a1694..6421f77b48 100644
--- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift
+++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue+Queue.swift
@@ -34,6 +34,10 @@ extension UploadQueue: UploadQueueable {
operationQueue.isSuspended
}
+ public var isActive: Bool {
+ operationQueue.operationCount > 0 && !operationQueue.isSuspended
+ }
+
public func waitForCompletion(_ completionHandler: @escaping () -> Void) {
Log.uploadQueue("waitForCompletion")
DispatchQueue.global(qos: .default).async {
@@ -163,14 +167,14 @@ extension UploadQueue: UploadQueueable {
return
}
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ufid:\(uploadFile.id)")
+ Log.uploadQueue("addToQueueIfNecessary ufid:\(uploadFile.id)")
guard operation(uploadFileId: uploadFile.id) != nil else {
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm ADD ufid:\(uploadFile.id)")
+ Log.uploadQueue("addToQueueIfNecessary ADD ufid:\(uploadFile.id)")
addToQueue(uploadFile: uploadFile, itemIdentifier: nil)
return
}
- Log.uploadQueue("rebuildUploadQueueFromObjectsInRealm NOOP ufid:\(uploadFile.id)")
+ Log.uploadQueue("addToQueueIfNecessary NOOP ufid:\(uploadFile.id)")
}
@discardableResult
diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift
index 8f50c19920..69990d7580 100644
--- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift
+++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueue.swift
@@ -23,18 +23,22 @@ import InfomaniakDI
import RealmSwift
import Sentry
-public class UploadQueue: ParallelismHeuristicDelegate {
- private var memoryPressure: DispatchSourceMemoryPressure?
+public protocol UploadQueueDelegate: AnyObject {
+ func operationQueueBecameEmpty(_ queue: UploadQueue)
+ func operationQueueNoLongerEmpty(_ queue: UploadQueue)
+}
+public class UploadQueue: ParallelismHeuristicDelegate {
@LazyInjectService var appContextService: AppContextServiceable
@LazyInjectService var uploadPublisher: UploadPublishable
+ private var queueObserver: UploadQueueObserver?
+
+ weak var delegate: UploadQueueDelegate?
+
/// Something to track an operation for a File ID
let keyedUploadOperations = KeyedUploadOperationable()
- /// Something to adapt the upload parallelism live
- var uploadParallelismHeuristic: WorkloadParallelismHeuristic?
-
public lazy var operationQueue: OperationQueue = {
let queue = OperationQueue()
queue.name = "kDrive upload queue"
@@ -69,62 +73,28 @@ public class UploadQueue: ParallelismHeuristicDelegate {
/// Should suspend operation queue based on explicit `suspendAllOperations()` call
var forceSuspendQueue = false
- public init() {
+ public init(delegate: UploadQueueDelegate?) {
guard appContextService.context != .shareExtension else {
Log.uploadQueue("UploadQueue disabled in ShareExtension", level: .error)
return
}
- Log.uploadQueue("Starting up")
+ self.delegate = delegate
- uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self)
-
- // Observe network state change
ReachabilityListener.instance.observeNetworkChange(self) { [weak self] _ in
- guard let self else {
- return
- }
+ guard let self else { return }
let isSuspended = (shouldSuspendQueue || forceSuspendQueue)
operationQueue.isSuspended = isSuspended
Log.uploadQueue("observeNetworkChange :\(isSuspended)")
}
- observeMemoryWarnings()
-
- Log.uploadQueue("UploadQueue parallelism is:\(operationQueue.maxConcurrentOperationCount)")
- }
-
- // MARK: - Memory warnings
-
- /// A critical memory warning in `FileProvider` context will reschedule, in order to transition uploads to Main App.
- private func observeMemoryWarnings() {
- guard appContextService.context == .fileProviderExtension else {
- return
- }
-
- let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main)
- memoryPressure = source
- source.setEventHandler {
- let event: DispatchSource.MemoryPressureEvent = source.data
- switch event {
- case DispatchSource.MemoryPressureEvent.normal:
- Log.uploadQueue("MemoryPressureEvent normal", level: .info)
- case DispatchSource.MemoryPressureEvent.warning:
- Log.uploadQueue("MemoryPressureEvent warning", level: .info)
- case DispatchSource.MemoryPressureEvent.critical:
- Log.uploadQueue("MemoryPressureEvent critical", level: .error)
- self.rescheduleRunningOperations()
- default:
- break
- }
- }
- source.resume()
+ queueObserver = UploadQueueObserver(uploadQueue: self, delegate: delegate)
}
// MARK: - ParallelismHeuristicDelegate
- func parallelismShouldChange(value: Int) {
+ public func parallelismShouldChange(value: Int) {
Log.uploadQueue("Upload queue new parallelism: \(value)", level: .info)
operationQueue.maxConcurrentOperationCount = value
}
diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift
new file mode 100644
index 0000000000..d977f15c87
--- /dev/null
+++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueObserver.swift
@@ -0,0 +1,68 @@
+/*
+ Infomaniak kDrive - iOS App
+ Copyright (C) 2025 Infomaniak Network SA
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see .
+ */
+
+import Foundation
+
+public final class UploadQueueObserver {
+ private var observation: NSKeyValueObservation?
+
+ private let serialEventQueue = DispatchQueue(
+ label: "com.infomaniak.drive.upload-queue-observer.event.\(UUID().uuidString)",
+ qos: .default
+ )
+
+ var uploadQueue: UploadQueue
+ weak var delegate: UploadQueueDelegate?
+
+ init(uploadQueue: UploadQueue, delegate: UploadQueueDelegate?) {
+ self.uploadQueue = uploadQueue
+ self.delegate = delegate
+
+ setupObservation()
+ }
+
+ private func setupObservation() {
+ observation = uploadQueue.operationQueue.observe(\.operationCount, options: [
+ .new,
+ .old
+ ]) { [weak self] _, change in
+ guard let self else { return }
+ self.serialEventQueue.async {
+ self.operationCountDidChange(previousCount: change.oldValue, newCount: change.newValue)
+ }
+ }
+ }
+
+ private func operationCountDidChange(previousCount: Int?, newCount: Int?) {
+ guard let newCount else {
+ delegate?.operationQueueBecameEmpty(uploadQueue)
+ return
+ }
+
+ guard let previousCount else {
+ delegate?.operationQueueNoLongerEmpty(uploadQueue)
+ return
+ }
+
+ if newCount == 0 {
+ delegate?.operationQueueBecameEmpty(uploadQueue)
+ } else if previousCount == 0 && newCount > 0 {
+ delegate?.operationQueueNoLongerEmpty(uploadQueue)
+ }
+ }
+}
diff --git a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift
index 1bab7aa131..79a8eef15f 100644
--- a/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift
+++ b/kDriveCore/Data/Upload/UploadQueue/Queue/UploadQueueable.swift
@@ -38,9 +38,13 @@ public protocol UploadQueueable {
func rescheduleRunningOperations()
+ func parallelismShouldChange(value: Int)
+
func cancel(uploadFileId: String)
var operationCount: Int { get }
var isSuspended: Bool { get }
+
+ var isActive: Bool { get }
}
diff --git a/kDriveCore/Services/BackgroundTasksService.swift b/kDriveCore/Services/BackgroundTasksService.swift
index fd7dfe1e48..3df93fd41a 100644
--- a/kDriveCore/Services/BackgroundTasksService.swift
+++ b/kDriveCore/Services/BackgroundTasksService.swift
@@ -121,7 +121,7 @@ struct BackgroundTasksService: BackgroundTasksServiceable {
}
Log.backgroundTaskScheduling("Reload operations in queue")
- uploadService.rebuildUploadQueueFromObjectsInRealm()
+ uploadService.blockingRebuildUploadQueue()
guard !expiringActivity.shouldTerminate else {
Log.backgroundTaskScheduling(Self.activityShouldTerminateMessage, level: .error)