Skip to content

feat: Dynamic parallelism mechanism for all UploadQueues #1472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dbaeaf4
refactor(UploadService): Moved parallelism observation to uploadService
adrien-coye Mar 14, 2025
acf3387
feat(UploadQueueObserver): New mechanism to observe if a an upload qu…
adrien-coye Mar 14, 2025
d7681c9
feat(UploadParallelismOrchestrator): New object managing dynamically …
adrien-coye Mar 14, 2025
7c293e9
refactor(WorkloadParallelismHeuristic): Better default depth
adrien-coye Mar 14, 2025
cb16c56
fix(WorkloadParallelismHeuristic): Remove side effect at init that wo…
adrien-coye Mar 14, 2025
04f1cec
refactor(UploadQueue): New isActive property
adrien-coye Mar 14, 2025
7927909
feat(UploadParallelismOrchestrator): Implementation of the dynamic up…
adrien-coye Mar 14, 2025
480262f
feat(ParallelismDefaults): Enum to clarify default values
adrien-coye Mar 14, 2025
eab1046
fix(WorkloadParallelismHeuristic): Make sure to compute a first time …
adrien-coye Mar 18, 2025
f77b276
fix(UploadService): Make sure to rebuild the upload queues on init ou…
adrien-coye Mar 18, 2025
499b265
fix(UploadQueueObserver): Only notify operationQueueNoLongerEmpty whe…
adrien-coye Mar 18, 2025
a1e9344
refactor(UploadParallelismOrchestrator): Switched set logic for simpl…
adrien-coye Mar 20, 2025
19e83c9
refactor(UploadService): Dedicated queues for events and rebuilding t…
adrien-coye Mar 20, 2025
420fd57
refactor(UploadService): Explicit blocking and non blocking rebuildUp…
adrien-coye Mar 21, 2025
f816a62
refactor(WorkloadParallelismHeuristic): Internal serial event queue
adrien-coye Mar 21, 2025
a08c87e
refactor(UploadParallelismOrchestrator): Internal serial event queue
adrien-coye Mar 21, 2025
4301669
fix(WorkloadParallelismHeuristic): Double call to computeParallelism
adrien-coye Mar 21, 2025
354178a
refactor(UploadParallelismOrchestrator): Notify memory pressure on an…
adrien-coye Mar 21, 2025
9e2540c
refactor(UploadQueueObserver): Notify observation on an internal queue
adrien-coye Mar 21, 2025
eabe82d
refactor(UploadQueueObserver): Observation setup in a dedicated metho…
adrien-coye Mar 21, 2025
c95f736
refactor(UploadService): Removed extraneous lazy in DI call
adrien-coye Mar 21, 2025
9cdd583
refactor(UploadQueueObserver): Split some existing code to reduce com…
adrien-coye Mar 24, 2025
d84dd54
refactor(UploadQueueObserver): Stateless implementation
adrien-coye Mar 25, 2025
0acebfb
chore: Fix name of function in log event
adrien-coye Mar 25, 2025
a3faa7f
refactor(UploadService): Using internal serial queue for complex tran…
adrien-coye Mar 25, 2025
1910c76
refactor(UploadService): Simpler init since the called method runs on…
adrien-coye Mar 25, 2025
fe72c15
refactor(UploadService): New setupObservation function
adrien-coye Mar 25, 2025
f0fae91
refactor(UploadParallelismOrchestrator): New setupObservation function
adrien-coye Mar 25, 2025
ba9d70a
chore(UploadService): Removed comment
adrien-coye Mar 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kDrive/AppRouter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public struct AppRouter: AppNavigable {

// Resolving an upload queue will restart it if this is the first time
@InjectService var uploadService: UploadServiceable
uploadService.rebuildUploadQueueFromObjectsInRealm()
uploadService.rebuildUploadQueue()
}

// MARK: RouterFileNavigable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ extension PhotoSyncSettingsViewController: FooterButtonDelegate {
sender.setLoading(false)
}

DispatchQueue.global(qos: .userInitiated).async {
DispatchQueue.global(qos: .default).async {
// Add new pictures to be uploaded and reload upload queue
self.photoLibraryUploader.scheduleNewPicturesForUpload()
@InjectService var uploadService: UploadServiceable
uploadService.rebuildUploadQueueFromObjectsInRealm()
uploadService.rebuildUploadQueue()
}
}
}
Expand Down
21 changes: 17 additions & 4 deletions kDriveCore/DI/FactoryService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public enum FactoryService {
factoryParameters: nil,
resolver: resolver)
},
Factory(type: UploadQueueDelegate.self) { _, _ in
UploadParallelismOrchestrator()
},
Factory(type: BGTaskScheduler.self) { _, _ in
BGTaskScheduler.shared
},
Expand Down Expand Up @@ -232,12 +235,22 @@ public enum FactoryService {
}

static var uploadQueues: [FactoryWithIdentifier] {
let globalUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
UploadQueue()
let globalUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
forCustomTypeIdentifier: nil,
factoryParameters: nil,
resolver: resolver)

