
[FEA] Triple buffering: traffic control by a customized thread pool #12919

@sperlingxx

Description


Is your feature request related to a problem? Please describe.
Currently, there is a TrafficController in spark-rapids that restrains the overuse of host memory; it mainly serves the async table write.

However, under the triple-buffering concept everything could run asynchronously, so the TrafficController will be used much more widely. Therefore, we should review the current implementation and figure out an extensible design on top of it, one that supports different kinds of resource budgets and, hopefully, can also schedule tasks based on task priority.

The current implementation blocks the submission of an async task if there are not enough resources for it: https://github.com/NVIDIA/spark-rapids/blob/branch-25.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/io/async/TrafficController.scala#L101

  /**
   * Blocks the task from being scheduled until the throttle allows it. If there is no task
   * currently scheduled, the task is scheduled immediately even if the throttle is exceeded.
   */
  def blockUntilRunnable[T](task: Task[T]): Unit = {
    lock.lockInterruptibly()
    try {
      while (numTasks > 0 && !throttle.canAccept(task)) {
        canBeScheduled.await()
      }
      numTasks += 1
      throttle.taskScheduled(task)
    } finally {
      lock.unlock()
    }
  }

Here is a rough proposal for an alternative implementation of the Traffic Controller:

The alternative approach is to build a customized executor that extends java.util.concurrent.ThreadPoolExecutor. We can inject the flow control and other kinds of logic through the beforeExecute and afterExecute hooks. By doing that, the submission of an async task never blocks the caller, even if the submitted task is pended due to a lack of resources or a lack of available threads.
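A minimal sketch of what that could look like; the `ResourceThrottle` trait and the constructor arguments here are assumptions standing in for the real `Throttle`/`Task` types in the plugin, not the actual API:

```scala
import java.util.concurrent.{BlockingQueue, ThreadPoolExecutor, TimeUnit}

// Hypothetical throttle interface; the real Throttle in spark-rapids may look different.
trait ResourceThrottle {
  def taskScheduled(task: Runnable): Unit
  def taskCompleted(task: Runnable): Unit
}

// Executor sketch: flow control is injected via the ThreadPoolExecutor hooks,
// so submit()/execute() never blocks the submitting thread.
class ThrottlingThreadPoolExecutor(
    corePoolSize: Int,
    maxPoolSize: Int,
    keepAliveSec: Long,
    workQueue: BlockingQueue[Runnable],
    throttle: ResourceThrottle)
  extends ThreadPoolExecutor(
    corePoolSize, maxPoolSize, keepAliveSec, TimeUnit.SECONDS, workQueue) {

  override def beforeExecute(t: Thread, r: Runnable): Unit = {
    // Charge the task's resource budget right before a worker thread runs it.
    throttle.taskScheduled(r)
    super.beforeExecute(t, r)
  }

  override def afterExecute(r: Runnable, t: Throwable): Unit = {
    super.afterExecute(r, t)
    // Release the budget once the task finishes, whether it succeeded or failed.
    throttle.taskCompleted(r)
  }
}
```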

In addition, ThreadPoolExecutor works with a BlockingQueue to manage the async tasks. We can customize something like a TaskPriorityBlockingQueue that polls tasks according to their priority while only picking up tasks whose resource budget can be covered by the Throttle. Considering that each task queue should not contain too many tasks, polling a task under these richer conditions will NOT be time-consuming, even though the time complexity increases from O(log N) to O(N).
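A rough sketch of that polling logic; the `BudgetedTask` trait and the `canAccept` callback are assumptions standing in for the real Task/Throttle types, and a complete version would have to implement the full `BlockingQueue[Runnable]` contract to plug into ThreadPoolExecutor:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

// Hypothetical task wrapper carrying a priority and a host-memory budget.
trait BudgetedTask extends Runnable {
  def priority: Int
  def hostMemoryBytes: Long
}

// Core of a priority- and budget-aware queue: an O(N) scan picks the
// highest-priority task whose budget the throttle can currently cover.
class TaskPriorityQueueCore(canAccept: BudgetedTask => Boolean) {
  private val lock = new ReentrantLock()
  private val notEmpty = lock.newCondition()
  private val tasks = ArrayBuffer.empty[BudgetedTask]

  def offer(task: BudgetedTask): Unit = {
    lock.lock()
    try {
      tasks += task
      notEmpty.signal()
    } finally {
      lock.unlock()
    }
  }

  // Blocks until some task both exists and fits within the throttle budget.
  def take(): BudgetedTask = {
    lock.lock()
    try {
      var picked: Option[BudgetedTask] = None
      while (picked.isEmpty) {
        // O(N) scan over the pending tasks; fine as long as the queue stays small.
        val fitting = tasks.zipWithIndex.filter { case (t, _) => canAccept(t) }
        if (fitting.nonEmpty) {
          val (task, idx) = fitting.maxBy { case (t, _) => t.priority }
          tasks.remove(idx)
          picked = Some(task)
        } else {
          // Re-check periodically; a production version would instead be signalled
          // when the throttle releases budget or a new task arrives.
          notEmpty.await(100, TimeUnit.MILLISECONDS)
        }
      }
      picked.get
    } finally {
      lock.unlock()
    }
  }
}
```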

For now, spark-rapids already contains a priority queue, HashedPriorityQueue, but it is neither thread-safe nor compatible with ThreadPoolExecutor. I am not sure whether we should extend HashedPriorityQueue or build a new one on top of java.util.concurrent.PriorityBlockingQueue.
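For comparison, wrapping java.util.concurrent.PriorityBlockingQueue covers the ordering and thread-safety parts almost for free, but its poll() only ever returns the current head, so the budget check would still have to live elsewhere (for example in the executor hooks above). A tiny sketch, reusing the hypothetical BudgetedTask from the previous snippet:

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object PriorityQueueSketch {
  // Order runnables by descending priority; non-BudgetedTask runnables compare equal.
  private val byPriorityDesc: Comparator[Runnable] = new Comparator[Runnable] {
    override def compare(a: Runnable, b: Runnable): Int = (a, b) match {
      case (x: BudgetedTask, y: BudgetedTask) => Integer.compare(y.priority, x.priority)
      case _ => 0
    }
  }

  // Thread-safe and directly usable as a ThreadPoolExecutor work queue,
  // but poll() always returns the head regardless of the resource budget.
  val workQueue = new PriorityBlockingQueue[Runnable](11, byPriorityDesc)
}
```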

Labels: feature request (New feature or request), performance (A performance related task/issue)
