From 164509834c7408bc8e8c26fc26da293046bc5211 Mon Sep 17 00:00:00 2001 From: Rob Syme Date: Thu, 29 May 2025 19:13:29 -0400 Subject: [PATCH 1/4] HashBuilder Signed-off-by: Rob Syme --- .../groovy/nextflow/extension/DataflowHelper.groovy | 2 +- .../src/main/groovy/nextflow/extension/KeyPair.groovy | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy index 14c775b04e..aa23b2d520 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy @@ -360,7 +360,7 @@ class DataflowHelper { if( !(entry instanceof List) ) { if( pivot != [0] ) throw new IllegalArgumentException("Not a valid `by` index: $pivot") - result.keys = [entry] + result.addKey(entry) result.values = [] return result } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy index e794760b82..fb5a21930b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy @@ -19,6 +19,7 @@ package nextflow.extension import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.ToString +import nextflow.extension.GroupKey /** * Implements an helper key-value helper object used in dataflow operators @@ -33,6 +34,9 @@ class KeyPair { List values void addKey(el) { + if (keys == null) { + keys = [] + } keys.add(safeStr(el)) } @@ -41,6 +45,10 @@ class KeyPair { } static private safeStr(key) { - key instanceof GString ? key.toString() : key + if (key instanceof GString) + return key.toString() + if (key instanceof GroupKey) + return key.getGroupTarget() + return key } } From d1058421d1f45250c9890d184938ed1275d86332 Mon Sep 17 00:00:00 2001 From: Rob Syme Date: Sat, 31 May 2025 18:24:21 -0400 Subject: [PATCH 2/4] Ensure GroupKey is preserved and commutative in join/combine - Store original keys in KeyPair and use them for output emission in join/combine - Always prefer GroupKey over plain keys when both are present for the same match - Fixes timing-dependent bugs when joining or combining channels with mixed GroupKey/plain keys - Updates buffer logic to store KeyPair objects, ensuring metadata like group size is retained - Updates remainder and mismatch handling to work with new buffer structure - Adds and passes comprehensive tests for GroupKey preservation and commutativity Fixes #4104 Signed-off-by: Rob Syme --- .../nextflow/extension/CombineOp.groovy | 45 ++++++- .../nextflow/extension/DataflowHelper.groovy | 3 - .../groovy/nextflow/extension/JoinOp.groovy | 113 ++++++++++++++++-- .../groovy/nextflow/extension/KeyPair.groovy | 11 +- .../extension/JoinOpGroupKeyTest.groovy | 113 ++++++++++++++++++ 5 files changed, 264 insertions(+), 21 deletions(-) create mode 100644 modules/nextflow/src/test/groovy/nextflow/extension/JoinOpGroupKeyTest.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/CombineOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/CombineOp.groovy index 8ef6765faf..f93f3cab27 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/CombineOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/CombineOp.groovy @@ -48,6 +48,8 @@ class CombineOp { private Map rightValues = [:] + private Map originalKeyMap = [:] + private static final int LEFT = 0 private static final int RIGHT = 1 @@ -93,7 +95,7 @@ class CombineOp { opts.onNext = { if( pivot ) { def pair = makeKey(pivot, it) - emit(target, index, pair.keys, pair.values) + emit(target, index, pair) } else { emit(target, index, NONE, it) @@ -120,7 +122,36 @@ class CombineOp { } @PackageScope - synchronized void emit( DataflowWriteChannel target, int index, List p, v ) { + synchronized void emit( DataflowWriteChannel target, int index, KeyPair pair ) { + emit(target, index, pair.originalKeys, pair.keys, pair.values) + } + + @PackageScope + synchronized void emit( DataflowWriteChannel target, int index, List originalKeys, List keys, v ) { + def p = keys // Use normalized keys for matching + + // Store/update the mapping from normalized key to original key + // Prefer GroupKey over plain keys + def existingOriginal = originalKeyMap.get(p) + if (existingOriginal == null) { + originalKeyMap[p] = originalKeys + } else { + // Check if any of the new original keys is a GroupKey + for (int i = 0; i < originalKeys.size(); i++) { + def newKey = originalKeys[i] + def oldKey = existingOriginal instanceof List ? existingOriginal[i] : existingOriginal + if (newKey instanceof GroupKey && !(oldKey instanceof GroupKey)) { + originalKeyMap[p] = originalKeys + break + } + } + } + + // Use the best available original key (preferring GroupKey) + def bestOriginalKeys = originalKeyMap[p] + + // Ensure bestOriginalKeys is a List for the tuple method + def bestKeysList = bestOriginalKeys instanceof List ? bestOriginalKeys : [bestOriginalKeys] if( leftValues[p] == null ) leftValues[p] = [] if( rightValues[p] == null ) rightValues[p] = [] @@ -128,7 +159,7 @@ class CombineOp { if( index == LEFT ) { log.trace "combine >> left >> by=$p; val=$v; right-values: ${rightValues[p]}" for ( Object x : rightValues[p] ) { - target.bind( tuple(p, v, x) ) + target.bind( tuple(bestKeysList, v, x) ) // Use best original keys in output } leftValues[p].add(v) return @@ -137,7 +168,7 @@ class CombineOp { if( index == RIGHT ) { log.trace "combine >> right >> by=$p; val=$v; right-values: ${leftValues[p]}" for ( Object x : leftValues[p] ) { - target.bind( tuple(p, x, v) ) + target.bind( tuple(bestKeysList, x, v) ) // Use best original keys in output } rightValues[p].add(v) return @@ -146,6 +177,12 @@ class CombineOp { throw new IllegalArgumentException("Not a valid spread operator index: $index") } + @PackageScope + synchronized void emit( DataflowWriteChannel target, int index, List p, v ) { + // Legacy method for when pivot is NONE + emit(target, index, p, p, v) + } + DataflowWriteChannel apply() { target = CH.create() diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy index aa23b2d520..ab875853b5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/DataflowHelper.groovy @@ -361,13 +361,10 @@ class DataflowHelper { if( pivot != [0] ) throw new IllegalArgumentException("Not a valid `by` index: $pivot") result.addKey(entry) - result.values = [] return result } def list = (List)entry - result.keys = new ArrayList(pivot.size()) - result.values = new ArrayList(list.size()) for( int i=0; i originalKeyMap = new HashMap() + JoinOp( DataflowReadChannel source, DataflowReadChannel target, Map params = null ) { CheckHelper.checkParams('join', params, JOIN_PARAMS) this.source = source @@ -173,6 +175,24 @@ class JoinOp { // get the index key for this object final item0 = DataflowHelper.makeKey(pivot, data) + // Store the mapping from normalized key to original key + // Prefer GroupKey over plain keys + def existingOriginal = originalKeyMap.get(item0.keys) + if (existingOriginal == null) { + originalKeyMap[item0.keys] = item0.originalKeys + } else { + // Check if any of the new original keys is a GroupKey + // If so, replace the existing mapping + for (int i = 0; i < item0.originalKeys.size(); i++) { + def newKey = item0.originalKeys[i] + def oldKey = existingOriginal instanceof List ? existingOriginal[i] : existingOriginal + if (newKey instanceof GroupKey && !(oldKey instanceof GroupKey)) { + originalKeyMap[item0.keys] = item0.originalKeys + break + } + } + } + // check for unique keys checkForDuplicate(item0.keys, item0.values, index, false) @@ -190,8 +210,8 @@ class JoinOp { def entries = channels[index] // add the received item to the list - // when it is used in the gather op add always as the first item - entries << item0.values + // Store the full KeyPair to preserve original keys + entries << item0 setSingleton(index, item0.values.size()==0) // now check if it has received an element matching for each channel @@ -200,15 +220,39 @@ class JoinOp { } def result = [] + + // Find the best key (prefer GroupKey) from all channels + def bestOriginalKeys = null + for (Map.Entry entry : channels.entrySet()) { + def channelItems = entry.getValue() + if (channelItems && channelItems.size() > 0) { + def keyPair = channelItems[0] as KeyPair + if (bestOriginalKeys == null) { + bestOriginalKeys = keyPair.originalKeys + } else { + // Check if this channel has a GroupKey version + for (int i = 0; i < keyPair.originalKeys.size(); i++) { + def candidateKey = keyPair.originalKeys[i] + def currentKey = bestOriginalKeys instanceof List ? bestOriginalKeys[i] : bestOriginalKeys + if (candidateKey instanceof GroupKey && !(currentKey instanceof GroupKey)) { + bestOriginalKeys = keyPair.originalKeys + break + } + } + } + } + } + // add the key - addToList(result, item0.keys) + addToList(result, bestOriginalKeys ?: item0.originalKeys) final itr = channels.iterator() while( itr.hasNext() ) { def entry = (Map.Entry)itr.next() def list = entry.getValue() - addToList(result, list[0]) + def keyPair = list[0] as KeyPair + addToList(result, keyPair.values) list.remove(0) if( list.size() == 0 ) { @@ -221,6 +265,17 @@ class JoinOp { return result } + // Helper method to retrieve original data from buffer + private def getOriginalDataFromBuffer(Map> buffer, Object key, int channelIndex) { + def channels = buffer.get(key) + if (channels == null) return null + def items = channels.get(channelIndex) + if (items == null || items.isEmpty()) return null + // Need to reconstruct the original data from the values and the key + // This is a simplified version - in reality we'd need to track the full original items + return null // For now, we'll use a different approach + } + private final void checkRemainder(Map> buffers, int count, DataflowWriteChannel target ) { log.trace "Operator `join` remainder buffer: ${-> buffers}" @@ -231,17 +286,43 @@ class JoinOp { boolean fill=false def result = new ArrayList(count+1) - addToList(result, key) + + // Find the best original key from available channels + def bestOriginalKey = null + for( int i=0; i 0 ) { + def keyPair = items[0] as KeyPair + if (bestOriginalKey == null) { + bestOriginalKey = keyPair.originalKeys + } else { + // Check if this channel has a GroupKey version + for (int j = 0; j < keyPair.originalKeys.size(); j++) { + def candidateKey = keyPair.originalKeys[j] + def currentKey = bestOriginalKey instanceof List ? bestOriginalKey[j] : bestOriginalKey + if (candidateKey instanceof GroupKey && !(currentKey instanceof GroupKey)) { + bestOriginalKey = keyPair.originalKeys + break + } + } + } + } + } + + // Use the best available original key, or fall back to the map key + def originalKey = bestOriginalKey ?: originalKeyMap.get(key) ?: key + addToList(result, originalKey) for( int i=0; i + if (item instanceof KeyPair) { + values.add(item.values) + } else { + values.add(item) + } + } + result[key].add(csv0(values,',')) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy index fb5a21930b..d2df6659c5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/KeyPair.groovy @@ -31,12 +31,17 @@ import nextflow.extension.GroupKey @EqualsAndHashCode class KeyPair { List keys + List originalKeys List values + KeyPair() { + this.keys = [] + this.originalKeys = [] + this.values = [] + } + void addKey(el) { - if (keys == null) { - keys = [] - } + originalKeys.add(el) keys.add(safeStr(el)) } diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/JoinOpGroupKeyTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/JoinOpGroupKeyTest.groovy new file mode 100644 index 0000000000..9024820fa1 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/extension/JoinOpGroupKeyTest.groovy @@ -0,0 +1,113 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.extension + +import nextflow.Channel +import nextflow.Session +import spock.lang.Specification + +/** + * Test GroupKey preservation in join operations + * + * @author Your Name + */ +class JoinOpGroupKeyTest extends Specification { + + def setup() { + new Session() + } + + def 'should preserve GroupKey when joining channels' () { + given: + def key1 = new GroupKey('X', 2) + def key2 = new GroupKey('Y', 3) + + def ch1 = Channel.of([key1, 1], [key2, 2]) + def ch2 = Channel.of(['X', 'a'], ['Y', 'b']) + + when: + def op = new JoinOp(ch1, ch2) + def result = op.apply().toList().getVal() + + then: + result.size() == 2 + + // Check that GroupKey is preserved in the output + result.each { tuple -> + assert tuple[0] instanceof GroupKey + assert tuple.size() == 3 + } + + // Verify the actual values + def sorted = result.sort { it[0].toString() } + sorted[0][0].toString() == 'X' + sorted[0][0].groupSize == 2 + sorted[0][1] == 1 + sorted[0][2] == 'a' + + sorted[1][0].toString() == 'Y' + sorted[1][0].groupSize == 3 + sorted[1][1] == 2 + sorted[1][2] == 'b' + } + + def 'should preserve GroupKey when GroupKey is on right channel' () { + given: + def key1 = new GroupKey('X', 2) + def key2 = new GroupKey('Y', 3) + + def ch1 = Channel.of(['X', 'a'], ['Y', 'b']) + def ch2 = Channel.of([key1, 1], [key2, 2]) + + when: + def op = new JoinOp(ch1, ch2) + def result = op.apply().toList().getVal() + + then: + result.size() == 2 + + // Check that GroupKey is preserved in the output + result.each { tuple -> + assert tuple[0] instanceof GroupKey + assert tuple.size() == 3 + } + } + + def 'should handle mix of GroupKey and plain keys correctly' () { + given: + def key1 = new GroupKey('X', 2) + + def ch1 = Channel.of([key1, 1], ['Y', 2]) // Mix of GroupKey and plain key + def ch2 = Channel.of(['X', 'a'], ['Y', 'b']) + + when: + def op = new JoinOp(ch1, ch2) + def result = op.apply().toList().getVal().sort { it[0].toString() } + + then: + result.size() == 2 + + // First tuple should have GroupKey + result[0][0] instanceof GroupKey + result[0][0].toString() == 'X' + result[0][0].groupSize == 2 + + // Second tuple should have plain string + result[1][0] == 'Y' + !(result[1][0] instanceof GroupKey) + } +} \ No newline at end of file From fc58bec233120849d498f8fb28a3200d6ea8c44c Mon Sep 17 00:00:00 2001 From: Rob Syme Date: Sat, 31 May 2025 18:31:24 -0400 Subject: [PATCH 3/4] Cleanup unused methods and code Signed-off-by: Rob Syme --- .../groovy/nextflow/extension/JoinOp.groovy | 33 +------------------ 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy index 443da3d01c..facf51b6ef 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy @@ -58,8 +58,6 @@ class JoinOp { private Set uniqueKeys = new LinkedHashSet() - private Map originalKeyMap = new HashMap() - JoinOp( DataflowReadChannel source, DataflowReadChannel target, Map params = null ) { CheckHelper.checkParams('join', params, JOIN_PARAMS) this.source = source @@ -175,24 +173,6 @@ class JoinOp { // get the index key for this object final item0 = DataflowHelper.makeKey(pivot, data) - // Store the mapping from normalized key to original key - // Prefer GroupKey over plain keys - def existingOriginal = originalKeyMap.get(item0.keys) - if (existingOriginal == null) { - originalKeyMap[item0.keys] = item0.originalKeys - } else { - // Check if any of the new original keys is a GroupKey - // If so, replace the existing mapping - for (int i = 0; i < item0.originalKeys.size(); i++) { - def newKey = item0.originalKeys[i] - def oldKey = existingOriginal instanceof List ? existingOriginal[i] : existingOriginal - if (newKey instanceof GroupKey && !(oldKey instanceof GroupKey)) { - originalKeyMap[item0.keys] = item0.originalKeys - break - } - } - } - // check for unique keys checkForDuplicate(item0.keys, item0.values, index, false) @@ -210,7 +190,6 @@ class JoinOp { def entries = channels[index] // add the received item to the list - // Store the full KeyPair to preserve original keys entries << item0 setSingleton(index, item0.values.size()==0) @@ -265,16 +244,6 @@ class JoinOp { return result } - // Helper method to retrieve original data from buffer - private def getOriginalDataFromBuffer(Map> buffer, Object key, int channelIndex) { - def channels = buffer.get(key) - if (channels == null) return null - def items = channels.get(channelIndex) - if (items == null || items.isEmpty()) return null - // Need to reconstruct the original data from the values and the key - // This is a simplified version - in reality we'd need to track the full original items - return null // For now, we'll use a different approach - } private final void checkRemainder(Map> buffers, int count, DataflowWriteChannel target ) { log.trace "Operator `join` remainder buffer: ${-> buffers}" @@ -310,7 +279,7 @@ class JoinOp { } // Use the best available original key, or fall back to the map key - def originalKey = bestOriginalKey ?: originalKeyMap.get(key) ?: key + def originalKey = bestOriginalKey ?: key addToList(result, originalKey) for( int i=0; i Date: Sat, 31 May 2025 19:35:40 -0400 Subject: [PATCH 4/4] Pull out the findBestOriginalKeys method --- .../groovy/nextflow/extension/JoinOp.groovy | 74 +++++++++---------- 1 file changed, 34 insertions(+), 40 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy index facf51b6ef..50134e247c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy @@ -201,26 +201,7 @@ class JoinOp { def result = [] // Find the best key (prefer GroupKey) from all channels - def bestOriginalKeys = null - for (Map.Entry entry : channels.entrySet()) { - def channelItems = entry.getValue() - if (channelItems && channelItems.size() > 0) { - def keyPair = channelItems[0] as KeyPair - if (bestOriginalKeys == null) { - bestOriginalKeys = keyPair.originalKeys - } else { - // Check if this channel has a GroupKey version - for (int i = 0; i < keyPair.originalKeys.size(); i++) { - def candidateKey = keyPair.originalKeys[i] - def currentKey = bestOriginalKeys instanceof List ? bestOriginalKeys[i] : bestOriginalKeys - if (candidateKey instanceof GroupKey && !(currentKey instanceof GroupKey)) { - bestOriginalKeys = keyPair.originalKeys - break - } - } - } - } - } + def bestOriginalKeys = findBestOriginalKeys(channels) // add the key addToList(result, bestOriginalKeys ?: item0.originalKeys) @@ -257,26 +238,7 @@ class JoinOp { def result = new ArrayList(count+1) // Find the best original key from available channels - def bestOriginalKey = null - for( int i=0; i 0 ) { - def keyPair = items[0] as KeyPair - if (bestOriginalKey == null) { - bestOriginalKey = keyPair.originalKeys - } else { - // Check if this channel has a GroupKey version - for (int j = 0; j < keyPair.originalKeys.size(); j++) { - def candidateKey = keyPair.originalKeys[j] - def currentKey = bestOriginalKey instanceof List ? bestOriginalKey[j] : bestOriginalKey - if (candidateKey instanceof GroupKey && !(currentKey instanceof GroupKey)) { - bestOriginalKey = keyPair.originalKeys - break - } - } - } - } - } + def bestOriginalKey = findBestOriginalKeys(entry) // Use the best available original key, or fall back to the map key def originalKey = bestOriginalKey ?: key @@ -310,6 +272,38 @@ class JoinOp { } } + /** + * Finds the best original keys from a map of channel items, preferring GroupKey over plain keys + * + * @param channelItems Map of channel index to list of items (KeyPair objects) + * @return The best original keys found, or null if no items available + */ + private def findBestOriginalKeys(Map channelItems) { + def bestOriginalKeys = null + + for (Map.Entry entry : channelItems.entrySet()) { + def items = entry.getValue() + if (items && items.size() > 0) { + def keyPair = items[0] as KeyPair + if (bestOriginalKeys == null) { + bestOriginalKeys = keyPair.originalKeys + } else { + // Check if this channel has a GroupKey version + for (int i = 0; i < keyPair.originalKeys.size(); i++) { + def candidateKey = keyPair.originalKeys[i] + def currentKey = bestOriginalKeys instanceof List ? bestOriginalKeys[i] : bestOriginalKeys + if (candidateKey instanceof GroupKey && !(currentKey instanceof GroupKey)) { + bestOriginalKeys = keyPair.originalKeys + break + } + } + } + } + } + + return bestOriginalKeys + } + protected void checkForDuplicate( key, value, int dir, boolean add ) { if( failOnDuplicate && ( (add && !uniqueKeys.add(key)) || (!add && uniqueKeys.contains(key)) ) ) { final msg = "Detected join operation duplicate emission on ${dir==0 ? 'left' : 'right'} channel -- offending element: key=${csv0(key,',')}; value=${csv0(value,',')}"