Skip to content

Commit 37981a5

Browse files
adamrtalbot, pditommaso, and bentsherman
authored
Add support for Azure Managed identities on Azure worker nodes with Fusion (#6118)
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Signed-off-by: Adam Talbot <12817534+adamrtalbot@users.noreply.github.com> Signed-off-by: Ben Sherman <bentshermann@gmail.com> Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com> Co-authored-by: Ben Sherman <bentshermann@gmail.com>
1 parent 049f742 commit 37981a5

File tree

8 files changed

+234
-24
lines changed

8 files changed

+234
-24
lines changed

docs/reference/config.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,11 @@ The following settings are available:
414414
`azure.batch.pools.<name>.vmType`
415415
: Specify the virtual machine type used by the pool identified with `<name>`.
416416

417+
`azure.batch.poolIdentityClientId`
418+
: :::{versionadded} 25.05.0-edge
419+
:::
420+
: Specify the client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview) that is available on all Azure Batch node pools. This identity will be used for task-level authentication to Azure services. See {ref}`azure-managed-identities` for more details.
421+
417422
`azure.managedIdentity.clientId`
418423
: Specify the client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). See {ref}`azure-managed-identities` for more details. Defaults to environment variable `AZURE_MANAGED_IDENTITY_USER`.
419424

modules/nf-lang/src/main/java/nextflow/config/scopes/AzureBatchConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ Delete each task when it completes (default: `true`).
8888
""")
8989
public String location;
9090

91+
@ConfigOption
92+
@Description("""
93+
The client ID for an Azure managed identity that is available on all Azure Batch node pools. This identity will be used for task-level authentication to Azure services.
94+
""")
95+
public String poolIdentityClientId;
96+
9197
@PlaceholderName("<name>")
9298
public Map<String, AzureBatchPoolConfig> pools;
9399

plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import com.azure.compute.batch.models.ContainerConfiguration
5050
import com.azure.compute.batch.models.ContainerRegistryReference
5151
import com.azure.compute.batch.models.ContainerType
5252
import com.azure.compute.batch.models.ElevationLevel
53+
import com.azure.compute.batch.models.EnvironmentSetting
5354
import com.azure.compute.batch.models.MetadataItem
5455
import com.azure.compute.batch.models.MountConfiguration
5556
import com.azure.compute.batch.models.NetworkConfiguration
@@ -81,6 +82,7 @@ import groovy.transform.Memoized
8182
import groovy.util.logging.Slf4j
8283
import nextflow.Global
8384
import nextflow.Session
85+
import nextflow.cloud.azure.config.AzBatchOpts
8486
import nextflow.cloud.azure.config.AzConfig
8587
import nextflow.cloud.azure.config.AzFileShareOpts
8688
import nextflow.cloud.azure.config.AzPoolOpts
@@ -508,28 +510,40 @@ class AzBatchService implements Closeable {
508510

509511
// Handle Fusion settings
510512
final fusionEnabled = FusionHelper.isFusionEnabled((Session)Global.session)
511-
final launcher = fusionEnabled ? FusionScriptLauncher.create(task.toTaskBean(), 'az') : null
513+
String fusionCmd = null
512514
if( fusionEnabled ) {
515+
// Create the FusionScriptLauncher from the TaskBean
516+
final taskBean = task.toTaskBean()
517+
final launcher = FusionScriptLauncher.create(taskBean, 'az')
518+
519+
// Add container options
513520
opts += "--privileged "
514-
for( Map.Entry<String,String> it : launcher.fusionEnv() ) {
515-
opts += "-e $it.key=$it.value "
521+
522+
// Add all environment variables from the launcher
523+
final fusionEnv = launcher.fusionEnv()
524+
if( fusionEnv ) {
525+
for( Map.Entry<String,String> it : fusionEnv ) {
526+
opts += "-e $it.key=$it.value "
527+
}
516528
}
529+
530+
// Get the fusion submit command
531+
final List<String> cmdList = launcher.fusionSubmitCli(task)
532+
fusionCmd = cmdList ? String.join(' ', cmdList) : null
517533
}
518534

519535
// Create container settings
520536
final containerOpts = new BatchTaskContainerSettings(container)
521537
.setContainerRunOptions(opts)
522538

523539
// submit command line
524-
final String cmd = fusionEnabled
525-
? launcher.fusionSubmitCli(task).join(' ')
526-
: "bash -o pipefail -c 'bash ${TaskRun.CMD_RUN} 2>&1 | tee ${TaskRun.CMD_LOG}'"
540+
final String cmd = fusionEnabled && fusionCmd
541+
? fusionCmd
542+
: "bash -o pipefail -c 'bash ${TaskRun.CMD_RUN} 2>&1 | tee ${TaskRun.CMD_LOG}'"
527543
// cpus and memory
528544
final slots = computeSlots(task, pool)
529545
// max wall time
530-
final constraints = new BatchTaskConstraints()
531-
if( task.config.getTime() )
532-
constraints.setMaxWallClockTime( Duration.of(task.config.getTime().toMillis(), ChronoUnit.MILLIS) )
546+
final constraints = taskConstraints(task)
533547

534548
log.trace "[AZURE BATCH] Submitting task: $taskId, cpus=${task.config.getCpus()}, mem=${task.config.getMemory()?:'-'}, slots: $slots"
535549
return new BatchTaskCreateContent(taskId, cmd)
@@ -539,6 +553,27 @@ class AzBatchService implements Closeable {
539553
.setOutputFiles(outputFileUrls(task, sas))
540554
.setRequiredSlots(slots)
541555
.setConstraints(constraints)
556+
.setEnvironmentSettings(taskEnv(config.batch()))
557+
}
558+
559+
protected List<EnvironmentSetting> taskEnv(AzBatchOpts opts) {
560+
return opts.poolIdentityClientId
561+
? List.of(new EnvironmentSetting("FUSION_AZ_MSI_CLIENT_ID")
562+
.setValue(opts.poolIdentityClientId))
563+
: List.<EnvironmentSetting>of()
564+
}
565+
566+
/**
567+
* Create task constraints based on the task configuration
568+
*
569+
* @param task The task run to create constraints for
570+
* @return The BatchTaskConstraints object
571+
*/
572+
protected BatchTaskConstraints taskConstraints(TaskRun task) {
573+
final constraints = new BatchTaskConstraints()
574+
if( task.config.getTime() )
575+
constraints.setMaxWallClockTime( Duration.of(task.config.getTime().toMillis(), ChronoUnit.MILLIS) )
576+
return constraints
542577
}
543578

544579
AzTaskKey runTask(String poolId, String jobId, TaskRun task) {
@@ -599,6 +634,13 @@ class AzBatchService implements Closeable {
599634
List<OutputFile> result = new ArrayList<>(20)
600635
result << destFile(TaskRun.CMD_EXIT, task.workDir, sas)
601636
result << destFile(TaskRun.CMD_LOG, task.workDir, sas)
637+
result << destFile(TaskRun.CMD_OUTFILE, task.workDir, sas)
638+
result << destFile(TaskRun.CMD_ERRFILE, task.workDir, sas)
639+
result << destFile(TaskRun.CMD_SCRIPT, task.workDir, sas)
640+
result << destFile(TaskRun.CMD_RUN, task.workDir, sas)
641+
result << destFile(TaskRun.CMD_STAGE, task.workDir, sas)
642+
result << destFile(TaskRun.CMD_TRACE, task.workDir, sas)
643+
result << destFile(TaskRun.CMD_ENV, task.workDir, sas)
602644
return result
603645
}
604646

plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class AzBatchOpts implements CloudTransferOptions {
5656
Boolean deleteTasksOnCompletion
5757
CopyToolInstallMode copyToolInstallMode
5858
Duration jobMaxWallClockTime
59+
String poolIdentityClientId
5960

6061
Map<String,AzPoolOpts> pools
6162

@@ -73,6 +74,7 @@ class AzBatchOpts implements CloudTransferOptions {
7374
deletePoolsOnCompletion = config.deletePoolsOnCompletion
7475
deleteTasksOnCompletion = config.deleteTasksOnCompletion
7576
jobMaxWallClockTime = config.jobMaxWallClockTime ? config.jobMaxWallClockTime as Duration : Duration.of('30d')
77+
poolIdentityClientId = config.poolIdentityClientId
7678
pools = parsePools(config.pools instanceof Map ? config.pools as Map<String,Map> : Collections.<String,Map>emptyMap())
7779
maxParallelTransfers = config.maxParallelTransfers ? config.maxParallelTransfers as int : MAX_TRANSFER
7880
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS

plugins/nf-azure/src/main/nextflow/cloud/azure/fusion/AzFusionEnv.groovy

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,40 @@ import org.pf4j.Extension
3636
@Slf4j
3737
class AzFusionEnv implements FusionEnv {
3838

39+
/**
40+
* Get the environment variables for Fusion with Azure, taking into
41+
* account a managed identity ID if provided
42+
*
43+
* @param scheme The storage scheme ('az' for Azure)
44+
* @param config The Fusion configuration
45+
* @implNote The managed identity client ID is not a method parameter; it is read from the Azure Batch options ({@code azure.batch.poolIdentityClientId})
46+
* @return Map of environment variables
47+
*/
3948
@Override
4049
Map<String, String> getEnvironment(String scheme, FusionConfig config) {
4150
if (scheme != 'az') {
4251
return Collections.<String, String> emptyMap()
4352
}
4453

4554
final cfg = AzConfig.config
55+
final managedIdentityId = cfg.batch().poolIdentityClientId
4656
final result = new LinkedHashMap(10)
4757

4858
if (!cfg.storage().accountName) {
4959
throw new IllegalArgumentException("Missing Azure Storage account name")
5060
}
5161

52-
if (cfg.storage().accountKey && cfg.storage().sasToken) {
53-
throw new IllegalArgumentException("Azure Storage Access key and SAS token detected. Only one is allowed")
54-
}
55-
5662
result.AZURE_STORAGE_ACCOUNT = cfg.storage().accountName
57-
// In theory, generating an impromptu SAS token for authentication methods other than
58-
// `azure.storage.sasToken` should not be necessary, because those methods should already allow sufficient
59-
// access for normal operation. Nevertheless, #5287 heavily implies that failing to do so causes the Azure
60-
// Storage plugin or Fusion to fail. In any case, it may be possible to remove this in the future.
63+
64+
// If pool has a managed identity, ONLY add the MSI client ID
65+
// DO NOT add any SAS token or reference cfg.storage().sasToken
66+
if (managedIdentityId) {
67+
result.FUSION_AZ_MSI_CLIENT_ID = managedIdentityId
68+
// No SAS token is added or generated
69+
return result
70+
}
71+
72+
// If no managed identity, use the standard environment with SAS token
6173
result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken()
6274

6375
return result
@@ -68,9 +80,13 @@ class AzFusionEnv implements FusionEnv {
6880
* authentication method.
6981
*/
7082
synchronized String getOrCreateSasToken() {
71-
7283
final cfg = AzConfig.config
7384

85+
// Check for incompatible configuration
86+
if (cfg.storage().accountKey && cfg.storage().sasToken) {
87+
throw new IllegalArgumentException("Azure Storage Access key and SAS token detected. Only one is allowed")
88+
}
89+
7490
// If a SAS token is already defined in the configuration, just return it
7591
if (cfg.storage().sasToken) {
7692
return cfg.storage().sasToken

plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ import java.time.Instant
66
import java.time.temporal.ChronoUnit
77
import java.util.function.Predicate
88

9-
import com.azure.compute.batch.BatchClient
109
import com.azure.compute.batch.models.BatchPool
11-
import com.azure.compute.batch.models.BatchJobCreateContent
1210
import com.azure.compute.batch.models.ElevationLevel
11+
import com.azure.compute.batch.models.EnvironmentSetting
1312
import com.azure.core.exception.HttpResponseException
1413
import com.azure.core.http.HttpResponse
1514
import com.azure.identity.ManagedIdentityCredential
1615
import com.google.common.hash.HashCode
1716
import nextflow.Global
1817
import nextflow.Session
1918
import nextflow.SysEnv
19+
import nextflow.cloud.azure.config.AzBatchOpts
2020
import nextflow.cloud.azure.config.AzConfig
2121
import nextflow.cloud.azure.config.AzManagedIdentityOpts
2222
import nextflow.cloud.azure.config.AzPoolOpts
@@ -31,8 +31,6 @@ import nextflow.util.MemoryUnit
3131
import reactor.core.publisher.Flux
3232
import spock.lang.Specification
3333
import spock.lang.Unroll
34-
import com.azure.compute.batch.models.BatchJobConstraints
35-
import com.azure.compute.batch.models.BatchPoolInfo
3634
/**
3735
*
3836
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
@@ -839,6 +837,32 @@ class AzBatchServiceTest extends Specification {
839837
[managedIdentity: [clientId: 'client-123']] | 'client-123'
840838
}
841839

840+
def 'should use pool identity client id for fusion tasks' () {
841+
given:
842+
def POOL_IDENTITY_CLIENT_ID = 'pool-identity-123'
843+
def exec = Mock(AzBatchExecutor) {
844+
getConfig() >> new AzConfig([
845+
batch: [poolIdentityClientId: POOL_IDENTITY_CLIENT_ID],
846+
storage: [sasToken: 'test-sas-token', accountName: 'testaccount']
847+
])
848+
}
849+
def service = new AzBatchService(exec)
850+
851+
and:
852+
Global.session = Mock(Session) {
853+
getConfig() >> [fusion: [enabled: true]]
854+
}
855+
856+
when:
857+
def env = [:] as Map<String,String>
858+
if( service.config.batch().poolIdentityClientId && true ) { // fusionEnabled = true
859+
env.put('FUSION_AZ_MSI_CLIENT_ID', service.config.batch().poolIdentityClientId)
860+
}
861+
862+
then:
863+
env['FUSION_AZ_MSI_CLIENT_ID'] == POOL_IDENTITY_CLIENT_ID
864+
}
865+
842866

843867
def 'should cache job id' () {
844868
given:
@@ -972,4 +996,55 @@ class AzBatchServiceTest extends Specification {
972996
null | 0
973997
}
974998

999+
def 'should create task constraints' () {
1000+
given:
1001+
def exec = Mock(AzBatchExecutor)
1002+
def service = new AzBatchService(exec)
1003+
def task = Mock(TaskRun) {
1004+
getConfig() >> Mock(TaskConfig) {
1005+
getTime() >> TIME
1006+
}
1007+
}
1008+
1009+
when:
1010+
def result = service.taskConstraints(task)
1011+
1012+
then:
1013+
result != null
1014+
if (TIME) {
1015+
assert result.maxWallClockTime != null
1016+
assert result.maxWallClockTime.toMillis() == TIME.toMillis()
1017+
} else {
1018+
assert result.maxWallClockTime == null
1019+
}
1020+
1021+
where:
1022+
TIME << [
1023+
null,
1024+
Duration.of('1h'),
1025+
Duration.of('30m'),
1026+
Duration.of('2d')
1027+
]
1028+
}
1029+
1030+
@Unroll
1031+
def 'should create task env' () {
1032+
given:
1033+
def exec = Mock(AzBatchExecutor)
1034+
def service = new AzBatchService(exec)
1035+
List<EnvironmentSetting> env
1036+
1037+
when:
1038+
env = service.taskEnv(new AzBatchOpts([:]))
1039+
then:
1040+
env == []
1041+
1042+
when:
1043+
env = service.taskEnv(new AzBatchOpts([poolIdentityClientId:'12345']))
1044+
then:
1045+
env.size() == 1
1046+
env.first.name == 'FUSION_AZ_MSI_CLIENT_ID'
1047+
env.first.value == '12345'
1048+
}
1049+
9751050
}

plugins/nf-azure/src/test/nextflow/cloud/azure/config/AzureConfigTest.groovy

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,23 @@ class AzureConfigTest extends Specification {
8383
cfg.batch().autoPoolOpts().scaleInterval == Duration.of('5 min')
8484
cfg.batch().autoPoolOpts().autoScale == false
8585
!cfg.batch().canCreatePool()
86+
cfg.batch().poolIdentityClientId == null
87+
}
88+
89+
def 'should get azure batch pool identity client id' () {
90+
given:
91+
def POOL_IDENTITY_CLIENT_ID = 'pool-identity-123'
92+
def session = Mock(Session) {
93+
getConfig() >> [ azure:
94+
[batch:[
95+
poolIdentityClientId: POOL_IDENTITY_CLIENT_ID
96+
] ]]
97+
}
98+
99+
when:
100+
def cfg = AzConfig.getConfig(session)
101+
then:
102+
cfg.batch().poolIdentityClientId == POOL_IDENTITY_CLIENT_ID
86103
}
87104

88105
def 'should get azure batch options' () {

0 commit comments

Comments
 (0)