Description
Bug report
A channel may emit all, some, or none of its expected values, depending on the order in which upstream tasks complete. This can occur when the `groupTuple` operator is used in combination with `groupKey`.
It seems that the underlying issue may be that the key produced by `groupKey` is not a string in some contexts (seemingly inconsistent: it works in some places but not in others). This can be remedied by calling `.toString()` on the value returned by `groupKey`, but it would be great if `groupKey` emitted a string so that channels behaved predictably and consistently across contexts.
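The mismatch can be reduced to a single `join`: `groupKey` wraps the key in a `GroupKey` object, and `join` pairs items by key equality, so a wrapped key on one side and a plain `String` on the other may fail to match. A minimal, illustrative sketch (not from the report; channel contents are made up):

```nextflow
workflow {
    // Left side: keys wrapped by groupKey(), as after a sized groupTuple().
    left = Channel.of(tuple('BATCH1', 'SAMP1'), tuple('BATCH1', 'SAMP2'))
        .map { batch, sample -> tuple(groupKey(batch, 2), sample) }
        .groupTuple()

    // Right side: plain String keys.
    right = Channel.of('BATCH1')

    // Whether this join emits anything depends on GroupKey/String equality;
    // normalizing the key with .toString() makes it deterministic.
    left.map { batch, samples -> tuple(batch.toString(), samples) }
        .join(right)
        .view()
}
```

Without the `.toString()`, the join may silently drop the tuple, which matches the behavior described above.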
Expected behavior and actual behavior
The expected behavior is that a channel emits the same values (perhaps in a different order) as long as the input channels emit the same values (again, regardless of order).
The example below shows that this does not always hold: depending on the order in which upstream tasks complete, the channel may emit fewer tuples than expected (without error), or even none at all.
Steps to reproduce the problem
In the example below, the sleeps are set up to ensure that the `by_sample` tasks finish first (`sleep 1`) and the `by_batch` tasks finish after all samples have completed (`sleep 30`). This creates a situation where nothing is emitted from the `batch_done` channel (even though we would expect two tuples to be emitted, one for each batch).
```nextflow
process by_sample {
    input:
    val sample_id

    output:
    val sample_id, emit: sample_ids

    script:
    """
    sleep 1
    echo $sample_id
    """
}

process by_batch {
    input:
    val batch_id

    output:
    val batch_id, emit: batch_ids

    script:
    """
    sleep 30
    echo $batch_id
    """
}

workflow {
    // Input samples, each belonging to one of 2 batches.
    samp_ch = Channel.from(
        ['SAMP1', 'BATCH1'],
        ['SAMP2', 'BATCH1'],
        ['SAMP3', 'BATCH2'],
        ['SAMP4', 'BATCH2'],
        ['SAMP5', 'BATCH2'],
    )

    // A process which scatters on sample
    by_sample(samp_ch.map { sample, batch -> sample })
    by_sample.out.sample_ids.dump(tag: 'by_sample')

    by_batch(samp_ch.map { sample, batch -> batch }.unique())
    by_batch.out.batch_ids.dump(tag: 'by_batch')

    // This will create a channel emitting:
    // tuple(batch_id, num_samples_in_batch)
    sample_counts = samp_ch
        .map { sample_id, batch -> tuple(batch, sample_id) }
        .groupTuple()
        .map { batch, sample_ids -> tuple(batch, sample_ids.size()) }
    sample_counts.dump(tag: 'sample_counts')

    // Using the above counts of samples per batch, we can create a channel
    // which will emit a batch ID when the following conditions are met:
    //   1. All samples in a single batch have completed process "by_sample"
    //   2. The batch has completed process "by_batch"
    batch_done = by_sample.out.sample_ids
        .join(samp_ch)
        .map { sample, batch -> tuple(batch, sample) }
        .combine(sample_counts, by: 0)
        .map { batch, sample, batch_size -> tuple(groupKey(batch, batch_size), sample) }
        .groupTuple()
        .join(by_batch.out.batch_ids)
        .map { batch, samples -> tuple(batch, "batch_done") }
        .dump(tag: 'batch_done')
}
```
Program output
If samples finish before batches (as enforced in the example above), nothing is emitted from the `batch_done` channel:
```
N E X T F L O W  ~  version 23.04.2
Launching `src/bioinformatics/workflows/nextflow/hello_world/main.nf` [goofy_albattani] DSL2 - revision: 6badd408f3
executor >  local (7)
[e0/e080c9] process > by_sample (3) [100%] 5 of 5 ✔
[1c/dafd4a] process > by_batch (1)  [100%] 2 of 2 ✔
[DUMP: sample_counts] ['BATCH1', 2]
[DUMP: sample_counts] ['BATCH2', 3]
[DUMP: by_sample] 'SAMP1'
[DUMP: by_sample] 'SAMP5'
[DUMP: by_sample] 'SAMP2'
[DUMP: by_sample] 'SAMP4'
[DUMP: by_sample] 'SAMP3'
[DUMP: by_batch] 'BATCH2'
[DUMP: by_batch] 'BATCH1'
```
If we switch it around and force the batches to finish before the samples (i.e. `by_batch`: `sleep 1` and `by_sample`: `sleep 30`), the output looks like the following (notice that `batch_done` now emits a tuple for each batch):
```
N E X T F L O W  ~  version 23.04.2
Launching `src/bioinformatics/workflows/nextflow/hello_world/main.nf` [confident_mercator] DSL2 - revision: 717a98d981
executor >  local (7)
[11/334d03] process > by_sample (4) [100%] 5 of 5 ✔
[d4/c85a77] process > by_batch (1)  [100%] 2 of 2 ✔
[DUMP: sample_counts] ['BATCH1', 2]
[DUMP: sample_counts] ['BATCH2', 3]
[DUMP: by_batch] 'BATCH2'
[DUMP: by_batch] 'BATCH1'
[DUMP: by_sample] 'SAMP2'
[DUMP: by_sample] 'SAMP5'
[DUMP: by_sample] 'SAMP3'
[DUMP: by_sample] 'SAMP1'
[DUMP: batch_done] [BATCH1, 'batch_done']
[DUMP: by_sample] 'SAMP4'
[DUMP: batch_done] [BATCH2, 'batch_done']
```
Environment
- Nextflow version: 23.04.2
- Java version: OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
- Operating system: macOS (also tested on Linux with the same behavior)
- Bash version: zsh 5.9 (x86_64-apple-darwin22.0)
Additional context
This can also be fixed in a couple of other ways, which may point to the underlying problem. If we change the creation of the `batch_done` channel to the following, we always see the expected output regardless of the order in which upstream tasks complete (notice the call to `batch.toString()` immediately after `groupTuple()`):
```nextflow
batch_done = by_sample.out.sample_ids
    .join(samp_ch)
    .map { sample, batch -> tuple(batch, sample) }
    .combine(sample_counts, by: 0)
    .map { batch, sample, batch_size -> tuple(groupKey(batch, batch_size), sample) }
    .groupTuple()
    .map { batch, samples -> tuple(batch.toString(), samples) }
    .join(by_batch.out.batch_ids)
    .map { batch, samples -> tuple(batch, "batch_done") }
    .dump(tag: 'batch_done')
```