Fault Tolerant Execution
The current all-or-nothing architecture makes it significantly more difficult to tolerate faults. The likelihood of a failure grows with the time it takes to complete a query, and the impact of failing a long-running query is much higher, as it often results in a significant waste of time and resources.
Due to the nature of streaming exchange in Trino, all tasks are interconnected, and a failure of any task results in a query failure. To support long-running queries, Trino has to be able to tolerate task failures.
The idea of the first approach, checkpointing (distributed state snapshots), is to periodically suspend query execution and store the current state of all tasks and operators in persistent storage. In case of a fault, the saved snapshot can be restored and the query resumed.
This approach is usually taken under the assumption that the state is “not that large”, so snapshots can be taken frequently enough.
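A minimal sketch of what such a checkpoint/restore cycle could look like, assuming every task can serialize its operator state; the class and method names (QuerySnapshotter, takeSnapshot, restoreLatest) are hypothetical and not part of Trino:
```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (not the actual Trino implementation): the state of all tasks is
// serialized to persistent storage as one snapshot and restored as a unit after a fault.
public final class QuerySnapshotter
{
    private final Path snapshotFile;

    public QuerySnapshotter(Path snapshotFile)
    {
        this.snapshotFile = snapshotFile;
    }

    // Suspend all tasks (outside this sketch), capture their operator state, and persist it
    public void takeSnapshot(Map<String, Serializable> taskStates)
            throws IOException
    {
        Path tmp = snapshotFile.resolveSibling(snapshotFile.getFileName() + ".tmp");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(tmp))) {
            out.writeObject(new HashMap<>(taskStates));
        }
        // Rename so a crash during the write never corrupts the last good snapshot
        Files.move(tmp, snapshotFile, StandardCopyOption.REPLACE_EXISTING);
    }

    // After a fault, every task is restarted from the last persisted snapshot
    @SuppressWarnings("unchecked")
    public Map<String, Serializable> restoreLatest()
            throws IOException, ClassNotFoundException
    {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(snapshotFile))) {
            return (Map<String, Serializable>) in.readObject();
        }
    }
}
```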
Pros
- Exchanges remain “cheap”
- Partitions can be assigned dynamically
Cons
- All tasks must be restarted from a checkpoint in case of a failure
- Might be impossible to make progress when the system experiences faults more often than a snapshot can be taken. For example, when running on spot instances with a 2-minute withdrawal notice, a snapshot has to be taken more frequently than once every 2 minutes (see the sketch after this list).
- When the workload grows, so does the state. At some point taking frequent snapshots might become unmanageable.
- Doesn't help with running queries that require more memory than is available in the cluster, as it doesn't enable more granular partitioning or iterative task execution
- Doesn't provide additional capabilities to support speculative execution
- “fault” is often a non-binary thing. A task that experiences a significant slowdown is technically not “failed”, yet it might be better to speculatively start another copy of that task to avoid waiting for an instance that might never finish.
- Doesn’t enable adaptive query optimizations as fundamentally the model of execution remains the same (an entire query is executed all at once)
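To illustrate the spot-instance point above with a back-of-the-envelope check: progress is only possible if a snapshot completes faster than faults arrive. The names below are made up for illustration:
```java
import java.time.Duration;

// Hypothetical progress check: if a snapshot takes longer than the interval between faults,
// the query keeps restoring and re-snapshotting without ever finishing new work.
public final class SnapshotProgressCheck
{
    public static boolean canMakeProgress(Duration snapshotDuration, Duration meanTimeBetweenFaults)
    {
        // A snapshot must complete before the next fault arrives
        return snapshotDuration.compareTo(meanTimeBetweenFaults) < 0;
    }

    public static void main(String[] args)
    {
        // Spot instances with a 2-minute withdrawal notice
        Duration timeBetweenFaults = Duration.ofMinutes(2);
        System.out.println(canMakeProgress(Duration.ofMinutes(3), timeBetweenFaults));  // false
        System.out.println(canMakeProgress(Duration.ofSeconds(30), timeBetweenFaults)); // true
    }
}
```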
The idea of the second approach, exchange data buffering, is to store intermediate data produced by tasks in a “distributed buffer”. This allows restarting a single task in case of a failure (assuming deterministic partitioning at each exchange boundary).
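As a rough illustration of the idea (not Trino's actual exchange implementation), the following in-memory stand-in shows why buffered, deterministically partitioned output makes single-task restarts possible; ExchangeBuffer, putPartition, and readPartition are hypothetical names:
```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical in-memory stand-in for the "distributed buffer" between two stages.
// Each upstream task writes its output per partition; a retried task attempt simply
// overwrites its previous output, keeping retries idempotent, and a downstream task
// (or its restarted replacement) re-reads its input partition from the buffer.
public final class ExchangeBuffer<T>
{
    // (upstream task id, partition) -> buffered rows
    private final Map<String, List<T>> buffer = new ConcurrentHashMap<>();

    private static String key(int upstreamTaskId, int partition)
    {
        return upstreamTaskId + ":" + partition;
    }

    // Called by an upstream task (or a retry of it) once a partition of its output is complete
    public void putPartition(int upstreamTaskId, int partition, List<T> rows)
    {
        buffer.put(key(upstreamTaskId, partition), List.copyOf(rows));
    }

    // Called by a downstream task: gather the given partition from every upstream task
    public List<T> readPartition(int partition, int upstreamTaskCount)
    {
        return IntStream.range(0, upstreamTaskCount)
                .mapToObj(task -> buffer.getOrDefault(key(task, partition), List.<T>of()))
                .flatMap(List::stream)
                .toList();
    }
}
```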
Pros
- Each task can be restarted independently
- Multiple instances of the same task can execute simultaneously
- Queries can be scheduled partially and re-optimized at exchange boundaries
- Allows partitioning the data into a number of partitions much higher than the number of nodes available in the cluster. This enables more granular execution, effectively increasing the distributed memory limit.
Cons
- Data always has to be partitioned in a deterministic way; dynamic partitioning is challenging or not possible (see the sketch after this list).
- Exchanges become more expensive. Execution techniques such as mark_distinct might not be practical.
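The deterministic-partitioning requirement mentioned above can be illustrated with a simple hash-based partitioner: the partition must depend only on the key and the partition count, never on runtime conditions, so that a re-executed task reproduces the same assignment. DeterministicPartitioner is a hypothetical name, and CRC32 is just one possible hash function:
```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration: the partition for a given key is a pure function of the key
// and the partition count, so any attempt of any task computes the same assignment.
public final class DeterministicPartitioner
{
    private final int partitionCount;

    public DeterministicPartitioner(int partitionCount)
    {
        this.partitionCount = partitionCount;
    }

    public int partitionFor(String key)
    {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % partitionCount);
    }

    public static void main(String[] args)
    {
        DeterministicPartitioner partitioner = new DeterministicPartitioner(50);
        // Any task, on any node, at any attempt, computes the same partition for the same key,
        // which is what makes restarting a single task safe
        System.out.println(partitioner.partitionFor("orderkey=12345"));
        System.out.println(partitioner.partitionFor("orderkey=12345")); // identical output
    }
}
```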
The two approaches are not mutually exclusive and both could be implemented if needed. However, the second approach seems to have a much higher ROI. In addition to failure recovery, it offers more granular scheduling, which would effectively improve support for high-memory queries and open opportunities for adaptive query optimizations (such as adaptive join reordering, adaptive join type selection, and adaptive partitioning tuning).
The first approach is also riskier, as taking distributed state snapshots can be rather difficult to implement, and it is not well suited for environments with a high number of processing nodes (which greatly increases the likelihood of a fault) and a large internal state (which increases the time needed to take a distributed snapshot).
In the remaining part of this page we focus on the second approach, Exchange Data Buffering.