Commit 55bbe5a

[SPARK-52409][SDP] Only use PipelineRunEventBuffer in tests
### What changes were proposed in this pull request?

This PR inverts the relationship between event callbacks and `PipelineRunEventBuffer`.

Prior to this PR:
- `PipelineUpdateContext` had a `PipelineRunEventBuffer` that all events from the run were written to.
- This `PipelineRunEventBuffer` contained an `eventCallback` val, which the Connect backend used to forward events to the client.
- The only code that reads from the buffer is test code.

After this PR:
- `PipelineUpdateContext` has an `eventCallback`, which is invoked on all events.
- The test harness constructs a `PipelineRunEventBuffer` and passes its `addEvent` method as the `eventCallback`.

### Why are the changes needed?

With the prior code, there was a risk of memory pressure from the event buffer for long-running pipelines.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

This patch had conflicts when merged, resolved by Committer: Sandy Ryza <sandy.ryza@databricks.com>

Closes #51352 from sryza/pipeline-run-event-buffer.

Lead-authored-by: Sandy Ryza <sandyryza@gmail.com>
Co-authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Sandy Ryza <sandy.ryza@databricks.com>
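
As an illustration of the inversion, here is a minimal, self-contained Scala sketch of the two shapes described above, using a simplified `PipelineEvent` stand-in rather than the real Spark classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for the real PipelineEvent class.
final case class PipelineEvent(message: String)

// Before: every event was retained in a run-scoped buffer, and the buffer
// itself owned the callback used to forward events to the client.
class BufferedEvents(eventCallback: PipelineEvent => Unit) {
  private val events = ArrayBuffer[PipelineEvent]()
  def addEvent(event: PipelineEvent): Unit = synchronized {
    events.append(event) // grows without bound on long-running pipelines
    eventCallback(event)
  }
}

// After: the update context exposes only a callback, so nothing is retained
// unless the caller (e.g. the test harness) chooses to buffer.
trait CallbackContext {
  def eventCallback: PipelineEvent => Unit
}
```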

File tree: 7 files changed (+29, −41 lines)


sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -226,7 +226,7 @@ private[connect] object PipelinesHandler extends Logging {
     val dataflowGraphId = cmd.getDataflowGraphId
     val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
     // We will use this variable to store the run failure event if it occurs. This will be set
-    // by the event callback that is executed when an event is added to the PipelineRunEventBuffer.
+    // by the event callback.
     @volatile var runFailureEvent = Option.empty[PipelineEvent]
     // Define a callback which will stream logs back to the SparkConnect client when an internal
     // pipeline event is emitted during pipeline execution. We choose to pass a callback rather the
```
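
To make the comment concrete, the capture pattern could be sketched as below. This is a hedged, self-contained approximation: `isFailure` and `sendToClient` are hypothetical stand-ins for how the real handler classifies events and streams them to the Spark Connect client.

```scala
object RunFailureCaptureSketch {
  // Simplified stand-in; the real PipelineEvent carries structured details.
  final case class PipelineEvent(message: String, isFailure: Boolean)

  // Written by the callback on the execution thread and read by the thread
  // awaiting completion; @volatile makes the write visible across threads.
  @volatile private var runFailureEvent = Option.empty[PipelineEvent]

  // Hypothetical transport standing in for the Connect response stream.
  private def sendToClient(event: PipelineEvent): Unit = println(event.message)

  val eventCallback: PipelineEvent => Unit = { event =>
    if (event.isFailure) runFailureEvent = Some(event)
    sendToClient(event)
  }

  def capturedFailure: Option[PipelineEvent] = runFailureEvent
}
```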

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -50,7 +50,7 @@ class PipelineExecution(context: PipelineUpdateContext) {
     // Execute the graph.
     graphExecution = Option(
       new TriggeredGraphExecution(initializedGraph, context, onCompletion = terminationReason => {
-        context.eventBuffer.addEvent(
+        context.eventCallback(
           ConstructPipelineEvent(
             origin = PipelineEventOrigin(
               flowName = None,
@@ -75,7 +75,7 @@ class PipelineExecution(context: PipelineUpdateContext) {
       context.pipelineExecution.awaitCompletion()
     } catch {
       case e: Throwable =>
-        context.eventBuffer.addEvent(
+        context.eventCallback(
           ConstructPipelineEvent(
             origin = PipelineEventOrigin(
               flowName = None,
```

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.pipelines.graph
 
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineRunEventBuffer}
+import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent}
 
 trait PipelineUpdateContext {
 
@@ -50,8 +50,8 @@ trait PipelineUpdateContext {
     UnionFlowFilter(flowFilterForTables, resetCheckpointFlows)
   }
 
-  /** Buffer containing internal events that are emitted during a run of a pipeline. */
-  def eventBuffer: PipelineRunEventBuffer
+  /** Callback to invoke for internal events that are emitted during a run of a pipeline. */
+  def eventCallback: PipelineEvent => Unit
 
   /** Emits internal flow progress events into the event buffer. */
   def flowProgressEventLogger: FlowProgressEventLogger
```
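
With the new trait member, supplying a context means supplying a plain function value. A minimal sketch, mirroring only the changed member with a stand-in event type:

```scala
// Stand-in event type; not the real Spark class.
final case class PipelineEvent(message: String)

trait PipelineUpdateContextSketch {
  /** Callback to invoke for internal events emitted during a run of a pipeline. */
  def eventCallback: PipelineEvent => Unit
}

// Forwarding events is now just a function; no buffer type appears in the API.
object PrintingContext extends PipelineUpdateContextSketch {
  override val eventCallback: PipelineEvent => Unit =
    event => println(s"pipeline event: ${event.message}")
}
```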

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala

Lines changed: 3 additions & 9 deletions
```diff
@@ -18,11 +18,7 @@
 package org.apache.spark.sql.pipelines.graph
 
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.pipelines.logging.{
-  FlowProgressEventLogger,
-  PipelineEvent,
-  PipelineRunEventBuffer
-}
+import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent}
 
 /**
  * An implementation of the PipelineUpdateContext trait used in production.
@@ -31,17 +27,15 @@ import org.apache.spark.sql.pipelines.logging.{
  */
 class PipelineUpdateContextImpl(
     override val unresolvedGraph: DataflowGraph,
-    eventCallback: PipelineEvent => Unit
+    override val eventCallback: PipelineEvent => Unit
 ) extends PipelineUpdateContext {
 
   override val spark: SparkSession = SparkSession.getActiveSession.getOrElse(
     throw new IllegalStateException("SparkSession is not available")
   )
 
-  override val eventBuffer = new PipelineRunEventBuffer(eventCallback)
-
   override val flowProgressEventLogger: FlowProgressEventLogger =
-    new FlowProgressEventLogger(eventBuffer = eventBuffer)
+    new FlowProgressEventLogger(eventCallback = eventCallback)
 
   override val refreshTables: TableFilter = AllTables
   override val fullRefreshTables: TableFilter = NoTables
```
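
Wiring the production implementation is correspondingly direct. A hedged sketch with stand-in types (`DataflowGraphSketch` and `forwardToClient` are illustrative, not the real Spark classes):

```scala
object ProductionWiringSketch {
  final case class PipelineEvent(message: String)
  final class DataflowGraphSketch // stand-in for the real DataflowGraph

  // Mirrors the changed constructor: the callback is a plain constructor
  // value, with no intermediate PipelineRunEventBuffer.
  class UpdateContextImplSketch(
      val unresolvedGraph: DataflowGraphSketch,
      val eventCallback: PipelineEvent => Unit
  )

  // Hypothetical transport; the Connect backend forwards events to the client.
  val forwardToClient: PipelineEvent => Unit =
    event => println(s"-> client: ${event.message}")

  val ctx = new UpdateContextImplSketch(new DataflowGraphSketch, forwardToClient)
}
```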

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/FlowProgressEventLogger.scala

Lines changed: 13 additions & 13 deletions
```diff
@@ -36,9 +36,9 @@ import org.apache.spark.sql.pipelines.graph.{FlowExecution, ResolutionCompletedF
  * - All flow progress events other than errors/warnings will be logged at INFO level (including
  *   flow progress events with metrics) and error/warning messages will be logged at their level.
  *
- * @param eventBuffer Event log to log the flow progress events.
+ * @param eventCallback Callback to invoke on the flow progress events.
  */
-class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
+class FlowProgressEventLogger(eventCallback: PipelineEvent => Unit) extends Logging {
 
   /**
    * This map stores flow identifier to a boolean representing whether flow is running.
@@ -57,7 +57,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    * INFO level, since flows are only queued once.
    */
   def recordQueued(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -76,7 +76,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    */
   def recordPlanningForBatchFlow(batchFlow: ResolvedFlow): Unit = synchronized {
     if (batchFlow.df.isStreaming) return
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(batchFlow.displayName),
@@ -97,7 +97,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    * logged at METRICS. All other cases will be logged at INFO.
    */
   def recordStart(flowExecution: FlowExecution): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flowExecution.displayName),
@@ -114,7 +114,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
 
   /** Records flow progress events with flow status as RUNNING. */
   def recordRunning(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -142,7 +142,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
   ): Unit = synchronized {
     val eventLogMessage = messageOpt.getOrElse(s"Flow '${flow.displayName}' has FAILED.")
 
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -165,7 +165,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    * record skipped should be used when the flow is skipped because of upstream flow failures.
    */
   def recordSkippedOnUpStreamFailure(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -188,7 +188,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    * upstream failures use [[recordSkippedOnUpStreamFailure]] function.
    */
   def recordSkipped(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -208,7 +208,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
 
   /** Records flow progress events with flow status as EXCLUDED at INFO level. */
   def recordExcluded(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -232,7 +232,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
       message: Option[String] = None,
       cause: Option[Throwable] = None
   ): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -252,7 +252,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
 
   /** Records flow progress events with flow status as IDLE. */
   def recordIdle(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
@@ -277,7 +277,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
    * event.
    */
   def recordCompletion(flow: ResolvedFlow): Unit = synchronized {
-    eventBuffer.addEvent(
+    eventCallback(
       ConstructPipelineEvent(
         origin = PipelineEventOrigin(
           flowName = Option(flow.displayName),
```
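
Since the logger now depends only on a `PipelineEvent => Unit` function, any collecting callback can observe flow-progress events. A self-contained sketch with a stand-in logger (the real `recordQueued` takes a `ResolvedFlow`, simplified here to a flow name):

```scala
import scala.collection.mutable.ArrayBuffer

object LoggerCallbackSketch {
  final case class PipelineEvent(message: String)

  // Stand-in mirroring the changed constructor: the logger holds a function,
  // not a buffer, so callers decide what happens to each event.
  class FlowProgressEventLoggerSketch(eventCallback: PipelineEvent => Unit) {
    def recordQueued(flowName: String): Unit = synchronized {
      eventCallback(PipelineEvent(s"Flow '$flowName' is QUEUED."))
    }
  }

  val collected = ArrayBuffer[PipelineEvent]()
  val logger = new FlowProgressEventLoggerSketch(event => collected += event)
}
```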

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala

Lines changed: 4 additions & 5 deletions
```diff
@@ -34,7 +34,6 @@ import org.apache.spark.sql.pipelines.logging.{
   FlowProgress,
   FlowProgressEventLogger,
   PipelineEvent,
-  PipelineRunEventBuffer,
   RunProgress
 }
 
@@ -60,12 +59,12 @@ trait TestPipelineUpdateContextMixin {
       refreshTables: TableFilter = AllTables,
       resetCheckpointFlows: FlowFilter = AllFlows
   ) extends PipelineUpdateContext {
-    val eventBuffer = new PipelineRunEventBuffer(eventCallback = _ => ())
+    val eventBuffer = new PipelineRunEventBuffer()
+
+    override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent
 
     override def flowProgressEventLogger: FlowProgressEventLogger = {
-      new FlowProgressEventLogger(
-        eventBuffer = eventBuffer
-      )
+      new FlowProgressEventLogger(eventCallback = eventCallback)
     }
   }
 }
```
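
The mixin above shows the only remaining production-to-buffer link: the harness passes `eventBuffer.addEvent` as the callback. A self-contained sketch of that wiring (the `snapshot` accessor is hypothetical, for illustration only):

```scala
import scala.collection.mutable.ArrayBuffer

object TestWiringSketch {
  final case class PipelineEvent(message: String)

  // Stand-in for the test-only buffer, which no longer takes a callback.
  class PipelineRunEventBufferSketch {
    private val events = ArrayBuffer[PipelineEvent]()
    def addEvent(event: PipelineEvent): Unit = synchronized { events.append(event) }
    def snapshot: Seq[PipelineEvent] = synchronized { events.toSeq }
  }

  val buffer = new PipelineRunEventBufferSketch
  // The buffer's addEvent method becomes the context's event callback...
  val eventCallback: PipelineEvent => Unit = buffer.addEvent
  // ...so events emitted through the callback land in the buffer for assertions.
  eventCallback(PipelineEvent("Flow 'a' is QUEUED."))
  assert(buffer.snapshot.exists(_.message.contains("QUEUED")))
}
```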

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineRunEventBuffer.scala renamed to sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineRunEventBuffer.scala

Lines changed: 3 additions & 8 deletions
```diff
@@ -15,33 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.pipelines.logging
+package org.apache.spark.sql.pipelines.utils
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.pipelines.logging.PipelineEvent
 
 /**
  * An in-memory buffer which contains the internal events that are emitted during a run of a
  * pipeline.
- *
- * @param eventCallback A callback function to be called when an event is added to the buffer.
 */
-class PipelineRunEventBuffer(eventCallback: PipelineEvent => Unit) extends Logging {
+class PipelineRunEventBuffer extends Logging {
 
   /**
    * A buffer to hold the events emitted during a pipeline run.
    * This buffer is thread-safe and can be accessed concurrently.
-   *
-   * TODO(SPARK-52409): Deprecate this class to be used in test only and use a more
-   * robust event logging system in production.
    */
   private val events = ArrayBuffer[PipelineEvent]()
 
   def addEvent(event: PipelineEvent): Unit = synchronized {
     val eventToAdd = event
     events.append(eventToAdd)
-    eventCallback(event)
   }
 
   def clear(): Unit = synchronized {
```
