Skip to content

Use ECS properties instead of container properties #6270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.DescribeComputeEnvironmentsRequest
import software.amazon.awssdk.services.batch.model.DescribeJobQueuesRequest
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
import software.amazon.awssdk.services.batch.model.EcsTaskDetails
import software.amazon.awssdk.services.batch.model.JobDetail
import software.amazon.awssdk.services.batch.model.TaskContainerDetails
import software.amazon.awssdk.services.ec2.Ec2Client
import software.amazon.awssdk.services.ec2.model.DescribeInstancesRequest
import software.amazon.awssdk.services.ec2.model.Instance
Expand Down Expand Up @@ -193,14 +196,55 @@ class AwsBatchHelper {
final response = batchClient.describeJobs(request)
if( response.jobs() ) {
final detail = response.jobs()[0]
return detail.container().logStreamName()
return getTaskContainer(detail)?.logStreamName()
?: detail.container()?.logStreamName()
}
else {
log.debug "Unable to find info for batch job id=$jobId"
return null
}
}

/**
 * Retrieve the first {@link EcsTaskDetails} from the given {@link JobDetail}.
 *
 * In combination with {@code getTaskContainer(job)} this is analogous to {@code job.container()},
 * but using the multi-container ECS properties model.
 *
 * A job may carry no ECS properties at all (e.g. when it reuses a job definition created
 * with the legacy container properties). That is an expected condition, so it is handled
 * with null-safe navigation instead of catching the exception it would otherwise raise.
 *
 * @param job
 *      The job description; may be {@code null}
 * @return
 *      The first EcsTaskDetails of the job ECS properties, or {@code null} when the job is
 *      {@code null}, has no ECS properties, or has an empty task properties list
 */
static EcsTaskDetails getTaskProperties(JobDetail job) {
    final tasks = job?.ecsProperties()?.taskProperties()
    // Groovy truth: a null or empty list evaluates to false, so the index
    // access is only performed when at least one element exists
    return tasks ? tasks[0] : null
}

/**
 * Retrieve the first {@link TaskContainerDetails} from the given {@link JobDetail}.
 *
 * In combination with {@code getTaskProperties(job)} this is analogous to {@code job.container()},
 * but using the multi-container ECS properties model.
 *
 * The absence of ECS task properties or of task containers is treated as an expected
 * condition and reported by returning {@code null}, so that callers can fall back to
 * {@code job.container()} with the elvis operator.
 *
 * @param job
 *      The job description; may be {@code null}
 * @return
 *      The first TaskContainerDetails of the first task properties, or {@code null}
 *      when no task properties or no containers are available
 */
static TaskContainerDetails getTaskContainer(JobDetail job) {
    // getTaskProperties returns null when the job has no ECS task properties
    final containers = getTaskProperties(job)?.containers()
    // Groovy truth: a null or empty list evaluates to false
    return containers ? containers[0] : null
}

