Description
Is your feature request related to a problem? Please describe.
Currently, spark-rapids has a TrafficController that restrains the overuse of host memory, and it mainly serves async table writes.
However, under the concept of triple buffering, where everything could run asynchronously, the TrafficController will be used much more widely. We should therefore review the current implementation and work out an extensible design on top of it, one that supports different kinds of resource budgets and, ideally, can also schedule tasks based on task priority.
The current implementation blocks the submission of an async task when there are not enough resources for it: https://github.com/NVIDIA/spark-rapids/blob/branch-25.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala#L101
/**
 * Blocks the task from being scheduled until the throttle allows it. If there is no task
 * currently scheduled, the task is scheduled immediately even if the throttle is exceeded.
 */
def blockUntilRunnable[T](task: Task[T]): Unit = {
  lock.lockInterruptibly()
  try {
    while (numTasks > 0 && !throttle.canAccept(task)) {
      canBeScheduled.await()
    }
    numTasks += 1
    throttle.taskScheduled(task)
  } finally {
    lock.unlock()
  }
}
Here is a rough proposal for an alternative implementation of the TrafficController:
The alternative approach is to build a customized executor that extends java.util.concurrent.ThreadPoolExecutor. We can inject flow control and other kinds of logic through the overridable hook methods beforeExecute and afterExecute. With that, the submission of an async task never blocks, even if the submitted task stays pending for lack of resources or available threads.
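To make the hook-based idea concrete, here is a minimal sketch, not a proposed final design. The names ThrottledExecutor and BudgetedTask, and the single numeric budget, are assumptions made for illustration; the real Throttle interface is not modeled. The sketch keeps the existing rule that a task is admitted when nothing else holds budget, even if it is oversized.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

// Hypothetical task type: a Runnable that declares how much budget it needs.
final class BudgetedTask(val cost: Long, body: () => Unit) extends Runnable {
  override def run(): Unit = body()
}

// Hypothetical executor: flow control lives in the hooks, so execute() only enqueues.
class ThrottledExecutor(threads: Int, budget: Long)
    extends ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable]()) {

  private val lock = new ReentrantLock()
  private val released = lock.newCondition()
  private var inUse = 0L
  private var peak = 0L // highest budget usage observed, kept only for inspection

  def peakInUse: Long = { lock.lock(); try peak finally lock.unlock() }

  // Runs on the worker thread after the task is dequeued and before it runs.
  override protected def beforeExecute(t: Thread, r: Runnable): Unit = {
    r match {
      case task: BudgetedTask =>
        lock.lock()
        try {
          // Wait for budget, but admit an oversized task when nothing else holds budget.
          while (inUse > 0 && inUse + task.cost > budget) released.await()
          inUse += task.cost
          peak = math.max(peak, inUse)
        } finally lock.unlock()
      case _ => // tasks without a declared cost are not throttled
    }
    super.beforeExecute(t, r)
  }

  // Runs on the worker thread after the task finishes, normally or with an error.
  override protected def afterExecute(r: Runnable, t: Throwable): Unit = {
    super.afterExecute(r, t)
    r match {
      case task: BudgetedTask =>
        lock.lock()
        try {
          inUse -= task.cost
          released.signalAll()
        } finally lock.unlock()
      case _ =>
    }
  }
}
```

Two caveats with this sketch. A worker waiting in beforeExecute still occupies its thread, so the caller is unblocked but the pool can have idle-waiting workers. Also, tasks have to go through execute(): submit() wraps the Runnable in a FutureTask, so the hooks would no longer see a BudgetedTask.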
In addition, ThreadPoolExecutor works with a BlockingQueue to manage the async tasks. We can build something like a TaskPriorityBlockingQueue that polls tasks according to priority while picking only a task whose resource budget can be covered by the Throttle. Since each task queue should not contain many tasks, polling with these more comprehensive conditions will NOT be time-consuming, even though the time complexity of a poll increases from O(log N) to O(N).
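The budget-aware poll can be sketched as follows. This is only an illustration of the O(N) scan in priority order, not a full BlockingQueue implementation, so it cannot be handed to ThreadPoolExecutor as is. BudgetAwarePriorityQueue, QueuedTask, pollFitting, and the convention that a lower priority value is more urgent are all assumptions made for the example.

```scala
import java.util.{Comparator, TreeSet}
import java.util.concurrent.locks.ReentrantLock

// Hypothetical task descriptor; a lower `priority` value means more urgent (an assumption).
final case class QueuedTask(name: String, priority: Int, cost: Long, seq: Long)

// Hypothetical queue: ordered by (priority, arrival order), polled with a budget filter.
class BudgetAwarePriorityQueue {
  private val lock = new ReentrantLock()
  private var nextSeq = 0L

  private val ordering: Comparator[QueuedTask] = new Comparator[QueuedTask] {
    override def compare(a: QueuedTask, b: QueuedTask): Int = {
      val byPriority = Integer.compare(a.priority, b.priority)
      // Ties are broken by arrival order, so equal-priority tasks stay FIFO.
      if (byPriority != 0) byPriority else java.lang.Long.compare(a.seq, b.seq)
    }
  }
  private val tasks = new TreeSet[QueuedTask](ordering)

  /** Inserts a task in O(log N). */
  def offer(name: String, priority: Int, cost: Long): Unit = {
    lock.lock()
    try {
      tasks.add(QueuedTask(name, priority, cost, nextSeq))
      nextSeq += 1
    } finally lock.unlock()
  }

  /** Removes and returns the most urgent task whose cost fits; scans up to N tasks. */
  def pollFitting(available: Long): Option[QueuedTask] = {
    lock.lock()
    try {
      var found: Option[QueuedTask] = None
      val it = tasks.iterator() // iterates in (priority, seq) order
      while (found.isEmpty && it.hasNext) {
        val task = it.next()
        if (task.cost <= available) {
          it.remove()
          found = Some(task)
        }
      }
      found
    } finally lock.unlock()
  }
}
```

For example, with an urgent task of cost 100 and a less urgent task of cost 20 queued, pollFitting(50L) skips the first and returns the second. A consequence of this policy is that a large, urgent task can keep being skipped while smaller tasks fit, so a real design would probably need some starvation guard.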
spark-rapids already contains a priority queue, HashedPriorityQueue, but it is neither thread-safe nor compatible with ThreadPoolExecutor. I am not sure whether we should extend HashedPriorityQueue or just build a new queue on top of java.util.concurrent.PriorityBlockingQueue.
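For reference, the second option can be sketched with the stock PriorityBlockingQueue alone, without any budget check. PrioritizedTask and PriorityExecutor are hypothetical names, and treating a lower priority value as more urgent is an assumption; only priority ordering is shown here.

```scala
import java.util.Comparator
import java.util.concurrent.{PriorityBlockingQueue, ThreadPoolExecutor, TimeUnit}

// Hypothetical Runnable carrying a priority; a lower value runs first (an assumption).
final class PrioritizedTask(val priority: Int, body: () => Unit) extends Runnable {
  override def run(): Unit = body()
}

object PriorityExecutor {
  // Runnables without a priority sort last instead of failing the comparison.
  private def priorityOf(r: Runnable): Int = r match {
    case p: PrioritizedTask => p.priority
    case _ => Int.MaxValue
  }

  private val byPriority: Comparator[Runnable] = new Comparator[Runnable] {
    override def compare(a: Runnable, b: Runnable): Int =
      Integer.compare(priorityOf(a), priorityOf(b))
  }

  /** Builds a fixed-size pool whose pending tasks are dequeued in priority order. */
  def create(threads: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
      new PriorityBlockingQueue[Runnable](11, byPriority))
}
```

Note that the ordering applies only to tasks waiting in the queue, and that tasks passed through submit() are wrapped in a FutureTask, so they would sort last under this comparator; the sketch assumes execute() is used. PriorityBlockingQueue also gives no ordering guarantee among equal-priority tasks.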