return UploadQueue(delegate: uploadQueueDelegate)
}

let photoUploadQueue = Factory(type: UploadQueueable.self) { _, _ in
PhotoUploadQueue()
let photoUploadQueue = Factory(type: UploadQueueable.self) { _, resolver in
let uploadQueueDelegate = try resolver.resolve(type: UploadQueueDelegate.self,
forCustomTypeIdentifier: nil,
factoryParameters: nil,
resolver: resolver)

return PhotoUploadQueue(delegate: uploadQueueDelegate)
}

let services = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Infomaniak kDrive - iOS App
Copyright (C) 2025 Infomaniak Network SA

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

import Foundation
import InfomaniakDI

public final class UploadParallelismOrchestrator {
@LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable
@LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable
@LazyInjectService private var appContextService: AppContextServiceable

private let serialEventQueue = DispatchQueue(
label: "com.infomaniak.drive.upload-parallelism-orchestrator.event",
qos: .default
)

private var uploadParallelismHeuristic: WorkloadParallelismHeuristic?
private var memoryPressureObserver: DispatchSourceMemoryPressure?

private var availableParallelism: Int {
guard let uploadParallelismHeuristic else {
return ParallelismDefaults.reducedParallelism
}
return uploadParallelismHeuristic.currentParallelism
}

private lazy var allQueues = [globalUploadQueue, photoUploadQueue]

public init() {
setupObservation()
}

private func setupObservation() {
serialEventQueue.async {
self.observeMemoryWarnings()
self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self)
}
}

func observeMemoryWarnings() {
guard appContextService.context == .fileProviderExtension else {
return
}

let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main)
memoryPressureObserver = source
source.setEventHandler { [weak self] in
guard let self else { return }
let event: DispatchSource.MemoryPressureEvent = source.data
switch event {
case DispatchSource.MemoryPressureEvent.normal:
Log.uploadQueue("MemoryPressureEvent normal", level: .info)
case DispatchSource.MemoryPressureEvent.warning:
Log.uploadQueue("MemoryPressureEvent warning", level: .info)
case DispatchSource.MemoryPressureEvent.critical:
Log.uploadQueue("MemoryPressureEvent critical", level: .error)
serialEventQueue.async {
@InjectService var uploadService: UploadServiceable
uploadService.rescheduleRunningOperations()
}
default:
break
}
}
source.resume()
}

private func computeUploadParallelismPerQueueAndApply() {
serialEventQueue.async {
let currentAvailableParallelism = self.availableParallelism
Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)")

let activeQueues = self.allQueues.filter(\.isActive)
let inactiveQueues = self.allQueues.filter { !$0.isActive }

assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match")

inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) }

Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial")
guard !activeQueues.isEmpty else {
Log.uploadQueue("No active queues")
return
}

let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count)
Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)")
activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) }
}
}
}

extension UploadParallelismOrchestrator: UploadQueueDelegate {
public func operationQueueBecameEmpty(_ queue: UploadQueue) {
computeUploadParallelismPerQueueAndApply()
}

public func operationQueueNoLongerEmpty(_ queue: UploadQueue) {
computeUploadParallelismPerQueueAndApply()
}
}

extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate {
public func parallelismShouldChange(value: Int) {
computeUploadParallelismPerQueueAndApply()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@
import Foundation
import InfomaniakDI

public enum ParallelismDefaults {
static let reducedParallelism = 2

static let serial = 1
}

/// Delegate protocol of UploadParallelismHeuristic
protocol ParallelismHeuristicDelegate: AnyObject {
public protocol ParallelismHeuristicDelegate: AnyObject {
/// This method is called with a new parallelism to apply each time to the uploadQueue
/// - Parameter value: The new parallelism value to use
func parallelismShouldChange(value: Int)
Expand All @@ -30,61 +36,71 @@ protocol ParallelismHeuristicDelegate: AnyObject {
///
/// Value can change depending on many factors, including thermal state battery or extension mode.
/// Scaling is achieved given the number of active cores available.
final class WorkloadParallelismHeuristic {
/// With 2 Operations max, and a chuck of 1MiB max, the UploadQueue can spike to max 4MiB memory usage.
private static let reducedParallelism = 2

public final class WorkloadParallelismHeuristic {
@LazyInjectService private var appContextService: AppContextServiceable

private weak var delegate: ParallelismHeuristicDelegate?

private let serialEventQueue = DispatchQueue(
label: "com.infomaniak.drive.parallelism-heuristic.event",
qos: .default
)

init(delegate: ParallelismHeuristicDelegate) {
self.delegate = delegate
setupObservation()
}

private func setupObservation() {
// Update on thermal change
NotificationCenter.default.addObserver(
self,
selector: #selector(computeParallelism),
selector: #selector(computeParallelismInQueue),
name: ProcessInfo.thermalStateDidChangeNotification,
object: nil
)

// Update on low power mode
NotificationCenter.default.addObserver(
self,
selector: #selector(computeParallelism),
selector: #selector(computeParallelismInQueue),
name: NSNotification.Name.NSProcessInfoPowerStateDidChange,
object: nil
)

// Update the value a first time
computeParallelism()
computeParallelismInQueue()
}

deinit {
NotificationCenter.default.removeObserver(self, name: ProcessInfo.thermalStateDidChangeNotification, object: nil)
NotificationCenter.default.removeObserver(self, name: NSNotification.Name.NSProcessInfoPowerStateDidChange, object: nil)
}

@objc private func computeParallelism() {
@objc private func computeParallelismInQueue() {
serialEventQueue.async {
self.computeParallelism()
}
}

private func computeParallelism() {
let processInfo = ProcessInfo.processInfo

// If the device is too hot we cool down now
let thermalState = processInfo.thermalState
guard thermalState != .critical else {
currentParallelism = Self.reducedParallelism
currentParallelism = ParallelismDefaults.reducedParallelism
return
}

// In low power mode, we reduce parallelism
guard !processInfo.isLowPowerModeEnabled else {
currentParallelism = Self.reducedParallelism
currentParallelism = ParallelismDefaults.reducedParallelism
return
}

// In extension, to reduce memory footprint, we reduce drastically parallelism
guard !appContextService.isExtension else {
currentParallelism = Self.reducedParallelism
currentParallelism = ParallelismDefaults.reducedParallelism
return
}

Expand All @@ -93,14 +109,14 @@ final class WorkloadParallelismHeuristic {

// Beginning with .serious state, we start reducing the load on the system
guard thermalState != .serious else {
currentParallelism = max(Self.reducedParallelism, parallelism / 2)
currentParallelism = max(ParallelismDefaults.reducedParallelism, parallelism / 2)
return
}

currentParallelism = parallelism
}

public private(set) var currentParallelism = 0 {
public private(set) var currentParallelism = ParallelismDefaults.reducedParallelism {
didSet {
delegate?.parallelismShouldChange(value: currentParallelism)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extension UploadService: UploadNotifiable {

public func sendFileUploadStateNotificationIfNeeded(with result: UploadCompletionResult) {
Log.uploadQueue("sendFileUploadStateNotificationIfNeeded")
serialQueue.async { [weak self] in
serialEventQueue.async { [weak self] in
guard let self else { return }
guard let uploadFile = result.uploadFile,
uploadFile.error != .taskRescheduled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ extension UploadService: UploadObservable {
fileId: String? = nil,
using closure: @escaping (UploadFile, File?) -> Void) -> ObservationToken {
var token: ObservationToken!
serialQueue.sync { [weak self] in
serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didUploadFile[key] = { [weak self, weak observer] uploadFile, driveFile in
Expand Down Expand Up @@ -81,7 +81,7 @@ extension UploadService: UploadObservable {
parentId: Int,
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
var token: ObservationToken!
serialQueue.sync { [weak self] in
serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didChangeUploadCountInParent[key] = { [weak self, weak observer] updatedParentId, count in
Expand Down Expand Up @@ -111,7 +111,7 @@ extension UploadService: UploadObservable {
driveId: Int,
using closure: @escaping (Int, Int) -> Void) -> ObservationToken {
var token: ObservationToken!
serialQueue.sync { [weak self] in
serialEventQueue.sync { [weak self] in
guard let self else { return }
let key = UUID()
observations.didChangeUploadCountInDrive[key] = { [weak self, weak observer] updatedDriveId, count in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ extension UploadService: UploadPublishable {
userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCount")
serialQueue.async { [weak self] in
serialEventQueue.async { [weak self] in
guard let self else { return }
publishUploadCountInParent(parentId: parentId, userId: userId, driveId: driveId)
publishUploadCountInDrive(userId: userId, driveId: driveId)
Expand All @@ -54,7 +54,7 @@ extension UploadService: UploadPublishable {
userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCountInParent")
serialQueue.async { [weak self] in
serialEventQueue.async { [weak self] in
guard let self else { return }

let uploadCount = getUploadingFiles(withParent: parentId, userId: userId, driveId: driveId).count
Expand All @@ -69,7 +69,7 @@ extension UploadService: UploadPublishable {
public func publishUploadCountInDrive(userId: Int,
driveId: Int) {
Log.uploadQueue("publishUploadCountInDrive")
serialQueue.async { [weak self] in
serialEventQueue.async { [weak self] in
guard let self else { return }
let uploadCount = getUploadingFiles(userId: userId, driveId: driveId).count
for closure in observations.didChangeUploadCountInDrive.values {
Expand All @@ -84,7 +84,7 @@ extension UploadService: UploadPublishable {
Log.uploadQueue("publishFileUploaded")
logFileUploadedWithSuccess(for: result.uploadFile)
sendFileUploadStateNotificationIfNeeded(with: result)
serialQueue.async { [weak self] in
serialEventQueue.async { [weak self] in
guard let self else { return }
for closure in observations.didUploadFile.values {
guard let uploadFile = result.uploadFile, !uploadFile.isInvalidated else {
Expand Down
Loading