Skip to content

Commit a1e3319

Browse files
authored
Add support for custom fuse device plugin (#4612) [ci fast]
This commit allows customising the FUSE plugin device required by Fusion when using the Kubernetes executor. The FUSE plugin name can be specified by using the setting `k8s.fuseDevicePlugin` for example: ``` k8s.fuseDevicePlugin = ['my-plugin/fuse': 1] ``` Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
1 parent 36090b7 commit a1e3319

File tree

6 files changed

+89
-2
lines changed

6 files changed

+89
-2
lines changed

docs/config.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,6 +1006,11 @@ The following settings are available:
10061006
:::
10071007
: If you trace the hostname, activate this option (default: `false`).
10081008

1009+
`k8s.fuseDevicePlugin`
1010+
: :::{versionadded} 24.01.0-edge
1011+
:::
1012+
: The FUSE device plugin to be used when enabling Fusion in unprivileged mode (default: `['nextflow.io/fuse': 1]`).
1013+
10091014
`k8s.httpConnectTimeout`
10101015
: :::{versionadded} 22.10.0
10111016
:::

docs/kubernetes.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,23 @@ Then the pipeline execution can be launched using the usual run command and spec
120120
nextflow run <YOUR PIPELINE> -work-dir s3://<YOUR-BUCKET>/scratch
121121
```
122122

123+
:::{note}
124+
When using Fusion, pods will run as *privileged* by default.
125+
:::
126+
127+
To use Fusion with without the need for escalating privileges, it is required to install in the Kubernetes cluster the
128+
Nextflow [FUSE device plugin](https://github.com/nextflow-io/k8s-fuse-plugin) and add in your Nextflow configuration the following
129+
setting:
130+
131+
```
132+
fusion {
133+
privileged = false
134+
}
135+
```
136+
137+
To use a custom FUSE device plugin, specify it via the setting `k8s.fuseDevicePlugin`. See
138+
the {ref}`Kubernetes configuration section<config-k8s>` for details.
139+
123140
### Running in a pod
124141

125142
Nextflow can be executed directly from a pod running in a Kubernetes cluster. In these cases you will need to use the plain Nextflow `run` command and specify the `k8s` executor and the required persistent volume claim in the `nextflow.config` file as shown below:

modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ import nextflow.util.Duration
4242
@CompileStatic
4343
class K8sConfig implements Map<String,Object> {
4444

45+
static final private Map<String,?> DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1)
46+
4547
@Delegate
4648
private Map<String,Object> target
4749

@@ -116,6 +118,15 @@ class K8sConfig implements Map<String,Object> {
116118
target.storageSubPath
117119
}
118120

121+
Map<String,?> fuseDevicePlugin() {
122+
final result = target.fuseDevicePlugin
123+
if( result instanceof Map && result.size()==1 )
124+
return result as Map<String,?>
125+
if( result )
126+
log.warn1 "Setting 'fuseDevicePlugin' should be a map object providing exactly one entry - offending value: $result"
127+
return DEFAULT_FUSE_PLUGIN
128+
}
129+
119130
/**
120131
* Whenever the pod should honour the entrypoint defined by the image (default: false)
121132
*

modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
252252
if( fusionConfig().privileged() )
253253
builder.withPrivileged(true)
254254
else {
255-
builder.withResourcesLimits(["nextflow.io/fuse": 1])
255+
final device= k8sConfig.fuseDevicePlugin()
256+
builder.withResourcesLimits(device)
256257
}
257258

258259
final env = fusionLauncher().fusionEnv()

modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ class K8sConfigTest extends Specification {
129129

130130
}
131131

132+
def 'should set device plugin' () {
133+
when:
134+
def cfg = new K8sConfig([:])
135+
then:
136+
cfg.fuseDevicePlugin() == ['nextflow.io/fuse':1]
137+
138+
when:
139+
cfg = new K8sConfig([fuseDevicePlugin:['foo/fuse':10]])
140+
then:
141+
cfg.fuseDevicePlugin() == ['foo/fuse':10]
142+
}
143+
132144
def 'should create client config' () {
133145

134146
given:

modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,9 @@ class K8sTaskHandlerTest extends Specification {
938938
def client = Mock(K8sClient)
939939
def builder = Mock(K8sWrapperBuilder)
940940
def launcher = Mock(FusionScriptLauncher)
941-
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
941+
def k8sConfig = Spy(K8sConfig)
942+
def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig }
943+
def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec))
942944
Map result
943945

944946
when:
@@ -973,6 +975,45 @@ class K8sTaskHandlerTest extends Specification {
973975
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
974976
result.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]]
975977
!result.spec.containers[0].securityContext
978+
979+
980+
/*
981+
* use custom fuse device
982+
*/
983+
when:
984+
result = handler.newSubmitRequest(task)
985+
then:
986+
launcher.fusionEnv() >> [FUSION_BUCKETS: 'this,that']
987+
launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run')
988+
launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
989+
and:
990+
k8sConfig.fuseDevicePlugin() >> ['custom/device/fuse': 1]
991+
and:
992+
handler.getTask() >> task
993+
handler.fusionEnabled() >> true
994+
handler.fusionLauncher() >> launcher
995+
handler.fusionConfig() >> new FusionConfig(privileged: false)
996+
and:
997+
task.getContainer() >> 'debian:latest'
998+
task.getWorkDir() >> WORK_DIR
999+
task.getConfig() >> config
1000+
and:
1001+
1 * handler.fixOwnership() >> false
1002+
1 * handler.entrypointOverride() >> false
1003+
1 * handler.getPodOptions() >> new PodOptions()
1004+
1 * handler.getSyntheticPodName(task) >> 'nf-123'
1005+
1 * handler.getLabels(task) >> [:]
1006+
1 * handler.getAnnotations() >> [:]
1007+
1 * handler.getContainerMounts() >> []
1008+
and:
1009+
1 * config.getCpus() >> 0
1010+
1 * config.getMemory() >> null
1011+
1 * client.getConfig() >> new ClientConfig()
1012+
and:
1013+
result.spec.containers[0].args == ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
1014+
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
1015+
result.spec.containers[0].resources == [limits:['custom/device/fuse':1]]
1016+
!result.spec.containers[0].securityContext
9761017
}
9771018

9781019
def 'get fusion submit command' () {

0 commit comments

Comments
 (0)