Skip to content

Commit 544b8c4

Browse files
authored
Update trace observers to v2 (#6257)
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
1 parent 9a4f79d commit 544b8c4

File tree

18 files changed

+281
-362
lines changed

18 files changed

+281
-362
lines changed

modules/nextflow/src/main/groovy/nextflow/Session.groovy

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,8 @@ class Session implements ISession {
439439
this.disableRemoteBinDir = getExecConfigProp(null, 'disableRemoteBinDir', false)
440440
this.classesDir = FileHelper.createLocalDir()
441441
this.executorFactory = new ExecutorFactory(Plugins.manager)
442-
this.observersV1 = createObserversV1()
443442
this.observersV2 = createObserversV2()
443+
this.observersV1 = createObserversV1()
444444
this.statsEnabled = observersV1.any { ob -> ob.enableMetrics() } || observersV2.any { ob -> ob.enableMetrics() }
445445
this.workflowMetadata = new WorkflowMetadata(this, scriptFile)
446446

@@ -463,31 +463,23 @@ class Session implements ISession {
463463
}
464464

465465
/**
466-
* Given the `run` command line options creates the required {@link TraceObserver}s
467-
*
468-
* @param runOpts The {@code CmdRun} object holding the run command options
469-
* @return A list of {@link TraceObserver} objects or an empty list
466+
* Create the required trace observers based on the given CLI and config options.
470467
*/
471468
@PackageScope
472469
List<TraceObserver> createObserversV1() {
473-
474-
final result = new ArrayList(10)
475-
476-
// stats is created as first because others may depend on it
477-
statsObserver = new WorkflowStatsObserver(this)
478-
result.add(statsObserver)
479-
470+
final result = new ArrayList<TraceObserver>(10)
480471
for( TraceObserverFactory f : Plugins.getExtensions(TraceObserverFactory) ) {
481472
log.debug "Observer factory: ${f.class.simpleName}"
482473
result.addAll(f.create(this))
483474
}
484-
485475
return result
486476
}
487477

488478
@PackageScope
489479
List<TraceObserverV2> createObserversV2() {
490-
final result = new ArrayList(10)
480+
final result = new ArrayList<TraceObserverV2>(10)
481+
this.statsObserver = new WorkflowStatsObserver(this)
482+
result.add(statsObserver)
491483
for( TraceObserverFactoryV2 f : Plugins.getExtensions(TraceObserverFactoryV2) ) {
492484
log.debug "Observer factory (v2): ${f.class.simpleName}"
493485
result.addAll(f.create(this))

modules/nextflow/src/main/groovy/nextflow/trace/AnsiLogObserver.groovy

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ import java.util.regex.Pattern
2121
import groovy.transform.CompileStatic
2222
import jline.TerminalFactory
2323
import nextflow.Session
24-
import nextflow.processor.TaskHandler
24+
import nextflow.trace.event.TaskEvent
2525
import nextflow.util.Duration
2626
import nextflow.util.Threads
2727
import org.fusesource.jansi.Ansi
2828
import org.fusesource.jansi.AnsiConsole
29+
2930
import static nextflow.util.LoggerHelper.isHashLogPrefix
3031
import static org.fusesource.jansi.Ansi.Attribute
3132
import static org.fusesource.jansi.Ansi.Color
@@ -38,7 +39,7 @@ import static org.fusesource.jansi.Ansi.ansi
3839
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
3940
*/
4041
@CompileStatic
41-
class AnsiLogObserver implements TraceObserver {
42+
class AnsiLogObserver implements TraceObserverV2 {
4243

4344
static final private String NEWLINE = '\n'
4445

@@ -107,7 +108,7 @@ class AnsiLogObserver implements TraceObserver {
107108

108109
boolean getStarted() { started }
109110

110-
boolean getStopped() { stopped }
111+
boolean getStopped() { stopped }
111112

112113
private boolean hasProgressChanges() {
113114
final long progress = statsObserver.changeTimestamp ?: 0
@@ -193,7 +194,7 @@ class AnsiLogObserver implements TraceObserver {
193194

194195
protected void renderMessages( Ansi term, List<Event> allMessages, Color color=null ) {
195196
int BLANKS=0
196-
def itr = allMessages.iterator()
197+
final itr = allMessages.iterator()
197198
while( itr.hasNext() ) {
198199
final event = itr.next()
199200

@@ -247,7 +248,7 @@ class AnsiLogObserver implements TraceObserver {
247248
}
248249

249250
protected void renderProcesses(Ansi term, WorkflowStats stats) {
250-
def processes = stats.getProcesses()
251+
final processes = stats.getProcesses()
251252
if( !processes || (!session.isSuccess() && errors && !rendered) ) {
252253
// prevent to show a useless process progress if there's an error
253254
// on startup and the execution is terminated
@@ -286,7 +287,7 @@ class AnsiLogObserver implements TraceObserver {
286287
}
287288
}
288289
// Tell the user how many processes without active tasks were hidden
289-
if( skippedLines > 0 ){
290+
if( skippedLines > 0 ) {
290291
term.a(Attribute.ITALIC).a(Attribute.INTENSITY_FAINT).a("Plus ").bold().a(skippedLines).reset()
291292
term.a(Attribute.ITALIC).a(Attribute.INTENSITY_FAINT).a(" more processes waiting for tasks…").reset().newline()
292293
}
@@ -441,7 +442,7 @@ class AnsiLogObserver implements TraceObserver {
441442
// Final process name, regular text
442443
term.a(labelFinalProcess)
443444
// Active process with a tag, eg: (genes.gtf.gz)
444-
if( labelTag ){
445+
if( labelTag ) {
445446
// Tag in yellow, () dim but tag text regular
446447
term.fg(Color.YELLOW).a(Attribute.INTENSITY_FAINT).a(' (').reset()
447448
term.fg(Color.YELLOW).a(labelTag)
@@ -490,7 +491,7 @@ class AnsiLogObserver implements TraceObserver {
490491
}
491492

492493
@Override
493-
void onFlowCreate(Session session){
494+
void onFlowCreate(Session session) {
494495
this.started = true
495496
this.session = session
496497
this.statsObserver = session.statsObserver
@@ -500,7 +501,7 @@ class AnsiLogObserver implements TraceObserver {
500501
}
501502

502503
@Override
503-
void onFlowComplete(){
504+
void onFlowComplete() {
504505
stopped = true
505506
endTimestamp = System.currentTimeMillis()
506507
renderer.join()
@@ -511,9 +512,9 @@ class AnsiLogObserver implements TraceObserver {
511512
* @param handler
512513
*/
513514
@Override
514-
synchronized void onProcessSubmit(TaskHandler handler, TraceRecord trace){
515+
synchronized void onTaskSubmit(TaskEvent event) {
515516
// executor counter
516-
final exec = handler.task.processor.executor.name
517+
final exec = event.handler.task.processor.executor.name
517518
Integer count = executors[exec] ?: 0
518519
executors[exec] = count+1
519520
markModified()

modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy

Lines changed: 57 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,103 +1,104 @@
1+
/*
2+
* Copyright 2013-2024, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package nextflow.trace
218

319
import java.nio.file.Path
420

521
import nextflow.Session
22+
import nextflow.file.FileHelper
623

724
/**
825
* Creates Nextflow observes object
926
*
1027
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1128
*/
12-
class DefaultObserverFactory implements TraceObserverFactory {
29+
class DefaultObserverFactory implements TraceObserverFactoryV2 {
1330

1431
private Map config
15-
private Session session
1632

1733
@Override
18-
Collection<TraceObserver> create(Session session) {
19-
this.session = session
34+
Collection<TraceObserverV2> create(Session session) {
2035
this.config = session.config
2136

22-
final result = new ArrayList(10)
23-
createTraceFileObserver(result)
37+
final result = new ArrayList<TraceObserverV2>(5)
38+
if( session.ansiLog )
39+
createAnsiLogObserver(result)
40+
createGraphObserver(result)
2441
createReportObserver(result)
2542
createTimelineObserver(result)
26-
createDagObserver(result)
27-
createAnsiLogObserver(result)
43+
createTraceFileObserver(result)
2844
return result
2945
}
3046

31-
protected void createAnsiLogObserver(Collection<TraceObserver> result) {
32-
if( session.ansiLog ) {
33-
session.ansiLogObserver = new AnsiLogObserver()
34-
result << session.ansiLogObserver
35-
}
47+
protected void createAnsiLogObserver(Collection<TraceObserverV2> result) {
48+
session.ansiLogObserver = new AnsiLogObserver()
49+
result << session.ansiLogObserver
3650
}
3751

38-
/**
39-
* Create workflow report file observer
40-
*/
41-
protected void createReportObserver(Collection<TraceObserver> result) {
42-
Boolean isEnabled = config.navigate('report.enabled') as Boolean
52+
protected void createReportObserver(Collection<TraceObserverV2> result) {
53+
final isEnabled = config.navigate('report.enabled') as Boolean
4354
if( !isEnabled )
4455
return
4556

46-
String fileName = config.navigate('report.file')
47-
def maxTasks = config.navigate('report.maxTasks', ReportObserver.DEF_MAX_TASKS) as int
48-
if( !fileName ) fileName = ReportObserver.DEF_FILE_NAME
49-
def report = (fileName as Path).complete()
50-
def observer = new ReportObserver(report)
57+
final fileName = config.navigate('report.file', ReportObserver.DEF_FILE_NAME) as String
58+
final maxTasks = config.navigate('report.maxTasks', ReportObserver.DEF_MAX_TASKS) as int
59+
final overwrite = config.navigate('report.overwrite') as Boolean
60+
61+
final observer = new ReportObserver(FileHelper.asPath(fileName), overwrite)
5162
observer.maxTasks = maxTasks
52-
config.navigate('report.overwrite') { observer.overwrite = it }
5363
result << observer
5464
}
5565

56-
/**
57-
* Create timeline report file observer
58-
*/
59-
protected void createTimelineObserver(Collection<TraceObserver> result) {
60-
Boolean isEnabled = config.navigate('timeline.enabled') as Boolean
66+
protected void createTimelineObserver(Collection<TraceObserverV2> result) {
67+
final isEnabled = config.navigate('timeline.enabled') as Boolean
6168
if( !isEnabled )
6269
return
6370

64-
String fileName = config.navigate('timeline.file')
65-
if( !fileName ) fileName = TimelineObserver.DEF_FILE_NAME
66-
def traceFile = (fileName as Path).complete()
67-
def observer = new TimelineObserver(traceFile)
68-
config.navigate('timeline.overwrite') { observer.overwrite = it }
69-
result << observer
71+
final fileName = config.navigate('timeline.file', TimelineObserver.DEF_FILE_NAME) as String
72+
final overwrite = config.navigate('timeline.overwrite') as Boolean
73+
74+
result << new TimelineObserver(FileHelper.asPath(fileName), overwrite)
7075
}
7176

72-
protected void createDagObserver(Collection<TraceObserver> result) {
73-
Boolean isEnabled = config.navigate('dag.enabled') as Boolean
77+
protected void createGraphObserver(Collection<TraceObserverV2> result) {
78+
final isEnabled = config.navigate('dag.enabled') as Boolean
7479
if( !isEnabled )
7580
return
7681

77-
String fileName = config.navigate('dag.file')
78-
if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME
79-
def traceFile = (fileName as Path).complete()
80-
def observer = new GraphObserver(traceFile)
81-
config.navigate('dag.overwrite') { observer.overwrite = it }
82-
result << observer
82+
final fileName = config.navigate('dag.file', GraphObserver.DEF_FILE_NAME) as String
83+
final overwrite = config.navigate('dag.overwrite') as Boolean
84+
85+
result << new GraphObserver(FileHelper.asPath(fileName), overwrite)
8386
}
8487

85-
/*
86-
* create the execution trace observer
87-
*/
88-
protected void createTraceFileObserver(Collection<TraceObserver> result) {
89-
Boolean isEnabled = config.navigate('trace.enabled') as Boolean
88+
protected void createTraceFileObserver(Collection<TraceObserverV2> result) {
89+
final isEnabled = config.navigate('trace.enabled') as Boolean
9090
if( !isEnabled )
9191
return
9292

93-
String fileName = config.navigate('trace.file')
94-
if( !fileName ) fileName = TraceFileObserver.DEF_FILE_NAME
95-
def traceFile = (fileName as Path).complete()
96-
def observer = new TraceFileObserver(traceFile)
97-
config.navigate('trace.raw') { it -> observer.useRawNumbers(it == true) }
98-
config.navigate('trace.sep') { observer.separator = it }
99-
config.navigate('trace.fields') { observer.setFieldsAndFormats(it) }
100-
config.navigate('trace.overwrite') { observer.overwrite = it }
93+
final fields = config.navigate('trace.fields', '') as String
94+
final fileName = config.navigate('trace.file', TraceFileObserver.DEF_FILE_NAME) as String
95+
final overwrite = config.navigate('trace.overwrite') as Boolean
96+
final raw = config.navigate('trace.raw') as Boolean
97+
final separator = config.navigate('trace.sep', '\t') as String
98+
99+
final observer = new TraceFileObserver(FileHelper.asPath(fileName), overwrite, separator)
100+
observer.useRawNumbers(raw)
101+
observer.setFieldsAndFormats(fields)
101102
result << observer
102103
}
103104

modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package nextflow.trace
1818

19-
import java.nio.file.Files
2019
import java.nio.file.Path
2120

2221
import groovy.transform.PackageScope
@@ -31,16 +30,14 @@ import nextflow.dag.MermaidRenderer
3130
import nextflow.dag.MermaidHtmlRenderer
3231
import nextflow.exception.AbortOperationException
3332
import nextflow.file.FileHelper
34-
import nextflow.processor.TaskHandler
35-
import nextflow.processor.TaskProcessor
3633
/**
3734
* Render the DAG document on pipeline completion using the
3835
* format specified by the user
3936
*
4037
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
4138
*/
4239
@Slf4j
43-
class GraphObserver implements TraceObserver {
40+
class GraphObserver implements TraceObserverV2 {
4441

4542
static public final String DEF_FILE_NAME = "dag-${TraceHelper.launchTimestampFmt()}.html"
4643

@@ -52,17 +49,18 @@ class GraphObserver implements TraceObserver {
5249

5350
private String format
5451

55-
boolean overwrite
52+
private boolean overwrite
5653

5754
String getFormat() { format }
5855

5956
String getName() { name }
6057

61-
GraphObserver( Path file ) {
58+
GraphObserver(Path file, Boolean overwrite=false) {
6259
assert file
6360
this.file = file
6461
this.name = file.baseName
6562
this.format = file.getExtension().toLowerCase() ?: 'html'
63+
this.overwrite = overwrite
6664
}
6765

6866
@Override
@@ -108,30 +106,4 @@ class GraphObserver implements TraceObserver {
108106
new GraphvizRenderer(name, format)
109107
}
110108

111-
112-
@Override
113-
void onProcessCreate(TaskProcessor process) {
114-
115-
}
116-
117-
118-
@Override
119-
void onProcessSubmit(TaskHandler handler, TraceRecord trace) {
120-
121-
}
122-
123-
@Override
124-
void onProcessStart(TaskHandler handler, TraceRecord trace) {
125-
126-
}
127-
128-
@Override
129-
void onProcessComplete(TaskHandler handler, TraceRecord trace) {
130-
131-
}
132-
133-
@Override
134-
boolean enableMetrics() {
135-
return false
136-
}
137109
}

0 commit comments

Comments
 (0)