/**
* Retrieve the cloudwatch logs for the specified AWS Batch Job ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package nextflow.cloud.aws.batch


import static nextflow.cloud.aws.batch.AwsBatchHelper.*
import static nextflow.cloud.aws.batch.AwsContainerOptionsMapper.*

import java.nio.file.Path
Expand Down Expand Up @@ -51,13 +51,13 @@ import nextflow.util.TestOnly
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.ArrayProperties
import software.amazon.awssdk.services.batch.model.AssignPublicIp
import software.amazon.awssdk.services.batch.model.AttemptContainerDetail
import software.amazon.awssdk.services.batch.model.AttemptDetail
import software.amazon.awssdk.services.batch.model.BatchException
import software.amazon.awssdk.services.batch.model.ClientException
import software.amazon.awssdk.services.batch.model.ContainerOverrides
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsRequest
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsResponse
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
import software.amazon.awssdk.services.batch.model.EcsPropertiesOverride
import software.amazon.awssdk.services.batch.model.EphemeralStorage
import software.amazon.awssdk.services.batch.model.EvaluateOnExit
import software.amazon.awssdk.services.batch.model.Host
Expand All @@ -79,6 +79,8 @@ import software.amazon.awssdk.services.batch.model.RetryStrategy
import software.amazon.awssdk.services.batch.model.RuntimePlatform
import software.amazon.awssdk.services.batch.model.SubmitJobRequest
import software.amazon.awssdk.services.batch.model.SubmitJobResponse
import software.amazon.awssdk.services.batch.model.TaskContainerOverrides
import software.amazon.awssdk.services.batch.model.TaskPropertiesOverride
import software.amazon.awssdk.services.batch.model.TerminateJobRequest
import software.amazon.awssdk.services.batch.model.Volume
/**
Expand Down Expand Up @@ -246,7 +248,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
checkIfUnschedulable(job)
// fetch the task arn
if( !taskArn )
taskArn = job?.container()?.taskArn()
// use the ARN of the ECS task itself -- `taskRoleArn()` would return the IAM role ARN instead;
// fall back to the legacy container details when the job has no ECS task properties
taskArn = getTaskProperties(job)?.taskArn()
    ?: job?.container()?.taskArn()
Comment on lines +251 to +252
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this now check getTaskProperties(job) first and then job?.container()?.taskArn()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it from Tom's PR #5391. I think it is needed because job definitions are reused. If the job was defined with ECS properties, the information is inside TaskProperties. If it is reusing an old definition with container properties, the taskProperties are empty and you need to look in the Container instead. The same applies to the other job properties.

Maybe it is no longer needed after the change I made to stop reusing job definitions that use container properties (54ee92b). However, I am not 100% sure, because the hash is only applied when container properties are defined, and there could be cases where old definitions are still reused. Maybe the best option is just to add a comment explaining it.

return result
}

Expand Down Expand Up @@ -282,12 +285,20 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final result = new ArrayList(2)
if( job.statusReason() )
result.add(job.statusReason())
final AttemptContainerDetail container = job.attempts() ? job.attempts()[-1].container() : null
if( container?.reason() )
result.add(container.reason())
final attemptDetail = job.attempts() ? job.attempts()[-1] : null
final reason = errReasonFromAttempt (attemptDetail)
if( reason )
result.add(reason)
return result.join(' - ')
}

/**
 * Extract the container failure reason from a job attempt.
 *
 * The reason reported by the first container of the first ECS task is preferred; when
 * it is not available the reason of the legacy {@code attempt.container()} is used.
 *
 * @param attempt
 *      The job attempt; may be {@code null}
 * @return
 *      The container reason string, or {@code null} when the attempt is {@code null}
 *      or no reason is reported
 */
private String errReasonFromAttempt(AttemptDetail attempt){
    if( !attempt )
        return null
    // `?.first()` only protects against null: Groovy `first()` throws NoSuchElementException
    // on an empty list, which would prevent reaching the legacy fallback below.
    // Groovy truth (null or empty list is false) guards each index access instead.
    final tasks = attempt.taskProperties()
    final containers = tasks ? tasks[0]?.containers() : null
    final ecsReason = containers ? containers[0]?.reason() : null
    return ecsReason ?: attempt.container()?.reason()
}

