For a typical shuffle read stage, the data dependency looks roughly like this (we're skipping many concepts here and focusing only on the key ones from a data perspective; a minimal sketch of the sequential flow follows the list below):
"operators after shuffle read" (on GPU) -> "concatenateTablesInHost"(on CPU) -> "bufferNextBatch" (on CPU)
- "operators after shuffle read": e.g. aggregate after shuffle read, in most cases it is computing on GPU
- "concatenateTablesInHost": concat the small batches from mapper side to form a big batch to process
- "bufferNextBatch": it calls "BaseSerializedTableIterator.next" to accumulate a list of small batches from mapper side. The speed of bufferNextBatch is bottlenecked by how fast shuffle manager can read&decompress the input stream. For Celeborn shuffle manager our customer has already added parallel read&compress capabilities so that bufferNextBatch can be much faster. For standard shuffle manager I believe we also have multi-threaded shuffle reader.
Today these three steps run sequentially, and performance suffers when each shuffle read task has multiple batches to handle. We have seen cases where the three steps take similar amounts of time, so it would be nice to use async threads to hide part of that time in the background, especially since the latter two steps both run on the CPU and we typically have plenty of CPU resources to exploit. One possible shape for such a pipelined iterator is sketched below.
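
One way to hide that time is a producer-consumer pipeline: a background thread runs the CPU-side buffering and host concat while the task thread feeds finished batches to the GPU. This is only a sketch of the idea, reusing the hypothetical types and sequential iterator from the previous example; a real implementation would also need task-completion callbacks, error propagation, and host-memory accounting:

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors}

class PipelinedShuffleReadIterator(
    inner: Iterator[GpuBatch],  // the sequential iterator from above
    queueDepth: Int = 2) extends Iterator[GpuBatch] {

  // Bounded queue caps how many concatenated batches wait in host memory.
  private val queue = new ArrayBlockingQueue[Option[GpuBatch]](queueDepth)
  private val pool = Executors.newSingleThreadExecutor()

  // Producer: drains the sequential iterator (buffering + host concat) on a
  // background CPU thread while the task thread feeds batches to the GPU.
  pool.execute(() => {
    try {
      while (inner.hasNext) queue.put(Some(inner.next()))
    } finally {
      queue.put(None) // sentinel marking the end of the stream
      pool.shutdown()
    }
  })

  private var pending: Option[GpuBatch] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (pending.isEmpty && !finished) {
      queue.take() match {
        case some @ Some(_) => pending = some
        case None => finished = true
      }
    }
    pending.isDefined
  }

  override def next(): GpuBatch = {
    if (!hasNext) throw new NoSuchElementException("no more batches")
    val b = pending.get
    pending = None
    b // the producer keeps working ahead while the GPU processes this batch
  }
}
```

The bounded queue provides back-pressure: at most `queueDepth` concatenated batches sit in host memory while the GPU is busy, so the overlap doesn't come at the cost of unbounded host allocation.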
In an ideal case where each step accounts for 1/3 of the time, in theory we can get a 67% improvement (i.e. a 3x speedup) for all the tasks in the shuffle read stage. In reality it won't be that ideal.
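
For concreteness, the arithmetic behind that figure: if each of the three steps takes time $T$ per batch and they overlap perfectly in a pipeline, the steady-state time per batch drops from $3T$ to $T$:

$$
\text{time saved} = \frac{3T - T}{3T} = \frac{2}{3} \approx 67\%, \qquad \text{i.e. a } 3\times \text{ speedup.}
$$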