Skip to content

Commit de385f7

Browse files
committed
Fixed merge placeholders
2 parents e7c0ec5 + 79fc667 commit de385f7

File tree

77 files changed

+722
-470
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+722
-470
lines changed

build.gradle

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
*/
1919

2020
apply plugin: 'gradle-one-jar'
21+
<<<<<<< HEAD
2122
version = '0.5.3'
23+
=======
24+
version = '0.5.2'
25+
>>>>>>> upstream
2226

2327
allprojects {
24-
apply plugin: 'java'
2528
apply plugin: 'groovy'
2629

2730
repositories {
@@ -42,12 +45,6 @@ allprojects {
4245
}
4346
}
4447

45-
compileJava {
46-
options.compilerArgs << '-XDignore.symbol.file'
47-
options.fork = true // may not needed on 1.8
48-
options.forkOptions.executable = 'javac' // may not needed on 1.8
49-
}
50-
5148
configurations {
5249
oneJarLib
5350
dnanexus.extendsFrom runtime
@@ -75,6 +72,12 @@ dependencies {
7572
}
7673
}
7774

75+
sourceSets.main.java.srcDirs = []
76+
sourceSets.main.groovy.srcDirs = ['src/main/java', 'src/main/groovy']
77+
78+
compileGroovy {
79+
options.compilerArgs = ['-XDignore.symbol.file']
80+
}
7881

7982
subprojects {
8083
apply plugin: 'groovy'

examples/blast.nf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ params.db = "$HOME/tools/blast-db/pdb/pdb"
44
params.query = "$HOME/sample.fa"
55
params.chunkSize = 1
66

7-
DB=params.db
7+
DB= file(params.db)
88
seq = channel()
99

1010
inputFile = file(params.query)

examples/small_parallel.nf

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
params.query = "$HOME/sample.fa"
22
params.db = "$HOME/tools/blast-db/pdb/pdb"
33

4+
db = file(params.db)
45
fasta = channel()
56
queryFile = file(params.query)
67
queryFile.chunkFasta {
@@ -15,7 +16,7 @@ process blast {
1516
file top_hits
1617

1718
"""
18-
blastp -db ${params.db} -query query.fa -outfmt 6 > blast_result
19+
blastp -db ${db} -query query.fa -outfmt 6 > blast_result
1920
cat blast_result | head -n 10 | cut -f 2 > top_hits
2021
"""
2122
}
@@ -28,7 +29,7 @@ process extract {
2829
output:
2930
file sequences
3031

31-
"blastdbcmd -db ${params.db} -entry_batch $top_hits > sequences"
32+
"blastdbcmd -db ${db} -entry_batch $top_hits > sequences"
3233
}
3334

3435
process all(merge:true) {

examples/small_pipeline.nf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ process extractTopHits {
2525
output:
2626
file sequences
2727

28-
"blastdbcmd -db ${params.db} -entry_batch $top_hits > sequences"
28+
"blastdbcmd -db ${db} -entry_batch $top_hits > sequences"
2929
}
3030

3131
process align {

src/main/groovy/nextflow/Const.groovy

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,29 @@ class Const {
5757
/**
5858
* The application version
5959
*/
60+
<<<<<<< HEAD
6061
static final String APP_VER = "0.5.3"
62+
=======
63+
static final String APP_VER = "0.5.2"
64+
>>>>>>> upstream
6165

6266
/**
6367
* The app build time as linux/unix timestamp
6468
*/
69+
<<<<<<< HEAD
6570
static final long APP_TIMESTAMP = 1386613774413
71+
=======
72+
static final long APP_TIMESTAMP = 1382739512876
73+
>>>>>>> upstream
6674

6775
/**
6876
* The app build number
6977
*/
78+
<<<<<<< HEAD
7079
static final int APP_BUILDNUM = 1112
80+
=======
81+
static final int APP_BUILDNUM = 1138
82+
>>>>>>> upstream
7183

7284
/**
7385
* The date time formatter string

src/main/groovy/nextflow/Session.groovy

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import groovyx.gpars.group.NonDaemonPGroup
2929
import groovyx.gpars.group.PGroup
3030
import groovyx.gpars.util.PoolUtils
3131
import jsr166y.Phaser
32-
import nextflow.processor.TaskScheduler
32+
import nextflow.processor.TaskDispatcher
3333
/**
3434
*
3535
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
@@ -43,9 +43,9 @@ class Session {
4343
final List<DataflowProcessor> allProcessors = []
4444

4545
/**
46-
* The scheduler monitoring the tasks execution
46+
* Dispatch tasks for executions
4747
*/
48-
final TaskScheduler scheduler
48+
final TaskDispatcher dispatcher
4949

5050
/**
5151
* Holds the configuration object
@@ -143,7 +143,7 @@ class Session {
143143
pgroup = new NonDaemonPGroup( config.poolSize as int )
144144
Dataflow.activeParallelGroup.set(pgroup)
145145

146-
scheduler = new TaskScheduler(this)
146+
dispatcher = new TaskDispatcher(this)
147147

148148
log.debug ">>> phaser register (session)"
149149
phaser.register()
@@ -152,10 +152,6 @@ class Session {
152152
@PackageScope
153153
def getPhaser() { phaser }
154154

155-
def void start() {
156-
scheduler.start()
157-
}
158-
159155
/**
160156
* Await the termination of all processors
161157
*/
@@ -196,6 +192,57 @@ class Session {
196192
}
197193

198194

195+
public int getQueueSize( String execName, int defValue ) {
196+
// creating the running tasks queue
197+
def size = null
198+
199+
// make sure that the *executor* is a map object
200+
// it could also be a plain string (when it specifies just the its name)
201+
if( config.executor instanceof Map ){
202+
if( execName ) {
203+
size = config.executor?."$execName"?.queueSize
204+
}
205+
206+
if( !size && config.executor?.queueSize ) {
207+
size = config.executor?.queueSize
208+
}
209+
}
210+
211+
212+
if( !size ) {
213+
size = defValue
214+
log.debug "Undefined executor queueSize property runnable queue size -- fallback default value: $size"
215+
}
216+
217+
return size
218+
}
219+
220+
public long getPollInterval( String execName, long defValue = 1_000 ) {
221+
// creating the running tasks queue
222+
def result = null
223+
224+
// make sure that the *executor* is a map object
225+
// it could also be a plain string (when it specifies just the its name)
226+
if( config.executor instanceof Map ){
227+
if( execName ) {
228+
result = config.executor?."$execName"?.pollInterval
229+
}
230+
231+
if( !result && config.executor?.pollInterval ) {
232+
result = config.executor?.pollInterval
233+
}
234+
}
235+
236+
237+
if( !result ) {
238+
result = defValue
239+
log.debug "Undefined executor queueSize property runnable queue size -- fallback on num of available processors-1: $result"
240+
}
241+
242+
return result
243+
}
244+
245+
199246
// /**
200247
// * Create a table report of all executed or running tasks
201248
// *

src/main/groovy/nextflow/ast/ProcessDefTransformImpl.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class ProcessDefTransformImpl implements ASTTransformation {
231231

232232
def methodCall = expression as MethodCallExpression
233233
def methodName = methodCall.getMethodAsString()
234-
log.debug "convert > input method: $methodName"
234+
log.trace "convert > input method: $methodName"
235235

236236
if( methodName in ['val','env','file','each'] ) {
237237
//this methods require a special prefix '__in_'
@@ -249,15 +249,15 @@ class ProcessDefTransformImpl implements ASTTransformation {
249249
}
250250

251251
def void convertOutputMethod( Expression expression ) {
252-
log.debug "convert > output expression: $expression"
252+
log.trace "convert > output expression: $expression"
253253

254254
if( !(expression instanceof MethodCallExpression) ) {
255255
return
256256
}
257257

258258
def methodCall = expression as MethodCallExpression
259259
def methodName = methodCall.getMethodAsString()
260-
log.debug "convert > output method: $methodName"
260+
log.trace "convert > output method: $methodName"
261261

262262
if( methodName in ['val','file'] ) {
263263
// prefix the method name with the string '__out_'

src/main/groovy/nextflow/executor/AbstractExecutor.groovy

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import java.nio.file.Path
44

55
import groovy.io.FileType
66
import groovy.util.logging.Slf4j
7+
import nextflow.Session
78
import nextflow.exception.MissingFileException
89
import nextflow.processor.FileHolder
910
import nextflow.processor.FileInParam
1011
import nextflow.processor.FileOutParam
1112
import nextflow.processor.TaskConfig
1213
import nextflow.processor.TaskHandler
14+
import nextflow.processor.TaskMonitor
1315
import nextflow.processor.TaskRun
1416
/**
1517
* Declares methods have to be implemented by a generic
@@ -25,12 +27,65 @@ abstract class AbstractExecutor {
2527
*/
2628
TaskConfig taskConfig
2729

30+
/**
31+
* The current session object
32+
*/
33+
Session session
34+
35+
/**
36+
* The executor simple name
37+
*/
38+
String name
39+
40+
/**
41+
* The queue holder that keep track of all tasks for this executor.
42+
*/
43+
private TaskMonitor monitor
44+
45+
46+
/**
47+
* Let to post initialize the executor
48+
*/
49+
def void init() {
50+
if( !monitor ) {
51+
monitor = session.dispatcher.getOrCreateMonitor(this.class) { createTaskMonitor() }
52+
}
53+
}
54+
55+
/**
56+
* @return Create a new instance of the {@code TaskQueueHolder} component
57+
*/
58+
abstract protected TaskMonitor createTaskMonitor()
59+
60+
/**
61+
* @return A reference to the current {@code #queueHolder} object
62+
*/
63+
TaskMonitor getTaskMonitor() { monitor }
64+
2865
/**
2966
* @return Create a new {@code TaskHandler} to manage the scheduling
3067
* actions for this task
3168
*/
3269
abstract TaskHandler createTaskHandler(TaskRun task)
3370

71+
/**
72+
* Submit a task for execution
73+
*
74+
* @param task
75+
* @return
76+
*/
77+
TaskHandler submitTask( TaskRun task ) {
78+
def handler = createTaskHandler(task)
79+
monitor.put(handler)
80+
try {
81+
handler.submit()
82+
}
83+
catch( Exception e ) {
84+
monitor.remove(handler)
85+
}
86+
return handler
87+
}
88+
3489

3590
/**
3691
* Collect the file(s) with the name specified, produced by the execution

src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class BashWrapperBuilder {
113113
* 4 - un-stage e.g. copy back the result files to the working folder
114114
*/
115115

116-
def ENDL = '\n'
116+
final ENDL = '\n'
117117
def wrapper = new StringBuilder()
118118
wrapper << '#!/bin/bash -Eeu' << ENDL
119119
wrapper << 'trap onexit 1 2 3 15 ERR' << ENDL

src/main/groovy/nextflow/executor/GridExecutors.groovy

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import nextflow.exception.InvalidExitException
3030
import nextflow.processor.FileInParam
3131
import nextflow.processor.TaskConfig
3232
import nextflow.processor.TaskHandler
33+
import nextflow.processor.TaskPollingMonitor
34+
import nextflow.processor.TaskMonitor
3335
import nextflow.processor.TaskRun
3436
import nextflow.util.CmdLineHelper
3537
import org.apache.commons.io.IOUtils
@@ -42,6 +44,19 @@ import org.apache.commons.io.IOUtils
4244
@Slf4j
4345
abstract class AbstractGridExecutor extends AbstractExecutor {
4446

47+
/**
48+
* Create a a queue holder for this executor
49+
* @return
50+
*/
51+
def TaskMonitor createTaskMonitor() {
52+
final queueSize = session.getQueueSize(name, 50)
53+
final pollInterval = session.getPollInterval(name, 1_000)
54+
log.debug "Creating executor queue with size: $queueSize; poll-interval: $pollInterval"
55+
56+
return new TaskPollingMonitor(session, queueSize, pollInterval) .start()
57+
}
58+
59+
4560
/*
4661
* Prepare and launch the task in the underlying execution platform
4762
*/
@@ -204,7 +219,7 @@ class GridTaskHandler extends TaskHandler {
204219
}
205220
// save the JobId in the
206221
this.jobId = executor.parseJobId(result)
207-
this.status = Status.RUNNING
222+
this.status = Status.SUBMITTED
208223
}
209224
catch( Exception e ) {
210225
task.exitCode = exitStatus
@@ -241,11 +256,7 @@ class GridTaskHandler extends TaskHandler {
241256
@Override
242257
boolean checkIfRunning() {
243258

244-
if( !isNew() ) {
245-
return true
246-
}
247-
248-
if( isNew() && startFile.exists() ) {
259+
if( isSubmitted() && startFile.exists() ) {
249260
status = Status.RUNNING
250261
return true
251262
}
@@ -256,10 +267,6 @@ class GridTaskHandler extends TaskHandler {
256267
@Override
257268
boolean checkIfTerminated() {
258269

259-
if( isTerminated() ) {
260-
return true
261-
}
262-
263270
if( isRunning() && exitFile.exists() ) {
264271

265272
// finalize the task

0 commit comments

Comments
 (0)