/**
* {@inheritDoc}
*/
Expand All @@ -307,7 +318,9 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
// take the exit code from the `.exitcode` file created by nextflow
// the rationale of this is that, in case of error, the exit code returned
// by the batch API is more reliable.
task.exitStatus = job.container().exitCode() ?: readExitFile()
task.exitStatus = getTaskContainer(job)?.exitCode()
?: job.container()?.exitCode()
?: readExitFile()
// finalize the task
task.stdout = outputFile
if( job?.status() == JobStatus.FAILED || task.exitStatus==Integer.MAX_VALUE ) {
Expand Down Expand Up @@ -818,7 +831,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job

// set the actual command
final resources = new ArrayList<ResourceRequirement>(5)
final container = ContainerOverrides.builder()
final container = TaskContainerOverrides.builder()
container.command(getSubmitCommand())
// set the task memory
final cpus = task.config.getCpus()
Expand Down Expand Up @@ -849,7 +862,13 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
if( vars )
container.environment(vars)

builder.containerOverrides(container.build())
builder.ecsPropertiesOverride(EcsPropertiesOverride.builder()
.taskProperties(TaskPropertiesOverride.builder()
.containers(container.build())
.build()
)
.build()
)

// set the array properties
if( task instanceof TaskArrayRun ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package nextflow.cloud.aws.batch.model

import groovy.transform.CompileStatic
import software.amazon.awssdk.services.batch.model.ContainerProperties
import software.amazon.awssdk.services.batch.model.EcsTaskProperties
import software.amazon.awssdk.services.batch.model.EphemeralStorage
import software.amazon.awssdk.services.batch.model.KeyValuePair
import software.amazon.awssdk.services.batch.model.LinuxParameters
Expand All @@ -27,13 +27,14 @@ import software.amazon.awssdk.services.batch.model.NetworkConfiguration
import software.amazon.awssdk.services.batch.model.ResourceRequirement
import software.amazon.awssdk.services.batch.model.RuntimePlatform
import software.amazon.awssdk.services.batch.model.Secret
import software.amazon.awssdk.services.batch.model.TaskContainerProperties
import software.amazon.awssdk.services.batch.model.Ulimit
import software.amazon.awssdk.services.batch.model.Volume

/**
* Models the container properties used to configure an AWS Batch job.
*
* This is a mutable version of {@link ContainerProperties} required
* This is a mutable version of {@link TaskContainerProperties} required
* to simplify the extension of container settings in the AWS Batch executor
* and its sub-classes (e.g. nf-xpack).
*
Expand Down Expand Up @@ -236,14 +237,11 @@ class ContainerPropertiesModel {
return runtimePlatform
}

ContainerProperties toBatchContainerProperties() {
def builder = ContainerProperties.builder()

EcsTaskProperties toBatchContainerProperties() {
def builder = TaskContainerProperties.builder();
if (image) builder.image(image)
if (command) builder.command(command)
if (resourceRequirements) builder.resourceRequirements(resourceRequirements)
if (jobRoleArn) builder.jobRoleArn(jobRoleArn)
if (executionRoleArn) builder.executionRoleArn(executionRoleArn)
if (linuxParameters) builder.linuxParameters(linuxParameters)
if (environment) builder.environment(environment)
if (privileged) builder.privileged(privileged)
Expand All @@ -252,12 +250,16 @@ class ContainerPropertiesModel {
if (ulimits) builder.ulimits(ulimits)
if (logConfiguration) builder.logConfiguration(logConfiguration)
if (mountPoints) builder.mountPoints(mountPoints)
if (volumes) builder.volumes(volumes)
if (networkConfiguration) builder.networkConfiguration(networkConfiguration)
if (ephemeralStorage) builder.ephemeralStorage(ephemeralStorage)
if (runtimePlatform) builder.runtimePlatform(runtimePlatform)
def ecsTaskBuilder = EcsTaskProperties.builder()
ecsTaskBuilder.containers(builder.build())
if (jobRoleArn) ecsTaskBuilder.taskRoleArn(jobRoleArn)
if (executionRoleArn) ecsTaskBuilder.executionRoleArn(executionRoleArn)
if (volumes) ecsTaskBuilder.volumes(volumes)
if (networkConfiguration) ecsTaskBuilder.networkConfiguration(networkConfiguration)
if (ephemeralStorage) ecsTaskBuilder.ephemeralStorage(ephemeralStorage)
if (runtimePlatform) ecsTaskBuilder.runtimePlatform(runtimePlatform)

return builder.build()
return ecsTaskBuilder.build()
}

@Override
Expand All @@ -280,6 +282,7 @@ class ContainerPropertiesModel {
", networkConfiguration=" + networkConfiguration +
", ephemeralStorage=" + ephemeralStorage +
", runtimePlatform=" + runtimePlatform +
", ecsMode=true" + //Added to generate a different token if the job definition was generated with older containerProperties
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch.model


import groovy.transform.CompileStatic
import software.amazon.awssdk.services.batch.model.EcsProperties
import software.amazon.awssdk.services.batch.model.JobDefinitionType
import software.amazon.awssdk.services.batch.model.PlatformCapability
import software.amazon.awssdk.services.batch.model.RegisterJobDefinitionRequest
Expand Down Expand Up @@ -116,7 +117,10 @@ class RegisterJobDefinitionModel {
if (platformCapabilities)
builder.platformCapabilities(platformCapabilities)
if (containerProperties)
builder.containerProperties(containerProperties.toBatchContainerProperties())
builder.ecsProperties(EcsProperties.builder()
.taskProperties(containerProperties.toBatchContainerProperties())
.build()
)
if (parameters)
builder.parameters(parameters)
if (tags)
Expand Down
Loading