-
Notifications
You must be signed in to change notification settings - Fork 8
feat: Dynamic parallelism mechanism for all UploadQueues #1472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
adrien-coye
merged 29 commits into
feat/photo-upload-queue
from
feat/upload-parallelism
Mar 26, 2025
Merged
Changes from all commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
dbaeaf4
refactor(UploadService): Moved parallelism observation to uploadService
adrien-coye acf3387
feat(UploadQueueObserver): New mechanism to observe if a an upload qu…
adrien-coye d7681c9
feat(UploadParallelismOrchestrator): New object managing dynamically …
adrien-coye 7c293e9
refactor(WorkloadParallelismHeuristic): Better default depth
adrien-coye cb16c56
fix(WorkloadParallelismHeuristic): Remove side effect at init that wo…
adrien-coye 04f1cec
refactor(UploadQueue): New isActive property
adrien-coye 7927909
feat(UploadParallelismOrchestrator): Implementation of the dynamic up…
adrien-coye 480262f
feat(ParallelismDefaults): Enum to clarify default values
adrien-coye eab1046
fix(WorkloadParallelismHeuristic): Make sure to compute a first time …
adrien-coye f77b276
fix(UploadService): Make sure to rebuild the upload queues on init ou…
adrien-coye 499b265
fix(UploadQueueObserver): Only notify operationQueueNoLongerEmpty whe…
adrien-coye a1e9344
refactor(UploadParallelismOrchestrator): Switched set logic for simpl…
adrien-coye 19e83c9
refactor(UploadService): Dedicated queues for events and rebuilding t…
adrien-coye 420fd57
refactor(UploadService): Explicit blocking and non blocking rebuildUp…
adrien-coye f816a62
refactor(WorkloadParallelismHeuristic): Internal serial event queue
adrien-coye a08c87e
refactor(UploadParallelismOrchestrator): Internal serial event queue
adrien-coye 4301669
fix(WorkloadParallelismHeuristic): Double call to computeParallelism
adrien-coye 354178a
refactor(UploadParallelismOrchestrator): Notify memory pressure on an…
adrien-coye 9e2540c
refactor(UploadQueueObserver): Notify observation on an internal queue
adrien-coye eabe82d
refactor(UploadQueueObserver): Observation setup in a dedicated metho…
adrien-coye c95f736
refactor(UploadService): Removed extraneous lazy in DI call
adrien-coye 9cdd583
refactor(UploadQueueObserver): Split some existing code to reduce com…
adrien-coye d84dd54
refactor(UploadQueueObserver): Stateless implementation
adrien-coye 0acebfb
chore: Fix name of function in log event
adrien-coye a3faa7f
refactor(UploadService): Using internal serial queue for complex tran…
adrien-coye 1910c76
refactor(UploadService): Simpler init since the called method runs on…
adrien-coye fe72c15
refactor(UploadService): New setupObservation function
adrien-coye f0fae91
refactor(UploadParallelismOrchestrator): New setupObservation function
adrien-coye ba9d70a
chore(UploadService): Removed comment
adrien-coye File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
...eCore/Data/Upload/Servicies/UploadService/Parallelism/UploadParallelismOrchestrator.swift
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
Infomaniak kDrive - iOS App | ||
Copyright (C) 2025 Infomaniak Network SA | ||
|
||
This program is free software: you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation, either version 3 of the License, or | ||
(at your option) any later version. | ||
|
||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU General Public License for more details. | ||
|
||
You should have received a copy of the GNU General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
import Foundation | ||
import InfomaniakDI | ||
|
||
public final class UploadParallelismOrchestrator { | ||
@LazyInjectService(customTypeIdentifier: UploadQueueID.global) private var globalUploadQueue: UploadQueueable | ||
@LazyInjectService(customTypeIdentifier: UploadQueueID.photo) private var photoUploadQueue: UploadQueueable | ||
@LazyInjectService private var appContextService: AppContextServiceable | ||
|
||
private let serialEventQueue = DispatchQueue( | ||
label: "com.infomaniak.drive.upload-parallelism-orchestrator.event", | ||
qos: .default | ||
) | ||
|
||
private var uploadParallelismHeuristic: WorkloadParallelismHeuristic? | ||
private var memoryPressureObserver: DispatchSourceMemoryPressure? | ||
|
||
private var availableParallelism: Int { | ||
guard let uploadParallelismHeuristic else { | ||
return ParallelismDefaults.reducedParallelism | ||
} | ||
return uploadParallelismHeuristic.currentParallelism | ||
} | ||
|
||
private lazy var allQueues = [globalUploadQueue, photoUploadQueue] | ||
|
||
public init() { | ||
setupObservation() | ||
} | ||
|
||
private func setupObservation() { | ||
serialEventQueue.async { | ||
self.observeMemoryWarnings() | ||
self.uploadParallelismHeuristic = WorkloadParallelismHeuristic(delegate: self) | ||
} | ||
} | ||
|
||
func observeMemoryWarnings() { | ||
guard appContextService.context == .fileProviderExtension else { | ||
return | ||
} | ||
|
||
let source = DispatchSource.makeMemoryPressureSource(eventMask: .all, queue: .main) | ||
memoryPressureObserver = source | ||
source.setEventHandler { [weak self] in | ||
guard let self else { return } | ||
let event: DispatchSource.MemoryPressureEvent = source.data | ||
switch event { | ||
case DispatchSource.MemoryPressureEvent.normal: | ||
Log.uploadQueue("MemoryPressureEvent normal", level: .info) | ||
case DispatchSource.MemoryPressureEvent.warning: | ||
Log.uploadQueue("MemoryPressureEvent warning", level: .info) | ||
case DispatchSource.MemoryPressureEvent.critical: | ||
Log.uploadQueue("MemoryPressureEvent critical", level: .error) | ||
serialEventQueue.async { | ||
@InjectService var uploadService: UploadServiceable | ||
uploadService.rescheduleRunningOperations() | ||
} | ||
default: | ||
break | ||
} | ||
} | ||
source.resume() | ||
} | ||
|
||
private func computeUploadParallelismPerQueueAndApply() { | ||
serialEventQueue.async { | ||
let currentAvailableParallelism = self.availableParallelism | ||
Log.uploadQueue("Current total available upload parallelism :\(currentAvailableParallelism)") | ||
|
||
let activeQueues = self.allQueues.filter(\.isActive) | ||
let inactiveQueues = self.allQueues.filter { !$0.isActive } | ||
|
||
assert(activeQueues.count + inactiveQueues.count == self.allQueues.count, "queue count should match") | ||
|
||
inactiveQueues.forEach { $0.parallelismShouldChange(value: ParallelismDefaults.serial) } | ||
|
||
Log.uploadQueue("Inactive queues:\(inactiveQueues.count) set to serial") | ||
guard !activeQueues.isEmpty else { | ||
Log.uploadQueue("No active queues") | ||
return | ||
} | ||
|
||
let parallelismPerActiveQueue = max(ParallelismDefaults.serial, currentAvailableParallelism / activeQueues.count) | ||
Log.uploadQueue("Active queues \(activeQueues.count) new parallelism:\(parallelismPerActiveQueue)") | ||
activeQueues.forEach { $0.parallelismShouldChange(value: parallelismPerActiveQueue) } | ||
} | ||
} | ||
} | ||
|
||
extension UploadParallelismOrchestrator: UploadQueueDelegate { | ||
public func operationQueueBecameEmpty(_ queue: UploadQueue) { | ||
computeUploadParallelismPerQueueAndApply() | ||
} | ||
|
||
public func operationQueueNoLongerEmpty(_ queue: UploadQueue) { | ||
computeUploadParallelismPerQueueAndApply() | ||
} | ||
} | ||
|
||
extension UploadParallelismOrchestrator: ParallelismHeuristicDelegate { | ||
public func parallelismShouldChange(value: Int) { | ||
computeUploadParallelismPerQueueAndApply() | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.