@@ -30,34 +30,34 @@ import java.io._
  */
 class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {

-  println("SPARKLISTENER: Started SparkListener for Jupyter Notebook")
+  println("SPARKMONITOR_LISTENER: Started SparkListener for Jupyter Notebook")
   val port = scala.util.Properties.envOrElse("SPARKMONITOR_KERNEL_PORT", "ERRORNOTFOUND")
-  println("SPARKLISTENER: Port obtained from environment: " + port)
+  println("SPARKMONITOR_LISTENER: Port obtained from environment: " + port)
   var socket: Socket = null
   var out: OutputStreamWriter = null
   // Open the socket to the kernel. The kernel is the server already waiting for connections.
   try {
     socket = new Socket("localhost", port.toInt)
     out = new OutputStreamWriter(socket.getOutputStream())
   } catch {
-    case exception: Throwable => println("\nSPARKLISTENER: Exception creating socket:" + exception + "\n")
+    case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception creating socket:" + exception + "\n")
   }

   /** Send a string message to the kernel using the open socket.*/
   def send(msg: String): Unit = {
     try {
-      // println("\nSPARKLISTENER: --------------Sending Message:------------------\n" + msg +
-      //   "\nSPARKLISTENER: -------------------------------------------------\n") // Uncomment to see all events
+      // println("\nSPARKMONITOR_LISTENER: --------------Sending Message:------------------\n" + msg +
+      //   "\nSPARKMONITOR_LISTENER: -------------------------------------------------\n") // Uncomment to see all events
       out.write(msg + ";EOD:")
       out.flush()
     } catch {
-      case exception: Throwable => println("\nSPARKLISTENER: Exception sending socket message:" + exception + "\n")
+      case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception sending socket message:" + exception + "\n")
     }
   }

   /** Close the socket connection to the kernel.*/
   def closeConnection(): Unit = {
-    println("SPARKLISTNER: Closing Connection")
+    println("SPARKMONITOR_LISTENER: Closing Connection")
     out.close()
     socket.close()
   }
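The hunk above also shows the wire protocol: each message is written to the socket followed by the literal `;EOD:` end-of-data marker, with the kernel acting as the server side of the connection. In sparkmonitor the receiving end lives in the Python kernel extension; purely to illustrate the framing, here is a minimal Scala sketch of a server that accepts the listener's connection and splits the stream on that marker (`KernelSideSketch` is a hypothetical name, and the blocking read is a simplification):

```scala
import java.net.ServerSocket
import scala.io.Source

object KernelSideSketch {
  def main(args: Array[String]): Unit = {
    // Bind to an ephemeral port; the real kernel would export this
    // value as SPARKMONITOR_KERNEL_PORT before Spark starts.
    val server = new ServerSocket(0)
    println(s"Waiting for listener on port ${server.getLocalPort}")

    val client = server.accept()
    // Simplification: read until the listener closes the socket,
    // then split the accumulated stream on the ";EOD:" marker.
    val stream = Source.fromInputStream(client.getInputStream).mkString
    stream.split(";EOD:").filter(_.trim.nonEmpty).foreach { msg =>
      println(s"Received event JSON: $msg")
    }
    client.close()
    server.close()
  }
}
```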
@@ -112,7 +112,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
   override def onApplicationStart(appStarted: SparkListenerApplicationStart): Unit = {
     startTime = appStarted.time
     appId = appStarted.appId.getOrElse("null")
-    println("SPARKLISTENER Application Started: " + appId + " ...Start Time: " + appStarted.time)
+    println("SPARKMONITOR_LISTENER: Application Started: " + appId + " ...Start Time: " + appStarted.time)
     val json = ("msgtype" -> "sparkApplicationStart") ~
       ("startTime" -> startTime) ~
       ("appId" -> appId) ~
@@ -129,7 +129,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
    * Closes the socket connection to the kernel.
    */
   override def onApplicationEnd(appEnded: SparkListenerApplicationEnd): Unit = {
-    println("SPARKLISTENER Application ending...End Time: " + appEnded.time)
+    println("SPARKMONITOR_LISTENER: Application ending...End Time: " + appEnded.time)
     endTime = appEnded.time
     val json = ("msgtype" -> "sparkApplicationEnd") ~
       ("endTime" -> endTime)
@@ -195,7 +195,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
     }
     val name = jobStart.properties.getProperty("callSite.short", "null")
-    println("Num Executors" + numExecutors.toInt)
+    // println("Num Executors" + numExecutors.toInt)
     val json = ("msgtype" -> "sparkJobStart") ~
       ("jobGroup" -> jobGroup.getOrElse("null")) ~
       ("jobId" -> jobStart.jobId) ~
@@ -208,14 +208,14 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("appId" -> appId) ~
       ("numExecutors" -> numExecutors) ~
       ("name" -> name)
-    println("SPARKLISTENER JobStart: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER: JobStart: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }

   /** Called when a job ends. */
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
     val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
-      println("SPARKLISTENER: Job completed for unknown job: " + jobEnd.jobId)
+      println("SPARKMONITOR_LISTENER: Job completed for unknown job: " + jobEnd.jobId)
       new JobUIData(jobId = jobEnd.jobId)
     }
     jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
@@ -266,7 +266,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
     val stage = stageCompleted.stageInfo
     stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
-      println("SPARKLISTENER: Stage completed for unknown stage " + stage.stageId)
+      println("SPARKMONITOR_LISTENER: Stage completed for unknown stage " + stage.stageId)
       new StageUIData
     })
     var status = "UNKNOWN"
@@ -306,7 +306,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("numTasks" -> stage.numTasks) ~
       ("status" -> status)

-    println("SPARKLISTENER Stage Completed: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER: Stage Completed: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }
@@ -342,7 +342,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("parentIds" -> stage.parentIds) ~
       ("submissionTime" -> submissionTime) ~
       ("jobIds" -> jobIds)
-    println("SPARKLISTENER Stage Submitted: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER Stage Submitted: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }
@@ -351,7 +351,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
       val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
-        println("SPARKLISTENER: Task start for unknown stage " + taskStart.stageId)
+        println("SPARKMONITOR_LISTENER: Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
       stageData.numActiveTasks += 1
@@ -387,7 +387,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
         ("status" -> taskInfo.status) ~
         ("speculative" -> taskInfo.speculative)

-      // println("SPARKLISTENER Task Started: \n" + pretty(render(json)) + "\n")
+      // println("SPARKMONITOR_LISTENER: Task Started: \n" + pretty(render(json)) + "\n")
       send(pretty(render(json)))
     }
@@ -400,7 +400,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
     var errorMessage: Option[String] = None
     if (info != null && taskEnd.stageAttemptId != -1) {
       val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
-        println("SPARKLISTENER: Task end for unknown stage " + taskEnd.stageId)
+        println("SPARKMONITOR_LISTENER: Task end for unknown stage " + taskEnd.stageId)
         new StageUIData
       })
       stageData.numActiveTasks -= 1
@@ -517,7 +517,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("errorMessage" -> errorMessage) ~
       ("metrics" -> jsonMetrics)

-    println("SPARKLISTENER Task Ended: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER: Task Ended: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }
@@ -573,7 +573,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("numCores" -> executorAdded.executorInfo.totalCores) ~
       ("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads

-    println("SPARKLISTENER Executor Added: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER: Executor Added: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }
@@ -586,7 +586,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
       ("time" -> executorRemoved.time) ~
       ("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads

-    println("SPARKLISTENER Executor Removed: \n" + pretty(render(json)) + "\n")
+    // println("SPARKMONITOR_LISTENER: Executor Removed: \n" + pretty(render(json)) + "\n")
     send(pretty(render(json)))
   }
 }
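To exercise any of this, the listener has to be registered with Spark, which sparkmonitor does through the standard `spark.extraListeners` mechanism. A minimal sketch follows; the fully qualified class name is an assumption, so adjust it to whatever package the compiled listener actually lives in:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object RegisterListenerSketch {
  def main(args: Array[String]): Unit = {
    // spark.extraListeners takes a comma-separated list of listener
    // class names; Spark instantiates each, passing its SparkConf.
    val conf = new SparkConf()
      .setAppName("sparkmonitor-demo") // hypothetical app name
      .setMaster("local[*]")
      .set("spark.extraListeners", "sparkmonitor.listener.JupyterSparkMonitorListener")

    // SPARKMONITOR_KERNEL_PORT must already be set in the environment,
    // or the socket constructor in the listener above will fail.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.stop()
  }
}
```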