From 10926f0adc52ea2c284f073ccc14ac9907d7729c Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 2 Jul 2025 09:28:44 -0700 Subject: [PATCH] Only use PipelineRunEventBuffer in tests --- .../connect/pipelines/PipelinesHandler.scala | 2 +- .../pipelines/graph/PipelineExecution.scala | 4 +-- .../graph/PipelineUpdateContext.scala | 6 ++--- .../graph/PipelineUpdateContextImpl.scala | 12 +++------ .../logging/FlowProgressEventLogger.scala | 26 +++++++++---------- .../sql/pipelines/utils/ExecutionTest.scala | 9 +++---- .../utils}/PipelineRunEventBuffer.scala | 11 +++----- 7 files changed, 29 insertions(+), 41 deletions(-) rename sql/pipelines/src/{main/scala/org/apache/spark/sql/pipelines/logging => test/scala/org/apache/spark/sql/pipelines/utils}/PipelineRunEventBuffer.scala (77%) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala index df191632e429e..5daabc7315f1a 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala @@ -226,7 +226,7 @@ private[connect] object PipelinesHandler extends Logging { val dataflowGraphId = cmd.getDataflowGraphId val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId) // We will use this variable to store the run failure event if it occurs. This will be set - // by the event callback that is executed when an event is added to the PipelineRunEventBuffer. + // by the event callback. @volatile var runFailureEvent = Option.empty[PipelineEvent] // Define a callback which will stream logs back to the SparkConnect client when an internal // pipeline event is emitted during pipeline execution. We choose to pass a callback rather the diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala index 92de9ff4c4070..a2c54a908af1a 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala @@ -50,7 +50,7 @@ class PipelineExecution(context: PipelineUpdateContext) { // Execute the graph. 
graphExecution = Option( new TriggeredGraphExecution(initializedGraph, context, onCompletion = terminationReason => { - context.eventBuffer.addEvent( + context.eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = None, @@ -75,7 +75,7 @@ class PipelineExecution(context: PipelineUpdateContext) { context.pipelineExecution.awaitCompletion() } catch { case e: Throwable => - context.eventBuffer.addEvent( + context.eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = None, diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala index 5a1ab88a432fb..d6f2020809337 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContext.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.pipelines.graph import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineRunEventBuffer} +import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent} trait PipelineUpdateContext { @@ -50,8 +50,8 @@ trait PipelineUpdateContext { UnionFlowFilter(flowFilterForTables, resetCheckpointFlows) } - /** Buffer containing internal events that are emitted during a run of a pipeline. */ - def eventBuffer: PipelineRunEventBuffer + /** Callback to invoke for internal events that are emitted during a run of a pipeline. */ + def eventCallback: PipelineEvent => Unit /** Emits internal flow progress events into the event buffer. */ def flowProgressEventLogger: FlowProgressEventLogger diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala index b5428e52c1d07..c68882df79ce4 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala @@ -18,11 +18,7 @@ package org.apache.spark.sql.pipelines.graph import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.pipelines.logging.{ - FlowProgressEventLogger, - PipelineEvent, - PipelineRunEventBuffer -} +import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent} /** * An implementation of the PipelineUpdateContext trait used in production. 
@@ -31,17 +27,15 @@ import org.apache.spark.sql.pipelines.logging.{ */ class PipelineUpdateContextImpl( override val unresolvedGraph: DataflowGraph, - eventCallback: PipelineEvent => Unit + override val eventCallback: PipelineEvent => Unit ) extends PipelineUpdateContext { override val spark: SparkSession = SparkSession.getActiveSession.getOrElse( throw new IllegalStateException("SparkSession is not available") ) - override val eventBuffer = new PipelineRunEventBuffer(eventCallback) - override val flowProgressEventLogger: FlowProgressEventLogger = - new FlowProgressEventLogger(eventBuffer = eventBuffer) + new FlowProgressEventLogger(eventCallback = eventCallback) override val refreshTables: TableFilter = AllTables override val fullRefreshTables: TableFilter = NoTables diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/FlowProgressEventLogger.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/FlowProgressEventLogger.scala index 348a562c8b168..8475acd8984fe 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/FlowProgressEventLogger.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/FlowProgressEventLogger.scala @@ -36,9 +36,9 @@ import org.apache.spark.sql.pipelines.graph.{FlowExecution, ResolutionCompletedF * - All flow progress events other than errors/warnings will be logged at INFO level (including * flow progress events with metrics) and error/warning messages will be logged at their level. * - * @param eventBuffer Event log to log the flow progress events. + * @param eventCallback Callback to invoke on the flow progress events. */ -class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging { +class FlowProgressEventLogger(eventCallback: PipelineEvent => Unit) extends Logging { /** * This map stores flow identifier to a boolean representing whether flow is running. @@ -57,7 +57,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi * INFO level, since flows are only queued once. */ def recordQueued(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -76,7 +76,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi */ def recordPlanningForBatchFlow(batchFlow: ResolvedFlow): Unit = synchronized { if (batchFlow.df.isStreaming) return - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(batchFlow.displayName), @@ -97,7 +97,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi * logged at METRICS. All other cases will be logged at INFO. */ def recordStart(flowExecution: FlowExecution): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flowExecution.displayName), @@ -114,7 +114,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi /** Records flow progress events with flow status as RUNNING. 
*/ def recordRunning(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -142,7 +142,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi ): Unit = synchronized { val eventLogMessage = messageOpt.getOrElse(s"Flow '${flow.displayName}' has FAILED.") - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -165,7 +165,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi * record skipped should be used when the flow is skipped because of upstream flow failures. */ def recordSkippedOnUpStreamFailure(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -188,7 +188,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi * upstream failures use [[recordSkippedOnUpStreamFailure]] function. */ def recordSkipped(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -208,7 +208,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi /** Records flow progress events with flow status as EXCLUDED at INFO level. */ def recordExcluded(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -232,7 +232,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi message: Option[String] = None, cause: Option[Throwable] = None ): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -252,7 +252,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi /** Records flow progress events with flow status as IDLE. */ def recordIdle(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), @@ -277,7 +277,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Loggi * event. 
*/ def recordCompletion(flow: ResolvedFlow): Unit = synchronized { - eventBuffer.addEvent( + eventCallback( ConstructPipelineEvent( origin = PipelineEventOrigin( flowName = Option(flow.displayName), diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala index eebe1e7e83e53..991a47d6b562f 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/ExecutionTest.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.pipelines.logging.{ FlowProgress, FlowProgressEventLogger, PipelineEvent, - PipelineRunEventBuffer, RunProgress } @@ -60,12 +59,12 @@ trait TestPipelineUpdateContextMixin { refreshTables: TableFilter = AllTables, resetCheckpointFlows: FlowFilter = AllFlows ) extends PipelineUpdateContext { - val eventBuffer = new PipelineRunEventBuffer(eventCallback = _ => ()) + val eventBuffer = new PipelineRunEventBuffer() + + override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent override def flowProgressEventLogger: FlowProgressEventLogger = { - new FlowProgressEventLogger( - eventBuffer = eventBuffer - ) + new FlowProgressEventLogger(eventCallback = eventCallback) } } } diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineRunEventBuffer.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineRunEventBuffer.scala similarity index 77% rename from sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineRunEventBuffer.scala rename to sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineRunEventBuffer.scala index 1ef2a561a9913..e3ea61595993f 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/logging/PipelineRunEventBuffer.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/PipelineRunEventBuffer.scala @@ -15,33 +15,28 @@ * limitations under the License. */ -package org.apache.spark.sql.pipelines.logging +package org.apache.spark.sql.pipelines.utils import scala.collection.mutable.ArrayBuffer import org.apache.spark.internal.Logging +import org.apache.spark.sql.pipelines.logging.PipelineEvent /** * An in-memory buffer which contains the internal events that are emitted during a run of a * pipeline. - * - * @param eventCallback A callback function to be called when an event is added to the buffer. */ -class PipelineRunEventBuffer(eventCallback: PipelineEvent => Unit) extends Logging { +class PipelineRunEventBuffer extends Logging { /** * A buffer to hold the events emitted during a pipeline run. * This buffer is thread-safe and can be accessed concurrently. - * - * TODO(SPARK-52409): Deprecate this class to be used in test only and use a more - * robust event logging system in production. */ private val events = ArrayBuffer[PipelineEvent]() def addEvent(event: PipelineEvent): Unit = synchronized { val eventToAdd = event events.append(eventToAdd) - eventCallback(event) } def clear(): Unit = synchronized {
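
For reference, a minimal sketch (not part of the patch) of how the two call sites wire up after this change: production code hands PipelineUpdateContextImpl a plain PipelineEvent => Unit callback, while tests keep the buffer (now under pipelines.utils in src/test) and pass its addEvent method as that callback. The graph parameter and the println callback body below are placeholders, not code from this patch.

import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl}
import org.apache.spark.sql.pipelines.logging.PipelineEvent
import org.apache.spark.sql.pipelines.utils.PipelineRunEventBuffer

def wireProduction(graph: DataflowGraph): PipelineUpdateContextImpl = {
  // Production path: the context takes a bare callback; no buffer is involved anymore.
  // The callback body here is a placeholder for whatever the caller does with events.
  new PipelineUpdateContextImpl(graph, (event: PipelineEvent) => println(event))
}

def wireTest(graph: DataflowGraph): (PipelineUpdateContextImpl, PipelineRunEventBuffer) = {
  // Test path: the buffer's addEvent method is supplied as the callback, so tests can
  // inspect the events accumulated in the buffer after the run.
  val buffer = new PipelineRunEventBuffer()
  val context = new PipelineUpdateContextImpl(graph, buffer.addEvent)
  (context, buffer)
}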